Skip to content

Commit

Permalink
Use semaphores and deques to synchronize access to message queues.
Browse files Browse the repository at this point in the history
This method avoids using sleep()s which slow down the system by
delaying message handling even when there are available messages.
The system is now more "event-based" and so doesn't require sleeping.
The receive() method also doesn't need to return None anymore when
the inbound_queue is empty.
  • Loading branch information
mtahmed committed Dec 10, 2013
1 parent 8322f12 commit d0804dc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
5 changes: 1 addition & 4 deletions master.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Standard imports
import inspect
import json
import time

# Custom imports
import job
Expand Down Expand Up @@ -78,10 +77,8 @@ def worker(self):
updates from slaves etc.
'''
while True:
# This blocks if inbound_queue in messenger empty.
address, msg = self.messenger.receive(return_payload=False)
if msg is None:
time.sleep(2)
continue

msg_type = msg.msg_type
if msg_type == message.Message.MSG_STATUS:
Expand Down
65 changes: 28 additions & 37 deletions messenger.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Standard imports
import collections
import select
import socket
import time
import threading

# Custom imports
Expand Down Expand Up @@ -37,8 +37,10 @@ def __init__(self, port=DEFAULT_PORT):
self.address_to_hostname = {}
# Both inbound_queue and outbound_queue contain tuples of
# (address, message) that are received or need to be sent out.
self.inbound_queue = []
self.outbound_queue = []
self.inbound_queue = collections.deque()
self.outbound_queue = collections.deque()
self.outbound_queue_sem = threading.Semaphore(value=0)
self.inbound_queue_sem = threading.Semaphore(value=0)
# This dict is used to keep track of MessageTracker objects which can
# be used to track message status.
self.trackers = {}
Expand Down Expand Up @@ -72,31 +74,26 @@ def register_destination(self, hostname, address):
self.address_to_hostname[address] = hostname

def receive(self, return_payload=True):
'''
This method checks this messenger's inbound_queue and if its not empty,
it returns the next element from the queue.
'''Return the next message from the inbound_queue.
:param return_payload: If True, the message payload is deserialized
and returned instead of the message itself.
and returned instead of the Message object itself.
'''
if self.inbound_queue:
msg = self.inbound_queue[0]
self.inbound_queue = self.inbound_queue[1:]

if not return_payload:
return msg

msg_type = msg.msg_type
if msg_type == message.Message.MSG_STATUS:
return int(msg.msg_payload.decode('UTF-8'))
elif msg_type == message.Message.MSG_TASKUNIT:
return taskunit.TaskUnit.deserialize(msg.msg_payload.decode('UTF-8'))
elif msg_type == message.Message.MSG_TASKUNIT_RESULT:
return taskunit.TaskUnit.deserialize(msg.msg_payload.decode('UTF-8'))
elif msg_type == message.Message.MSG_JOB:
return job.Job.deserialize(msg.msg_payload.decode('UTF-8'))
else:
return (None, None)
self.inbound_queue_sem.acquire()
msg = self.inbound_queue.popleft()

if not return_payload:
return msg

msg_type = msg.msg_type
if msg_type == message.Message.MSG_STATUS:
return int(msg.msg_payload.decode('UTF-8'))
elif msg_type == message.Message.MSG_TASKUNIT:
return taskunit.TaskUnit.deserialize(msg.msg_payload.decode('UTF-8'))
elif msg_type == message.Message.MSG_TASKUNIT_RESULT:
return taskunit.TaskUnit.deserialize(msg.msg_payload.decode('UTF-8'))
elif msg_type == message.Message.MSG_JOB:
return job.Job.deserialize(msg.msg_payload.decode('UTF-8'))

def send_status(self, status, address, track=False):
'''
Expand Down Expand Up @@ -190,9 +187,9 @@ def queue_for_sending(self, messages, address):
outbound_queue.
NOTE: This method takes a list of messages and not a single message.
'''
self.outbound_queue.extend([(address, message)
for message
in messages])
for message in messages:
self.outbound_queue.append((address, message))
self.outbound_queue_sem.release()

##### Message-specific methods.
def delete_tracker(self, tracker):
Expand Down Expand Up @@ -308,11 +305,8 @@ def sender(messenger):

messenger.logger.log("Sender up!")
while True:
if len(messenger.outbound_queue) == 0:
time.sleep(3.0)
continue
else:
address, msg = messenger.outbound_queue[0]
messenger.outbound_queue_sem.acquire()
address, msg = messenger.outbound_queue.popleft()

messenger.logger.log("Sending message to %s:%d" % address)
# While the msg is still not sent...
Expand All @@ -323,7 +317,6 @@ def sender(messenger):
# If we can send...
if event & select.EPOLLOUT:
bytes_sent = messenger.socket.sendto(msg, address)
messenger.outbound_queue = messenger.outbound_queue[1:]
# If we have a tracker for this msg, then we need to
# mark it as sent if this is the last frag for the msg
# being sent out.
Expand All @@ -339,9 +332,6 @@ def sender(messenger):
break
else:
messenger.logger.log("Unexpected event on sender socket.")
else:
# Sleep for 0.5 seconds if we couldn't send out.
time.sleep(0.5)

@staticmethod
def receiver(messenger):
Expand Down Expand Up @@ -400,6 +390,7 @@ def receiver(messenger):
messenger.delete_tracker(tracker)
continue
messenger.inbound_queue.append((address, catted_msg))
messenger.inbound_queue_sem.release()
# Send an ack now that we have received the msg.
messenger.send_ack(catted_msg, address)
del fragments_map[msg.msg_id]
Expand Down
9 changes: 4 additions & 5 deletions slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, port, ip=None):
self.messenger.register_destination(master_hostname,
(master_ip, master_port))

# When everything is setup, associate with the master.
# When everything is setup, associate with the master(s).
self.associate()

return
Expand Down Expand Up @@ -80,7 +80,7 @@ def associate(self):
self.messenger.delete_tracker(tracker)
unacked_masters[index] = None
num_unacked_masters -= 1
time.sleep(10.0)
time.sleep(0.05)
return

def worker(self):
Expand All @@ -96,10 +96,9 @@ def worker(self):
back to the master through the messenger.
'''
while True:
# This blocks if inbound_queue in messenger empty.
address, msg = self.messenger.receive(return_payload=False)
if msg is None:
time.sleep(2)
continue

if msg.msg_type == message.Message.MSG_TASKUNIT:
object_dict = msg.msg_payload.decode('utf-8')
tu = taskunit.TaskUnit.deserialize(object_dict)
Expand Down

0 comments on commit d0804dc

Please sign in to comment.