Permalink
Browse files

changed the socket name in the worker, also just ran a test of runnin…

…g test_worker.py multiple times and it worked, there is no requirement other than telling it what to connect to.
  • Loading branch information...
joerussbowman committed May 31, 2011
1 parent deecfe6 commit 663cca669d7f968a6f0237e0b4af0a327e831aee
Showing with 9 additions and 9 deletions.
  1. +9 −9 scale0/test_worker.py
View
@@ -17,10 +17,10 @@ def __init__(self, connect_to, listen_on="tcp://127.0.0.1:9080"):
self.loop = ioloop.IOLoop.instance()
self.listen_on = listen_on
- self.broker_socket = self.context.socket(zmq.XREQ)
- self.broker_socket.setsockopt(zmq.IDENTITY, "%s" % self.my_id)
- self.broker_socket.connect(connect_to)
- self.broker_stream = zmqstream.ZMQStream(self.broker_socket, self.loop)
+ self.xreq_socket = self.context.socket(zmq.XREQ)
+ self.xreq_socket.setsockopt(zmq.IDENTITY, "%s" % self.my_id)
+ self.xreq_socket.connect(connect_to)
+ self.xreq_stream = zmqstream.ZMQStream(self.xreq_socket, self.loop)
self.sub_socket = self.context.socket(zmq.SUB)
self.sub_socket.connect("tcp://127.0.0.1:8082")
@@ -40,7 +40,7 @@ def __init__(self, connect_to, listen_on="tcp://127.0.0.1:9080"):
"""
self.connection_state = 0
- self.broker_stream.on_recv(self.broker_handler)
+ self.xreq_stream.on_recv(self.xreq_handler)
self.sub_stream.on_recv(self.sub_handler)
ioloop.DelayedCallback(self.connect, 1000, self.loop).start()
@@ -53,9 +53,9 @@ def send_heartbeat(self):
self.heartbeat_stamp = str(time.time())
print 'sending heartbeat %s' % self.heartbeat_stamp
self.heartbeats.append(self.heartbeat_stamp)
- self.broker_socket.send_multipart(["HEARTBEAT", self.heartbeat_stamp])
+ self.xreq_socket.send_multipart(["HEARTBEAT", self.heartbeat_stamp])
- def broker_handler(self, msg):
+ def xreq_handler(self, msg):
(command, request) = msg
if command == "HEARTBEAT":
if request == self.heartbeat_stamp:
@@ -68,7 +68,7 @@ def broker_handler(self, msg):
def sub_handler(self, msg):
""" Trying to move to pub/sub for getting messages to workers. """
if msg[0] == "PING":
- self.broker_socket.send_multipart(msg)
+ self.xreq_socket.send_multipart(msg)
if msg[0] == self.my_id:
(id, command) = msg[:2]
if command == "OK":
@@ -81,7 +81,7 @@ def sub_handler(self, msg):
def connect(self):
if self.connection_state < 1:
print 'connecting to broker'
- self.broker_socket.send_multipart(["READY",
+ self.xreq_socket.send_multipart(["READY",
"test"])
self.connection_state = 1

0 comments on commit 663cca6

Please sign in to comment.