Skip to content
Browse files

SGE test related fixes

* allow iopub messages to arrive first on Hub
* SQLite no longer commits immediately
* parallelwave example
* typos in launcher.py
  • Loading branch information...
1 parent 037d01b commit 9634efdfa8adc2c11c78fbf1742dc7195f1f4748 @minrk minrk committed Apr 1, 2011
View
89 IPython/parallel/hub.py
@@ -45,6 +45,30 @@ def _printer(*args, **kwargs):
print (args)
print (kwargs)
def empty_record():
    """Return a fresh task-record dict with every known record key present.

    All fields default to None except the output streams ('stdout',
    'stderr'), which start as empty strings so stream text can be
    appended as iopub messages arrive.
    """
    keys = (
        'msg_id', 'header', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed',
        'resubmitted', 'result_header', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(keys)  # every key maps to None
    record['stdout'] = ''
    record['stderr'] = ''
    return record
+
def init_record(msg):
"""Initialize a TaskRecord based on a request."""
header = msg['header']
@@ -283,6 +307,7 @@ class Hub(LoggingFactory):
tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
completed=Dict() # completed msg_ids keyed by engine_id
all_completed=Set() # completed msg_ids keyed by engine_id
+ dead_engines=Set() # uuids of engines that have unregistered or died
# mia=None
incoming_registrations=Dict()
registration_timeout=Int()
@@ -531,9 +556,21 @@ def save_queue_request(self, idents, msg):
record['client_uuid'] = client_id
record['queue'] = 'mux'
+ try:
+ # it's possible iopub arrived first:
+ existing = self.db.get_record(msg_id)
+ for key,evalue in existing.iteritems():
+ rvalue = record[key]
+ if evalue and rvalue and evalue != rvalue:
+ self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+ elif evalue and not rvalue:
+ record[key] = evalue
+ self.db.update_record(msg_id, record)
+ except KeyError:
+ self.db.add_record(msg_id, record)
+
self.pending.add(msg_id)
self.queues[eid].append(msg_id)
- self.db.add_record(msg_id, record)
def save_queue_result(self, idents, msg):
if len(idents) < 2:
@@ -551,7 +588,7 @@ def save_queue_result(self, idents, msg):
eid = self.by_ident.get(queue_id, None)
if eid is None:
self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
- self.log.debug("queue:: %s"%msg[2:])
+ # self.log.debug("queue:: %s"%msg[2:])
return
parent = msg['parent_header']
@@ -604,7 +641,18 @@ def save_task_request(self, idents, msg):
header = msg['header']
msg_id = header['msg_id']
self.pending.add(msg_id)
- self.db.add_record(msg_id, record)
+ try:
+ # it's possible iopub arrived first:
+ existing = self.db.get_record(msg_id)
+ for key,evalue in existing.iteritems():
+ rvalue = record[key]
+ if evalue and rvalue and evalue != rvalue:
+ self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+ elif evalue and not rvalue:
+ record[key] = evalue
+ self.db.update_record(msg_id, record)
+ except KeyError:
+ self.db.add_record(msg_id, record)
def save_task_result(self, idents, msg):
"""save the result of a completed task."""
@@ -704,9 +752,10 @@ def save_iopub_message(self, topics, msg):
# ensure msg_id is in db
try:
rec = self.db.get_record(msg_id)
- except:
- self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
- return
+ except KeyError:
+ rec = empty_record()
+ rec['msg_id'] = msg_id
+ self.db.add_record(msg_id, rec)
# stream
d = {}
if msg_type == 'stream':
@@ -734,7 +783,8 @@ def connection_request(self, client_id, msg):
content.update(self.client_info)
jsonable = {}
for k,v in self.keytable.iteritems():
- jsonable[str(k)] = v
+ if v not in self.dead_engines:
+ jsonable[str(k)] = v
content['engines'] = jsonable
self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
@@ -812,15 +862,20 @@ def unregister_engine(self, ident, msg):
return
self.log.info("registration::unregister_engine(%s)"%eid)
# print (eid)
- content=dict(id=eid, queue=self.engines[eid].queue)
- self.ids.remove(eid)
- uuid = self.keytable.pop(eid)
- ec = self.engines.pop(eid)
- self.hearts.pop(ec.heartbeat)
- self.by_ident.pop(ec.queue)
- self.completed.pop(eid)
- self._handle_stranded_msgs(eid, uuid)
- ############## TODO: HANDLE IT ################
+ uuid = self.keytable[eid]
+ content=dict(id=eid, queue=uuid)
+ self.dead_engines.add(uuid)
+ # self.ids.remove(eid)
+ # uuid = self.keytable.pop(eid)
+ #
+ # ec = self.engines.pop(eid)
+ # self.hearts.pop(ec.heartbeat)
+ # self.by_ident.pop(ec.queue)
+ # self.completed.pop(eid)
+ handleit = lambda : self._handle_stranded_msgs(eid, uuid)
+ dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
+ dc.start()
+ ############## TODO: HANDLE IT ################
if self.notifier:
self.session.send(self.notifier, "unregistration_notification", content=content)
@@ -833,7 +888,7 @@ def _handle_stranded_msgs(self, eid, uuid):
that the result failed and later receive the actual result.
"""
- outstanding = self.queues.pop(eid)
+ outstanding = self.queues[eid]
for msg_id in outstanding:
self.pending.remove(msg_id)
View
4 IPython/parallel/launcher.py
@@ -904,7 +904,7 @@ class PBSEngineSetLauncher(PBSLauncher):
def start(self, n, cluster_dir):
    """Start n engines by profile or cluster_dir."""
    # Build the message first, then log and delegate to the PBS launcher.
    msg = 'Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)
    self.log.info(msg)
    return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
#SGE is very similar to PBS
@@ -942,7 +942,7 @@ class SGEEngineSetLauncher(SGELauncher):
def start(self, n, cluster_dir):
    """Start n engines by profile or cluster_dir."""
    # Build the message first, then log and delegate to the SGE launcher.
    msg = 'Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)
    self.log.info(msg)
    return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
View
21 IPython/parallel/sqlitedb.py
@@ -13,6 +13,8 @@
import sqlite3
+from zmq.eventloop import ioloop
+
from IPython.utils.traitlets import CUnicode, CStr, Instance, List
from .dictdb import BaseDB
from .util import ISO8601
@@ -114,6 +116,13 @@ def __init__(self, **kwargs):
else:
self.location = '.'
self._init_db()
+
+ # register db commit as 2s periodic callback
+ # to prevent clogging pipes
+ # assumes we are being run in a zmq ioloop app
+ loop = ioloop.IOLoop.instance()
+ pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
+ pc.start()
def _defaults(self):
"""create an empty record"""
@@ -133,7 +142,9 @@ def _init_db(self):
sqlite3.register_converter('bufs', _convert_bufs)
# connect to the db
dbfile = os.path.join(self.location, self.filename)
- self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, cached_statements=16)
+ self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
+ # isolation_level = None)#,
+ cached_statements=64)
# print dir(self._db)
self._db.execute("""CREATE TABLE IF NOT EXISTS %s
@@ -218,7 +229,7 @@ def add_record(self, msg_id, rec):
line = self._dict_to_list(d)
tups = '(%s)'%(','.join(['?']*len(line)))
self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
- self._db.commit()
+ # self._db.commit()
def get_record(self, msg_id):
"""Get a specific Task Record, by msg_id."""
@@ -240,19 +251,19 @@ def update_record(self, msg_id, rec):
query += ', '.join(sets)
query += ' WHERE msg_id == %r'%msg_id
self._db.execute(query, values)
- self._db.commit()
+ # self._db.commit()
def drop_record(self, msg_id):
    """Remove a single record from the DB by its msg_id.

    The commit is deferred to the periodic background commit registered
    at construction time, so the deletion is not immediately durable.
    """
    # BUG FIX: the column name was misspelled 'mgs_id', so every call
    # raised sqlite3.OperationalError instead of deleting the row.
    self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
    # commit deferred to the periodic callback; do not commit per-call
def drop_matching_records(self, check):
    """Remove every record matching the query dict `check`."""
    # Render the query dict into a WHERE clause plus bound parameters.
    where_clause, params = self._render_expression(check)
    sql = "DELETE FROM %s WHERE %s"%(self.table, where_clause)
    self._db.execute(sql, params)
    # commit deferred to the periodic callback; do not commit per-call
def find_records(self, check, id_only=False):
"""Find records matching a query dict."""
View
2 docs/examples/newparallel/wave2D/parallelwave-mpi.py
@@ -28,7 +28,7 @@
from numpy import exp, zeros, newaxis, sqrt
from IPython.external import argparse
-from IPython.parallel.client import Client, Reference
+from IPython.parallel import Client, Reference
def setup_partitioner(index, num_procs, gnum_cells, parts):
"""create a partitioner in the engine namespace"""
View
2 docs/examples/newparallel/wave2D/parallelwave.py
@@ -28,7 +28,7 @@
from numpy import exp, zeros, newaxis, sqrt
from IPython.external import argparse
-from IPython.parallel.client import Client, Reference
+from IPython.parallel import Client, Reference
def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
"""create a partitioner in the engine namespace"""

0 comments on commit 9634efd

Please sign in to comment.
Something went wrong with that request. Please try again.