Permalink
Browse files

Started DB backend with mongoDB support.

  • Loading branch information...
1 parent 4eb1ed7 commit 7289c9f86c7a43043677bb17ff1696346632f5b9 @minrk minrk committed Nov 21, 2010
Showing with 321 additions and 47 deletions.
  1. +117 −47 IPython/zmq/parallel/controller.py
  2. +148 −0 IPython/zmq/parallel/dictdb.py
  3. +56 −0 IPython/zmq/parallel/mongodb.py
@@ -27,10 +27,17 @@
from IPython.zmq.log import logger # a Logger object
from IPython.zmq.entry_point import bind_port
-from streamsession import Message, wrap_exception
+from streamsession import Message, wrap_exception, ISO8601
from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
connect_logger, parse_url, signal_children, generate_exec_key)
-
+from dictdb import DictDB
+try:
+ from pymongo.binary import Binary
+except ImportError:
+ MongoDB=None
+else:
+ from mongodb import MongoDB
+
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
@@ -65,6 +72,27 @@ def pop(self, key):
return value
+def init_record(msg):
+ """return an empty TaskRecord dict, with all keys initialized with None."""
+ header = msg['header']
+ return {
+ 'msg_id' : header['msg_id'],
+ 'header' : header,
+ 'content': msg['content'],
+ 'buffers': msg['buffers'],
+ 'submitted': datetime.strptime(header['date'], ISO8601),
+ 'client_uuid' : None,
+ 'engine_uuid' : None,
+ 'started': None,
+ 'completed': None,
+ 'resubmitted': None,
+ 'result_header' : None,
+ 'result_content' : None,
+ 'result_buffers' : None,
+ 'queue' : None
+ }
+
+
class EngineConnector(object):
"""A simple object for accessing the various zmq connections of an object.
Attributes are:
@@ -159,7 +187,7 @@ def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifi
self.by_ident = {}
self.clients = {}
self.hearts = {}
- self.mia = set()
+ # self.mia = set()
# self.sockets = {}
self.loop = loop
@@ -210,14 +238,14 @@ def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifi
'unregistration_request' : self.unregister_engine,
'connection_request': self.connection_request,
}
- #
+ self.registration_timeout = max(5000, 2*self.heartbeat.period)
# this is the stuff that will move to DB:
- self.results = {} # completed results
- self.pending = {} # pending messages, keyed by msg_id
+ # self.results = {} # completed results
+ self.pending = set() # pending messages, keyed by msg_id
self.queues = {} # pending msg_ids keyed by engine_id
self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
self.completed = {} # completed msg_ids keyed by engine_id
- self.registration_timeout = max(5000, 2*self.heartbeat.period)
+ self.all_completed = set()
logger.info("controller::created controller")
@@ -322,6 +350,7 @@ def dispatch_client_msg(self, msg):
if not idents:
logger.error("Bad Client Message: %s"%msg)
return
+ client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=True)
except:
@@ -401,11 +430,15 @@ def save_queue_request(self, idents, msg):
header = msg['header']
msg_id = header['msg_id']
- info = dict(submit=datetime.now(),
- received=None,
- engine=(eid, queue_id))
- self.pending[msg_id] = ( msg, info )
+ record = init_record(msg)
+ record['engine_uuid'] = queue_id
+ record['client_uuid'] = client_id
+ record['queue'] = 'mux'
+ if MongoDB is not None and isinstance(self.db, MongoDB):
+ record['buffers'] = map(Binary, record['buffers'])
+ self.pending.add(msg_id)
self.queues[eid].append(msg_id)
+ self.db.add_record(msg_id, record)
def save_queue_result(self, idents, msg):
if len(idents) < 2:
@@ -430,11 +463,25 @@ def save_queue_result(self, idents, msg):
if not parent:
return
msg_id = parent['msg_id']
- self.results[msg_id] = msg
if msg_id in self.pending:
- self.pending.pop(msg_id)
+ self.pending.remove(msg_id)
+ self.all_completed.add(msg_id)
self.queues[eid].remove(msg_id)
self.completed[eid].append(msg_id)
+ rheader = msg['header']
+ completed = datetime.strptime(rheader['date'], ISO8601)
+ started = rheader.get('started', None)
+ if started is not None:
+ started = datetime.strptime(started, ISO8601)
+ result = {
+ 'result_header' : rheader,
+ 'result_content': msg['content'],
+ 'started' : started,
+ 'completed' : completed
+ }
+ if MongoDB is not None and isinstance(self.db, MongoDB):
+ result['result_buffers'] = map(Binary, msg['buffers'])
+ self.db.update_record(msg_id, result)
else:
logger.debug("queue:: unknown msg finished %s"%msg_id)
@@ -445,28 +492,26 @@ def save_task_request(self, idents, msg):
client_id = idents[0]
try:
- msg = self.session.unpack_message(msg, content=False)
+ msg = self.session.unpack_message(msg, content=False, copy=False)
except:
logger.error("task::client %r sent invalid task message: %s"%(
client_id, msg), exc_info=True)
return
-
+ rec = init_record(msg)
+ if MongoDB is not None and isinstance(self.db, MongoDB):
+ record['buffers'] = map(Binary, record['buffers'])
+ rec['client_uuid'] = client_id
+ rec['queue'] = 'task'
header = msg['header']
msg_id = header['msg_id']
- self.mia.add(msg_id)
- info = dict(submit=datetime.now(),
- received=None,
- engine=None)
- self.pending[msg_id] = (msg, info)
- if not self.tasks.has_key(client_id):
- self.tasks[client_id] = []
- self.tasks[client_id].append(msg_id)
+ self.pending.add(msg_id)
+ self.db.add_record(msg_id, rec)
def save_task_result(self, idents, msg):
"""save the result of a completed task."""
client_id = idents[0]
try:
- msg = self.session.unpack_message(msg, content=False)
+ msg = self.session.unpack_message(msg, content=False, copy=False)
except:
logger.error("task::invalid task result message send to %r: %s"%(
client_id, msg))
@@ -478,19 +523,33 @@ def save_task_result(self, idents, msg):
logger.warn("Task %r had no parent!"%msg)
return
msg_id = parent['msg_id']
- self.results[msg_id] = msg
header = msg['header']
engine_uuid = header.get('engine', None)
eid = self.by_ident.get(engine_uuid, None)
if msg_id in self.pending:
- self.pending.pop(msg_id)
- if msg_id in self.mia:
- self.mia.remove(msg_id)
- if eid is not None and msg_id in self.tasks[eid]:
+ self.pending.remove(msg_id)
+ self.all_completed.add(msg_id)
+ if eid is not None:
self.completed[eid].append(msg_id)
- self.tasks[eid].remove(msg_id)
+ if msg_id in self.tasks[eid]:
+ self.tasks[eid].remove(msg_id)
+ completed = datetime.strptime(header['date'], ISO8601)
+ started = header.get('started', None)
+ if started is not None:
+ started = datetime.strptime(started, ISO8601)
+ result = {
+ 'result_header' : header,
+ 'result_content': msg['content'],
+ 'started' : started,
+ 'completed' : completed,
+ 'engine_uuid': engine_uuid
+ }
+ if MongoDB is not None and isinstance(self.db, MongoDB):
+ result['result_buffers'] = map(Binary, msg['buffers'])
+ self.db.update_record(msg_id, result)
+
else:
logger.debug("task::unknown task %s finished"%msg_id)
@@ -507,18 +566,20 @@ def save_task_destination(self, idents, msg):
eid = self.by_ident[engine_uuid]
logger.info("task::task %s arrived on %s"%(msg_id, eid))
- if msg_id in self.mia:
- self.mia.remove(msg_id)
- else:
- logger.debug("task::task %s not listed as MIA?!"%(msg_id))
+ # if msg_id in self.mia:
+ # self.mia.remove(msg_id)
+ # else:
+ # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
self.tasks[eid].append(msg_id)
- self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
+ # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
+ self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
def mia_task_request(self, idents, msg):
+ raise NotImplementedError
client_id = idents[0]
- content = dict(mia=self.mia,status='ok')
- self.session.send('mia_reply', content=content, idents=client_id)
+ # content = dict(mia=self.mia,status='ok')
+ # self.session.send('mia_reply', content=content, idents=client_id)
@@ -614,7 +675,7 @@ def unregister_engine(self, ident, msg):
self.by_ident.pop(ec.queue)
self.completed.pop(eid)
for msg_id in self.queues.pop(eid):
- msg = self.pending.pop(msg_id)
+ msg = self.pending.remove(msg_id)
############## TODO: HANDLE IT ################
if self.notifier:
@@ -710,11 +771,11 @@ def purge_results(self, client_id, msg):
msg_ids = content.get('msg_ids', [])
reply = dict(status='ok')
if msg_ids == 'all':
- self.results = {}
+ self.db.drop_matching_records(dict(completed={'$ne':None}))
else:
for msg_id in msg_ids:
- if msg_id in self.results:
- self.results.pop(msg_id)
+ if msg_id in self.all_completed:
+ self.db.drop_record(msg_id)
else:
if msg_id in self.pending:
try:
@@ -736,10 +797,10 @@ def purge_results(self, client_id, msg):
reply = wrap_exception()
break
msg_ids = self.completed.pop(eid)
- for msg_id in msg_ids:
- self.results.pop(msg_id)
+ uid = self.engines[eid].queue
+ self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
- self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
+ self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
def resubmit_task(self, client_id, msg, buffers):
"""Resubmit a task."""
@@ -755,13 +816,15 @@ def get_results(self, client_id, msg):
content = dict(status='ok')
content['pending'] = pending
content['completed'] = completed
+ if not statusonly:
+ records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
for msg_id in msg_ids:
if msg_id in self.pending:
pending.append(msg_id)
- elif msg_id in self.results:
+ elif msg_id in self.all_completed:
completed.append(msg_id)
if not statusonly:
- content[msg_id] = self.results[msg_id]['content']
+ content[msg_id] = records[msg_id]['result_content']
else:
try:
raise KeyError('No such message: '+msg_id)
@@ -799,6 +862,8 @@ def make_argument_parser():
parser.add_argument('--scheduler', type=str, default='pure',
choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
help='select the task scheduler [default: pure ZMQ]')
+ parser.add_argument('--mongodb', action='store_true',
+ help='Use MongoDB task storage [default: in-memory]')
return parser
@@ -927,6 +992,11 @@ def main(argv=None):
q.start()
children.append(q)
+ if args.mongodb:
+ from mongodb import MongoDB
+ db = MongoDB(thesession.session)
+ else:
+ db = DictDB()
time.sleep(.25)
# build connection dicts
@@ -946,7 +1016,7 @@ def main(argv=None):
'notification': iface%nport
}
signal_children(children)
- con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
+ con = Controller(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
loop.start()
Oops, something went wrong.

0 comments on commit 7289c9f

Please sign in to comment.