general parallel code cleanup

commit b8dd49c8e1257d51af09c545c964031c3b5517f4 (1 parent: 077c1af), committed by minrk on Oct 26, 2010

@@ -130,7 +130,7 @@ def handle_heart_failure(self, heart):
for handler in self._failure_handlers:
try:
handler(heart)
- except Exception, e:
+ except Exception as e:
print (e)
logger.error("heartbeat::Bad Handler! %s"%handler)
pass
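
Note: the `except Exception, e:` to `except Exception as e:` change recurs throughout this commit; the `as` form is the syntax accepted by both Python 2.6+ and Python 3. A minimal sketch of the handler loop above using the new syntax (the surrounding names here are illustrative, not taken from the diff):

import logging

logger = logging.getLogger("heartbeat")

def run_failure_handlers(handlers, heart):
    # call every registered failure handler; log and keep going on error
    for handler in handlers:
        try:
            handler(heart)
        except Exception as e:  # works on Python 2.6+ and Python 3
            print(e)
            logger.error("heartbeat::Bad Handler! %s" % handler)
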
@@ -1,3 +1,10 @@
+"""The Python scheduler for rich scheduling.
+
+The Pure ZMQ scheduler does not allow routing schemes other than LRU,
+nor does it check msg_id DAG dependencies. For those, a slightly slower
+Python Scheduler exists.
+"""
+
#----------------------------------------------------------------------
# Imports
#----------------------------------------------------------------------
@@ -40,7 +47,7 @@ def plainrandom(loads):
def lru(loads):
"""Always pick the front of the line.
- The content of loads is ignored.
+ The content of `loads` is ignored.
Assumes LRU ordering of loads, with oldest first.
"""
@@ -151,10 +158,12 @@ def __init__(self, client_stream, engine_stream, mon_stream,
self.notifier_stream.on_recv(self.dispatch_notification)
def resume_receiving(self):
- """resume accepting jobs"""
+ """Resume accepting jobs."""
self.client_stream.on_recv(self.dispatch_submission, copy=False)
def stop_receiving(self):
+ """Stop accepting jobs while there are no engines.
+ Leave them in the ZMQ queue."""
self.client_stream.on_recv(None)
#-----------------------------------------------------------------------
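
The new `stop_receiving` docstring describes the flow-control trick used here: registering a callback with `on_recv` consumes jobs as they arrive, while passing `None` leaves them buffered in the ZMQ queue until an engine registers again. A minimal standalone sketch of that pattern:

# Flow-control sketch: `client_stream` is a pyzmq ZMQStream
# (zmq.eventloop.zmqstream), as in the scheduler above.
def resume_receiving(client_stream, dispatch_submission):
    """Resume accepting jobs."""
    client_stream.on_recv(dispatch_submission, copy=False)

def stop_receiving(client_stream):
    """Stop accepting jobs; leave them in the ZMQ queue."""
    client_stream.on_recv(None)
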
@@ -176,7 +185,7 @@ def dispatch_notification(self, msg):
logger.error("task::Invalid notification msg: %s"%msg)
@logged
def _register_engine(self, uid):
- """new engine became available"""
+ """New engine with ident `uid` became available."""
# head of the line:
self.targets.insert(0,uid)
self.loads.insert(0,0)
@@ -187,10 +196,12 @@ def _register_engine(self, uid):
self.resume_receiving()
def _unregister_engine(self, uid):
- """existing engine became unavailable"""
- # handle any potentially finished tasks:
+ """Existing engine with ident `uid` became unavailable."""
if len(self.targets) == 1:
+ # this was our only engine
self.stop_receiving()
+
+ # handle any potentially finished tasks:
self.engine_stream.flush()
self.completed.pop(uid)
@@ -203,7 +214,7 @@ def _unregister_engine(self, uid):
self.handle_stranded_tasks(lost)
def handle_stranded_tasks(self, lost):
- """deal with jobs resident in an engine that died."""
+ """Deal with jobs resident in an engine that died."""
# TODO: resubmit the tasks?
for msg_id in lost:
pass
@@ -214,26 +225,29 @@ def handle_stranded_tasks(self, lost):
#-----------------------------------------------------------------------
@logged
def dispatch_submission(self, raw_msg):
- """dispatch job submission"""
+ """Dispatch job submission to appropriate handlers."""
# ensure targets up to date:
self.notifier_stream.flush()
try:
idents, msg = self.session.feed_identities(raw_msg, copy=False)
- except Exception, e:
+ except Exception as e:
logger.error("task::Invaid msg: %s"%msg)
return
msg = self.session.unpack_message(msg, content=False, copy=False)
header = msg['header']
msg_id = header['msg_id']
+
+ # time dependencies
after = Dependency(header.get('after', []))
if after.mode == 'all':
after.difference_update(self.all_done)
if after.check(self.all_done):
- # recast as empty set, if we are already met,
- # to prevent
+ # recast as empty set, if `after` already met,
+ # to prevent unnecessary set comparisons
after = Dependency([])
+ # location dependencies
follow = Dependency(header.get('follow', []))
if len(after) == 0:
# time deps already met, try to run
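
The hunk above separates the two kinds of dependencies a task header can carry: `after` (time dependencies, msg_ids that must have completed somewhere) and `follow` (location dependencies, msg_ids whose results must live on the chosen engine). A simplified illustration of the `after` check; the real `Dependency` class is richer, so treat this as a sketch only:

class Dependency(set):
    """Simplified stand-in for the scheduler's Dependency class."""
    def __init__(self, deps=(), mode='all'):
        set.__init__(self, deps)
        self.mode = mode

    def check(self, completed):
        """True if this dependency is satisfied by the set of completed msg_ids."""
        if len(self) == 0:
            return True
        if self.mode == 'all':
            return self.issubset(completed)
        return bool(self.intersection(completed))

all_done = set(['a', 'b'])
after = Dependency(['a'])
if after.check(all_done):
    # already met: recast as an empty set to avoid repeated comparisons,
    # exactly the optimization described in the new comment above
    after = Dependency([])
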
@@ -244,6 +258,7 @@ def dispatch_submission(self, raw_msg):
self.save_unmet(msg_id, raw_msg, after, follow)
# send to monitor
self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
+
@logged
def maybe_run(self, msg_id, raw_msg, follow=None):
"""check location dependencies, and run if they are met."""
@@ -276,7 +291,7 @@ def save_unmet(self, msg_id, msg, after, follow):
@logged
def submit_task(self, msg_id, msg, follow=None, indices=None):
- """submit a task to any of a subset of our targets"""
+ """Submit a task to any of a subset of our targets."""
if indices:
loads = [self.loads[i] for i in indices]
else:
@@ -290,6 +305,8 @@ def submit_task(self, msg_id, msg, follow=None, indices=None):
self.engine_stream.send_multipart(msg, copy=False)
self.add_job(idx)
self.pending[target][msg_id] = (msg, follow)
+ content = dict(msg_id=msg_id, engine_id=target)
+ self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
#-----------------------------------------------------------------------
# Result Handling
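
The new `task_destination` message lets the monitor record which engine each task was routed to. A hedged sketch of what a monitor-side handler might do with it; the handler name and storage are assumptions, only the message shape comes from the diff:

def save_task_destination(db, msg):
    """Record the engine chosen for a task (sketch; not the Hub's actual code)."""
    content = msg['content']            # {'msg_id': ..., 'engine_id': ...}
    record = db.setdefault(content['msg_id'], {})
    record['engine_id'] = content['engine_id']
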
@@ -298,7 +315,7 @@ def submit_task(self, msg_id, msg, follow=None, indices=None):
def dispatch_result(self, raw_msg):
try:
idents,msg = self.session.feed_identities(raw_msg, copy=False)
- except Exception, e:
+ except Exception as e:
logger.error("task::Invaid result: %s"%msg)
return
msg = self.session.unpack_message(msg, content=False, copy=False)
@@ -125,7 +125,7 @@ def __call__(self, prompt=None):
while True:
try:
reply = self.socket.recv_json(zmq.NOBLOCK)
- except zmq.ZMQError, e:
+ except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
pass
else:
@@ -171,7 +171,7 @@ def abort_queue(self, stream):
while True:
try:
msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
- except zmq.ZMQError, e:
+ except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
break
else:
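
Both hunks above (and the `recv` hunk later in this commit) use the same non-blocking drain idiom: call `recv` with `zmq.NOBLOCK` and treat `EAGAIN` as "nothing waiting", re-raising anything else. A minimal standalone version of that loop (the socket setup is assumed):

import zmq

def drain(socket):
    """Pull everything currently queued on `socket` without blocking."""
    msgs = []
    while True:
        try:
            msgs.append(socket.recv_multipart(zmq.NOBLOCK))
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                break          # queue is empty
            raise
    return msgs
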
@@ -238,17 +238,6 @@ def dispatch_control(self, msg):
else:
handler(self.control_stream, idents, msg)
- # def flush_control(self):
- # while any(zmq.select([self.control_socket],[],[],1e-4)):
- # try:
- # msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
- # except zmq.ZMQError, e:
- # if e.errno != zmq.EAGAIN:
- # raise e
- # return
- # else:
- # self.dispatch_control(msg)
-
#-------------------- queue helpers ------------------------------
@@ -8,6 +8,7 @@
import traceback
import pprint
import uuid
+from datetime import datetime
import zmq
from zmq.utils import jsonapi
@@ -111,6 +112,7 @@ def __getitem__(self, k):
def msg_header(msg_id, msg_type, username, session):
+ date=datetime.now().isoformat()
return locals()
# return {
# 'msg_id' : msg_id,
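
With the `date` line added, `msg_header` now stamps every header with an ISO-8601 timestamp; `return locals()` simply packages the function's local names into a dict. Written out without `locals()` for clarity (field set from the diff, example values illustrative):

from datetime import datetime

def msg_header(msg_id, msg_type, username, session):
    return {
        'msg_id':   msg_id,
        'msg_type': msg_type,
        'username': username,
        'session':  session,
        'date':     datetime.now().isoformat(),
    }

header = msg_header('0', 'execute_request', 'user', 'session-uuid')
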
@@ -140,7 +142,7 @@ def extract_header(msg_or_header):
return h
def rekey(dikt):
- """rekey a dict that has been forced to use str keys where there should be
+ """Rekey a dict that has been forced to use str keys where there should be
ints by json. This belongs in the jsonutil added by fperez."""
for k in dikt.iterkeys():
if isinstance(k, str):
@@ -162,11 +164,22 @@ def rekey(dikt):
return dikt
def serialize_object(obj, threshold=64e-6):
- """serialize an object into a list of sendable buffers.
+ """Serialize an object into a list of sendable buffers.
- Returns: (pmd, bufs)
- where pmd is the pickled metadata wrapper, and bufs
- is a list of data buffers"""
+ Parameters
+ ----------
+
+ obj : object
+ The object to be serialized
+ threshold : float
+ The threshold for not double-pickling the content.
+
+
+ Returns
+ -------
+ ('pmd', [bufs]) :
+ where pmd is the pickled metadata wrapper,
+ bufs is a list of data buffers"""
# threshold is 100 B
databuffers = []
if isinstance(obj, (list, tuple)):
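
Per the new docstring, `serialize_object` returns the pickled metadata wrapper plus a list of raw data buffers that can travel as separate message frames, skipping a second pickling pass for anything above `threshold`. A hedged usage sketch; the import path is an assumption about where this module lives, and only the return shape comes from the docstring above:

from IPython.zmq.parallel.streamsession import serialize_object  # path is an assumption

obj = {'a': list(range(10)), 'b': 'x' * 1024}
pmd, bufs = serialize_object(obj)   # pickled metadata wrapper, list of data buffers
frames = [pmd] + bufs               # each buffer can ride along as its own frame
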
@@ -318,6 +331,8 @@ def send(self, stream, msg_type, content=None, buffers=None, parent=None, subhea
Parameters
----------
+ stream : zmq.Socket or ZMQStream
+ the socket-like object used to send the data
msg_type : str or Message/dict
Normally, msg_type will be
@@ -347,10 +362,7 @@ def send(self, stream, msg_type, content=None, buffers=None, parent=None, subhea
to_send.append(DELIM)
to_send.append(self.pack(msg['header']))
to_send.append(self.pack(msg['parent_header']))
- # if parent is None:
- # to_send.append(self.none)
- # else:
- # to_send.append(self.pack(dict(parent)))
+
if content is None:
content = self.none
elif isinstance(content, dict):
@@ -374,11 +386,10 @@ def send(self, stream, msg_type, content=None, buffers=None, parent=None, subhea
pprint.pprint(omsg)
pprint.pprint(to_send)
pprint.pprint(buffers)
- # return both the msg object and the buffers
return omsg
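
The `send` hunks above document the `stream` parameter and drop the commented-out parent packing; `send` still returns the constructed message (`omsg`). A hedged usage sketch mirroring the `task_destination` call added earlier in this commit (the session and stream are assumed to be constructed elsewhere, which is not part of this diff):

def report_destination(session, mon_stream, msg_id, engine_id):
    """Send the 'task_destination' notification added in this commit (sketch)."""
    content = dict(msg_id=msg_id, engine_id=engine_id)
    # returns the fully-built message dict (header, parent_header, content)
    return session.send(mon_stream, 'task_destination',
                        content=content, ident='tracktask')
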
def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
- """send a raw message via idents.
+ """Send a raw message via idents.
Parameters
----------
@@ -399,7 +410,7 @@ def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
socket = socket.socket
try:
msg = socket.recv_multipart(mode)
- except zmq.ZMQError, e:
+ except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# We can convert EAGAIN to None as we know in this case
# recv_json won't return None.
@@ -412,7 +423,7 @@ def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
idents, msg = self.feed_identities(msg, copy)
try:
return idents, self.unpack_message(msg, content=content, copy=copy)
- except Exception, e:
+ except Exception as e:
print (idents, msg)
# TODO: handle it
raise e