Select NoDB by default #1928

Merged
merged 3 commits

2 participants

@minrk
Owner

As discussed on the list, NoDB is now the default backend.

A small bug, discovered along the way when using HubResults for execute requests, is also fixed and tested.
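
For context, a minimal sketch of the code path the fix exercises (a hypothetical session, not code from this PR; assumes a running `iptest` cluster whose controller was started with a real DB backend such as `ipcontroller --dictdb`, since NoDB stores nothing):

```python
# Hypothetical session illustrating the fixed path: a client fetching
# execute results it did not submit, which goes through the Hub's DB.
from IPython.parallel import Client

rc = Client(profile='iptest')
ar = rc[-1].execute("5", silent=False)   # AsyncResult for an execute request
ar.wait()

rc2 = Client(profile='iptest')           # a second client on the same cluster
ahr = rc2.get_result(ar.msg_ids)         # AsyncHubResult: fetched via the Hub
print(ahr.get().pyout)                   # matches ar.get().pyout
```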

minrk added some commits
@fperez
Owner

Test results for commit fcf42b7 merged into master
Platform: linux2

  • python2.7: OK
  • python3.2: OK (libraries not available: cython matplotlib pymongo rpy2 wx wx.aui)

Not available for testing: python2.6, python3.1

@fperez
Owner

Awesome, clean and solid code. Merging now, thanks!!

@fperez fperez merged commit e5d53ac
@minrk minrk deleted the branch
Commits on Jun 12, 2012
  1. @minrk
  2. @minrk
     Use NoDB by default

     adds shortcuts, so you can specify "DictDB", rather than "full.path.to.DictDB".
  3. @minrk
10 IPython/parallel/apps/ipcontrollerapp.py
@@ -110,10 +110,11 @@
'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
"""use dummy DB backend, which doesn't store any information.
- This can be used to prevent growth of the memory footprint of the Hub
- in cases where its record-keeping is not required. Requesting results
- of tasks submitted by other clients, db_queries, and task resubmission
- will not be available."""),
+ This is the default as of IPython 0.13.
+
+ To enable delayed or repeated retrieval of results from the Hub,
+ select one of the true db backends.
+ """),
'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
'reuse existing json connection files')
})
@@ -138,7 +139,6 @@
aliases.update(base_aliases)
aliases.update(session_aliases)
-
class IPControllerApp(BaseParallelApplication):
name = u'ipcontroller'
12 IPython/parallel/client/client.py
@@ -1543,11 +1543,15 @@ def result_status(self, msg_ids, status_only=True):
if rec.get('received'):
md['received'] = rec['received']
md.update(iodict)
-
+
if rcontent['status'] == 'ok':
- res,buffers = util.unserialize_object(buffers)
+ if header['msg_type'] == 'apply_reply':
+ res,buffers = util.unserialize_object(buffers)
+ elif header['msg_type'] == 'execute_reply':
+ res = ExecuteReply(msg_id, rcontent, md)
+ else:
+ raise KeyError("unhandled msg type: %r" % header['msg_type'])
else:
- print rcontent
res = self._unwrap_exception(rcontent)
failures.append(res)
@@ -1555,7 +1559,7 @@ def result_status(self, msg_ids, status_only=True):
content[msg_id] = res
if len(theids) == 1 and failures:
- raise failures[0]
+ raise failures[0]
error.collect_exceptions(failures, "result_status")
return content
10 IPython/parallel/controller/dictdb.py
@@ -184,6 +184,10 @@ def get_history(self):
msg_ids = self._records.keys()
return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
+NODATA = KeyError("NoDB backend doesn't store any data. "
+"Start the Controller with a DB backend to enable resubmission / result persistence."
+)
+
class NoDB(DictDB):
"""A blackhole db backend that actually stores no information.
@@ -197,7 +201,7 @@ def add_record(self, msg_id, record):
pass
def get_record(self, msg_id):
- raise KeyError("NoDB does not support record access")
+ raise NODATA
def update_record(self, msg_id, record):
pass
@@ -209,8 +213,8 @@ def drop_record(self, msg_id):
pass
def find_records(self, check, keys=None):
- raise KeyError("NoDB does not store information")
+ raise NODATA
def get_history(self):
- raise KeyError("NoDB does not store information")
+ raise NODATA
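
For illustration, a small sketch (hypothetical, standalone use of the backend class, outside a running controller) of the resulting behavior: writes are silently discarded and every read raises the shared NODATA instance:

```python
# Hypothetical illustration of NoDB: add_record() is a no-op and
# get_record() raises the module-level NODATA KeyError.
from IPython.parallel.controller.dictdb import NoDB

db = NoDB()
db.add_record('some-msg-id', {'result_content': {}})  # silently discarded
try:
    db.get_record('some-msg-id')
except KeyError as e:
    print(e)  # explains that NoDB stores no data
```

Reusing one module-level exception instance keeps the fairly long explanatory message defined in a single place.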
35 IPython/parallel/controller/hub.py
@@ -119,6 +119,13 @@ class EngineConnector(HasTraits):
heartbeat=CBytes()
pending=Set()
+_db_shortcuts = {
+ 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
+ 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
+ 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
+ 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
+}
+
class HubFactory(RegistrationFactory):
"""The Configurable for setting up a Hub."""
@@ -181,8 +188,17 @@ def _notifier_port_default(self):
monitor_url = Unicode('')
- db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
- config=True, help="""The class to use for the DB backend""")
+ db_class = DottedObjectName('NoDB',
+ config=True, help="""The class to use for the DB backend
+
+ Options include:
+
+ SQLiteDB: SQLite
+ MongoDB : use MongoDB
+ DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
+ NoDB : disable database altogether (default)
+
+ """)
# not configurable
db = Instance('IPython.parallel.controller.dictdb.BaseDB')
@@ -258,9 +274,9 @@ def init_hub(self):
sub = ZMQStream(sub, loop)
# connect the db
- self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
- # cdir = self.config.Global.cluster_dir
- self.db = import_item(str(self.db_class))(session=self.session.session,
+ db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
+ self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
+ self.db = import_item(str(db_class))(session=self.session.session,
config=self.config, log=self.log)
time.sleep(.25)
try:
@@ -824,8 +840,15 @@ def save_iopub_message(self, topics, msg):
d['pyerr'] = content
elif msg_type == 'pyin':
d['pyin'] = content['code']
+ elif msg_type in ('display_data', 'pyout'):
+ d[msg_type] = content
+ elif msg_type == 'status':
+ pass
else:
- d[msg_type] = content.get('data', '')
+ self.log.warn("unhandled iopub msg_type: %r", msg_type)
+
+ if not d:
+ return
try:
self.db.update_record(msg_id, d)
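
The shortcut lookup is simple enough to sketch in isolation (a standalone restatement of the logic above, with `resolve_db_class` as a hypothetical helper name, not the PR's code verbatim):

```python
# Standalone sketch of the shortcut resolution in init_hub():
# a known shortcut maps to its full dotted path, case-insensitively;
# anything else is assumed to already be a dotted class path.
_db_shortcuts = {
    'sqlitedb': 'IPython.parallel.controller.sqlitedb.SQLiteDB',
    'mongodb':  'IPython.parallel.controller.mongodb.MongoDB',
    'dictdb':   'IPython.parallel.controller.dictdb.DictDB',
    'nodb':     'IPython.parallel.controller.dictdb.NoDB',
}

def resolve_db_class(db_class):
    return _db_shortcuts.get(db_class.lower(), db_class)

assert resolve_db_class('NoDB') == 'IPython.parallel.controller.dictdb.NoDB'
assert resolve_db_class('my.custom.DB') == 'my.custom.DB'  # passes through
```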
2  IPython/parallel/tests/__init__.py
@@ -57,7 +57,7 @@ def setup():
cp = TestProcessLauncher()
cp.cmd_and_args = ipcontroller_cmd_argv + \
- ['--profile=iptest', '--log-level=50', '--ping=250']
+ ['--profile=iptest', '--log-level=50', '--ping=250', '--dictdb']
cp.start()
launchers.append(cp)
tic = time.time()
19 IPython/parallel/tests/test_client.py
@@ -159,6 +159,25 @@ def test_get_result(self):
self.assertFalse(isinstance(ar2, AsyncHubResult))
c.close()
+ def test_get_execute_result(self):
+ """test getting execute results from the Hub."""
+ c = clientmod.Client(profile='iptest')
+ t = c.ids[-1]
+ cell = '\n'.join([
+ 'import time',
+ 'time.sleep(0.25)',
+ '5'
+ ])
+ ar = c[t].execute(cell, silent=False)
+ # give the monitor time to notice the message
+ time.sleep(.25)
+ ahr = self.client.get_result(ar.msg_ids)
+ self.assertTrue(isinstance(ahr, AsyncHubResult))
+ self.assertEquals(ahr.get().pyout, ar.get().pyout)
+ ar2 = self.client.get_result(ar.msg_ids)
+ self.assertFalse(isinstance(ar2, AsyncHubResult))
+ c.close()
+
def test_ids_list(self):
"""test client.ids"""
ids = self.client.ids
52 docs/source/parallel/parallel_db.txt
@@ -4,19 +4,40 @@
IPython's Task Database
=======================
-The IPython Hub stores all task requests and results in a database. Currently supported backends
-are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for
-this is clients requesting results for tasks they did not submit, via:
+Enabling a DB Backend
+=====================
+
+The IPython Hub can store all task requests and results in a database.
+Currently supported backends are: MongoDB, SQLite, and an in-memory DictDB.
+
+This database behavior is optional due to its potential :ref:`db_cost`,
+so you must enable one, either at the command-line::
+
+ $> ipcontroller --dictdb # or --mongodb or --sqlitedb
+
+or in your :file:`ipcontroller_config.py`:
+
+.. sourcecode:: python
+
+ c.HubFactory.db_class = "DictDB"
+ c.HubFactory.db_class = "MongoDB"
+ c.HubFactory.db_class = "SQLiteDB"
+
+
+Using the Task Database
+=======================
+
+The most common use case for this is clients requesting results for tasks they did not submit, via:
.. sourcecode:: ipython
In [1]: rc.get_result(task_id)
-However, since we have this DB backend, we provide a direct query method in the :class:`client`
+However, since we have this DB backend, we provide a direct query method in the :class:`~.Client`
for users who want deeper introspection into their task history. The :meth:`db_query` method of
the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
-familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
-when using other backends, the interface is emulated and only a subset of queries is possible.
+familiar. In fact, when the MongoDB backend is in use, the query is relayed directly.
+When using other backends, the interface is emulated and only a subset of queries is possible.
.. seealso::
@@ -39,18 +60,18 @@ header dict The request header
content dict The request content (likely empty)
buffers list(bytes) buffers containing serialized request objects
submitted datetime timestamp for time of submission (set by client)
-client_uuid uuid(bytes) IDENT of client's socket
-engine_uuid uuid(bytes) IDENT of engine's socket
+client_uuid uuid(ascii) IDENT of client's socket
+engine_uuid uuid(ascii) IDENT of engine's socket
started datetime time task began execution on engine
completed datetime time task finished execution (success or failure) on engine
resubmitted uuid(ascii) msg_id of resubmitted task (if applicable)
result_header dict header for result
result_content dict content for result
result_buffers list(bytes) buffers containing serialized request objects
-queue bytes The name of the queue for the task ('mux' or 'task')
-pyin <unused> Python input (unused)
-pyout <unused> Python output (unused)
-pyerr <unused> Python traceback (unused)
+queue str The name of the queue for the task ('mux' or 'task')
+pyin str Python input source
+pyout dict Python output (pyout message content)
+pyerr dict Python traceback (pyerr message content)
stdout str Stream of stdout data
stderr str Stream of stderr data
@@ -77,15 +98,15 @@ The DB Query is useful for two primary cases:
1. deep polling of task status or metadata
2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit,...)
+
Example Queries
===============
-
To get all msg_ids that are not completed, only retrieving their ID and start time:
.. sourcecode:: ipython
- In [1]: incomplete = rc.db_query({'complete' : None}, keys=['msg_id', 'started'])
+ In [1]: incomplete = rc.db_query({'completed' : None}, keys=['msg_id', 'started'])
All jobs started in the last hour by me:
@@ -113,12 +134,13 @@ Result headers for all jobs on engine 3 or 4:
  In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }}, keys='result_header')
+.. _db_cost:
Cost
====
The advantage of the database backends is, of course, that large amounts of
-data can be stored that won't fit in memory. The default 'backend' is actually
+data can be stored that won't fit in memory. The basic DictDB 'backend' is actually
to just store all of this information in a Python dictionary. This is very fast,
but will run out of memory quickly if you move a lot of data around, or your
cluster is meant to run for a long time.
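
One mitigation worth noting for DictDB's memory growth (a sketch, assuming a connected `Client` named `rc`; `purge_results` is the client-side hook for dropping records from the Hub in this era's API):

```python
# Sketch: drop completed task records from the Hub's database to
# bound memory growth (assumes a connected Client `rc`).
rc.purge_results('all')  # purge results of all completed tasks
```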