Skip to content

Commit

Permalink
use zmqstream instead of pure sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed May 23, 2011
1 parent 81393a8 commit f6bb8d5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
19 changes: 11 additions & 8 deletions playground/scale0.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import zmq
import uuid
import tnetstrings
from zmq.eventloop import ioloop
from zmq.eventloop import ioloop, zmqstream

class Dispatcher():
def __init__(self,
Expand Down Expand Up @@ -53,20 +53,22 @@ def __init__(self,
self.LRU = []
self.pings = []

self.context = zmq.Context()
self.context = zmq.Context.instance()
self.loop = ioloop.IOLoop.instance()

self.worker_xrep_socket = self.context.socket(zmq.XREP)
self.worker_xrep_socket.setsockopt(zmq.IDENTITY, "%s-worker" % self.my_id)
self.worker_xrep_socket.bind(worker_xrep_socket_uri)

self.worker_xrep_stream = zmqstream.ZMQStream(self.worker_xrep_socket, self.loop)
self.worker_xrep_stream.on_recv(self.worker_handler)

self.loop = ioloop.IOLoop.instance()

self.loop.add_handler(self.worker_xrep_socket, self.worker_handler, zmq.POLLIN)
# self.loop.add_handler(self.worker_xrep_socket, self.worker_handler, zmq.POLLIN)
ioloop.PeriodicCallback(self.send_pings, self.heartbeat_interval, self.loop).start()

self.loop.start()

def worker_handler(self, sock, events):
def worker_handler(self, message):
""" worker_handler handles messages from worker sockets. Messages
are 3+ part ZeroMQ multipart messages. (worker_id, command, request).
Expand All @@ -80,8 +82,9 @@ def worker_handler(self, sock, events):
request is the rest of the message, can be multiple parts and Scale0
will generally ignore it except to pass it on.
"""
sock = self.worker_xrep_stream

message = sock.recv_multipart()
# message = sock.recv_multipart()
getattr(self, message[1].lower())(sock, message)

def send_pings(self):
Expand Down Expand Up @@ -111,7 +114,7 @@ def heartbeat(self, sock, message):
""" For heartbeat we just shoot the request right back at the sender.
Don't even bother to parse anything to save time.
"""
sock.send_multipart(sock.recv_multipart())
sock.send_multipart(message)

def ready(self, sock, message):
""" ready is the worker informing Scale0 it can accept more jobs.
Expand Down
24 changes: 14 additions & 10 deletions playground/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import zmq
import uuid
import tnetstrings
from zmq.eventloop import ioloop
from zmq.eventloop import ioloop, zmqstream

class Worker():
def __init__(self, connect_to, listen_on="tcp://127.0.0.1:9080"):
Expand All @@ -13,31 +13,35 @@ def __init__(self, connect_to, listen_on="tcp://127.0.0.1:9080"):
Router in the Broker will make requests to.
"""
self.my_id = str(uuid.uuid4())
self.context = zmq.Context()
self.context = zmq.Context.instance()
self.loop = ioloop.IOLoop.instance()
self.listen_on = listen_on

self.broker_socket = self.context.socket(zmq.XREQ)
self.broker_socket.setsockopt(zmq.IDENTITY, "broker-%s" % self.my_id)
self.broker_socket.connect(connect_to)
self.broker_stream = zmqstream.ZMQStream(self.broker_socket, self.loop)

self.listener_socket = self.context.socket(zmq.XREP)
self.listener_socket.setsockopt(zmq.IDENTITY, "listener-%s" % self.my_id)
self.listener_socket.bind(self.listen_on)
self.listener_stream = zmqstream.ZMQStream(self.listener_socket, self.loop)

self.heartbeat_stamp = None
self.heartbeats = []

self.loop = ioloop.IOLoop.instance()

""" self.connection_state can be 1 of 3 ints
0: not connected (not in LRU queue on broker)
1: connection pending (READY sent)
2: connected (OK recieved, in LRU queue)
"""
self.connection_state = 0

self.loop.add_handler(self.broker_socket, self.broker_handler, zmq.POLLIN)
self.loop.add_handler(self.listener_socket, self.listener_handler, zmq.POLLIN)

self.broker_stream.on_recv(self.broker_handler)
self.listener_stream.on_recv(self.listener_handler)
# self.loop.add_handler(self.broker_socket, self.broker_handler, zmq.POLLIN)
# self.loop.add_handler(self.listener_socket, self.listener_handler, zmq.POLLIN)

ioloop.DelayedCallback(self.connect, 1000, self.loop).start()
ioloop.PeriodicCallback(self.send_heartbeat, 1000, self.loop).start()
Expand All @@ -50,8 +54,8 @@ def send_heartbeat(self):
self.heartbeats.append(self.heartbeat_stamp)
self.broker_socket.send_multipart(["HEARTBEAT", self.heartbeat_stamp])

def broker_handler(self, sock, events):
(command, request) = sock.recv_multipart()
def broker_handler(self, msg):
(command, request) = msg
if command == "OK":
self.connect_state = 2
print 'In LRU Queue'
Expand All @@ -63,8 +67,8 @@ def broker_handler(self, sock, events):
self.heartbeats.remove(request)
print self.heartbeats

def listener_handler(self, sock, events):
(sock_id, command, request) = sock.recv_multipart()
def listener_handler(self, msg):
(sock_id, command, request) = msg
if command == "PING":
print 'got ping %s' % request
self.broker_socket.send_multipart(["PONG", request])
Expand Down

0 comments on commit f6bb8d5

Please sign in to comment.