Skip to content

Commit

Permalink
allow configuration of buffer storage in Task DB
Browse files Browse the repository at this point in the history
  • Loading branch information
minrk committed Jun 2, 2012
1 parent dbad051 commit 3b8e88e
Showing 1 changed file with 49 additions and 3 deletions.
52 changes: 49 additions & 3 deletions IPython/parallel/controller/hub.py
Expand Up @@ -319,6 +319,27 @@ class Hub(SessionFactory):
client_info: dict of zmq connection information for engines to connect client_info: dict of zmq connection information for engines to connect
to the queues. to the queues.
""" """

store_request_buffers = Bool(True, config=True,
"""Store raw data necessary for resubmitting apply_requests in the Task Database.
If you do not need to use the Client to resubmit apply_requests,
then you can safely disable this feature.
"""
)
store_result_buffers = Bool(True, config=True,
"""Store raw data necessary for reconstructing results in the Task Database.
If you do not need to fetch results from the Hub (those not from your Client object)
then you can safely disable this feature.
"""
)
def _store_result_buffers_changed(self, name, old, new):
"""Without storing results, resubmit makes no sense"""
if not new:
self.store_request_buffers = False


# internal data structures: # internal data structures:
ids=Set() # engine IDs ids=Set() # engine IDs
keytable=Dict() keytable=Dict()
Expand Down Expand Up @@ -565,6 +586,10 @@ def save_queue_request(self, idents, msg):
self.log.debug("queue:: valid are: %r", self.by_ident.keys()) self.log.debug("queue:: valid are: %r", self.by_ident.keys())
return return
record = init_record(msg) record = init_record(msg)

if not self.store_request_buffers:
record['buffers'] = []

msg_id = record['msg_id'] msg_id = record['msg_id']
self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
# Unicode in records # Unicode in records
Expand Down Expand Up @@ -640,7 +665,8 @@ def save_queue_result(self, idents, msg):
'completed' : completed 'completed' : completed
} }


result['result_buffers'] = msg['buffers'] if self.store_result_buffers:
result['result_buffers'] = msg['buffers']
try: try:
self.db.update_record(msg_id, result) self.db.update_record(msg_id, result)
except Exception: except Exception:
Expand All @@ -661,6 +687,9 @@ def save_task_request(self, idents, msg):
return return
record = init_record(msg) record = init_record(msg)


if not self.store_request_buffers:
record['buffers'] = []

record['client_uuid'] = client_id.decode('ascii') record['client_uuid'] = client_id.decode('ascii')
record['queue'] = 'task' record['queue'] = 'task'
header = msg['header'] header = msg['header']
Expand Down Expand Up @@ -744,8 +773,10 @@ def save_task_result(self, idents, msg):
'received' : datetime.now(), 'received' : datetime.now(),
'engine_uuid': engine_uuid, 'engine_uuid': engine_uuid,
} }


result['result_buffers'] = msg['buffers'] if self.store_result_buffers:
result['result_buffers'] = msg['buffers']

try: try:
self.db.update_record(msg_id, result) self.db.update_record(msg_id, result)
except Exception: except Exception:
Expand Down Expand Up @@ -1121,6 +1152,12 @@ def resubmit_task(self, client_id, msg):
"""Resubmit one or more tasks.""" """Resubmit one or more tasks."""
def finish(reply): def finish(reply):
self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id) self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)

try:
if not self.store_request_buffers:
raise ValueError("Hub has disabled task resubmission.")
except Exception:
return finish(error.wrap_exception())


content = msg['content'] content = msg['content']
msg_ids = content['msg_ids'] msg_ids = content['msg_ids']
Expand Down Expand Up @@ -1209,6 +1246,15 @@ def _extract_record(self, rec):


def get_results(self, client_id, msg): def get_results(self, client_id, msg):
"""Get the result of 1 or more messages.""" """Get the result of 1 or more messages."""

try:
if not self.store_request_buffers:
raise ValueError("Hub has disabled result retrieval.")
except Exception:
self.session.send(self.query, "result_reply", content=error.wrap_exception(),
parent=msg, ident=client_id)
return

content = msg['content'] content = msg['content']
msg_ids = sorted(set(content['msg_ids'])) msg_ids = sorted(set(content['msg_ids']))
statusonly = content.get('status_only', False) statusonly = content.get('status_only', False)
Expand Down

0 comments on commit 3b8e88e

Please sign in to comment.