Skip to content

Commit

Permalink
going ahead and plugging in router as I go, would have to tear the ot…
Browse files Browse the repository at this point in the history
…her method skipping it apart to do it, but have to quit for now so getting a commit and push to github in, just in case I work on this with another machine
  • Loading branch information
joerussbowman committed May 13, 2011
1 parent 3e96879 commit abe5ead
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
4 changes: 0 additions & 4 deletions docs/protocol.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ PING: This is the standard heartbeat request. When a Broker receives a PING
config_version:
Worker only, configuration version.

type: Added by the Dispatcher, this will be Client or Worker depending
on which socket the PING comes in on. If it's a Client it will not be
added to the LRU Queue.

PONG: This is the response to PING
Parameters:
time:
Expand Down
54 changes: 46 additions & 8 deletions playground/scale0.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def __init__(self, connect_to):

self.listener_socket.send_json(message)


# sender, conn_id, service, body = msg.split(' ', 3)


class Dispatcher():
""" The Dispatcher will accept requests on the client_socket,
then pull a worker from the LRU Queue and pass it to a Router
Expand All @@ -86,6 +90,17 @@ def __init__(self,
routers=2):

self.my_id = my_id

# LRU Queue would look something like
# [
# {"connection": "tcp://127.0.0.1:55555", "services": ["web"]}
# {"connection": "tcp://127.0.0.1:55556", "services": ["web"]}
# {"connection": "tcp://127.0.0.1:44444", "services": ["news", "mail"]}
# ]
# Eventually I'll move it to an object with getter and setters which
# can use something like gaeutilities event to notify the main
# application when a worker is added. That way requests don't
# get dropped.
self.LRU = []

self.context = zmq.Context()
Expand All @@ -105,27 +120,51 @@ def __init__(self,
self.client_socket.bind(client_socket_uri)
self.worker_socket.bind(worker_socket_url)
self.dispatcher_socket.bind("%s/%s_dispatcher" % (dispatcher_socket_base,
self.my_id)
self.router__response_socket.bind("%s/%s_router_response" % (
router_response_socket_base, self.my_id)
self.my_id))
self.router_response_socket.bind("%s/%s_router_response" % (
router_response_socket_base, self.my_id))

poller = zmq.Poller()
poller.register(client_socket, zmq.POLLIN)

while True:
sock = dict(poller.poll())

if sock.get(client_socket) == zmq.POLLIN:
multi_message = client_socket.recv_multipart()
if sock.get(worker_socket) == zmq.POLLIN:
# from the worker we are expecting a multipart message,
# part0 = protocol command, part1 = rest
message_parts = worker_socket.recv_multipart()
(command, request) = message_parts
if command.upper() == "PING":
# PING is "uri services time"
# services are comma delimited
(uri, services, time) = request.split(" ", 3)
# TODO: validate time here
self.LRU.append({"connection": uri,
"services": services.split(",")})

if sock.get(client_socket) == zmq.POLLIN:
# from the client we are expecting a multipart message
# part0 = service, part1 = request
message_parts = client_socket.recv_multipart()
(service, request) = message_parts
conn = None
for i in LRU:
if service in i["services"]:
connection = i["connection"]
LRU.remove(i)
break
# TODO: Here would be where to plug in something
# if conn == None so that new listeners can be checked for
dispatcher_socket.send_multipart([connection, request])


class Router():
""" stub of how I think it will work, stopping this and going
to just manage everything with the dispatcher at first. This is getting a
bit complicated and I want to stop and do one thing at a time"""

def__init__(self, context,
def __init__(self, context,
dispatcher_socket_uri,
router_response_socket_uri,
port,
Expand All @@ -142,7 +181,6 @@ class Router():
self.response_socket = context.socket(zmq.ROUTER)
self.dispatcher_socket.setsockopt(zmq.IDENTITY,
"%s-router-response" % self.my_id)
self.dispatcher_socket.bind(

poller = zmq.Poller()
poller.register(dispatcher_socket, zmq.POLLIN)
Expand All @@ -152,7 +190,7 @@ class Router():

if sock.get(dispatcher_socket) == zmq.POLLIN:
multi_message = work_receiver.recv_multipart()
transport = json.loads(multi_message[0])




Expand Down

0 comments on commit abe5ead

Please sign in to comment.