Skip to content

Commit

Permalink
Merge pull request ipython#2051 from jasongrout/stream-metadata
Browse files Browse the repository at this point in the history
Add a metadata attribute to messages

subheader is removed in favor of the new metadata dict,
reducing degeneracy.
  • Loading branch information
minrk committed Jul 21, 2012
2 parents bff463b + 58134b4 commit ea4f608
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 94 deletions.
19 changes: 11 additions & 8 deletions IPython/frontend/html/notebook/static/js/kernel.js
Expand Up @@ -43,6 +43,7 @@ var IPython = (function (IPython) {
session : this.session_id,
msg_type : msg_type
},
metadata : {},
content : content,
parent_header : {}
};
Expand Down Expand Up @@ -220,21 +221,21 @@ var IPython = (function (IPython) {
// 'set_next_input': set_next_input_callback
// }
//
// The execute_reply_callback will be passed the content object of the execute_reply
// The execute_reply_callback will be passed the content and metadata objects of the execute_reply
// message documented here:
//
// http://ipython.org/ipython-doc/dev/development/messaging.html#execute
//
// The output_callback will be passed msg_type ('stream','display_data','pyout','pyerr')
// of the output and the content object of the PUB/SUB channel that contains the
// of the output and the content and metadata objects of the PUB/SUB channel that contains the
// output:
//
// http://ipython.org/ipython-doc/dev/development/messaging.html#messages-on-the-pub-sub-socket
//
// The clear_output_callback will be passed a content object that contains
// stdout, stderr and other fields that are booleans.
// stdout, stderr and other fields that are booleans, as well as the metadata object.
//
// The set_next_input_callback will bepassed the text that should become the next
// The set_next_input_callback will be passed the text that should become the next
// input cell.

var content = {
Expand Down Expand Up @@ -313,12 +314,13 @@ var IPython = (function (IPython) {
reply = $.parseJSON(e.data);
var header = reply.header;
var content = reply.content;
var metadata = reply.metadata;
var msg_type = header.msg_type;
var callbacks = this.get_callbacks_for_msg(reply.parent_header.msg_id);
if (callbacks !== undefined) {
var cb = callbacks[msg_type];
if (cb !== undefined) {
cb(content);
cb(content, metadata);
}
};

Expand Down Expand Up @@ -347,9 +349,10 @@ var IPython = (function (IPython) {


Kernel.prototype._handle_iopub_reply = function (e) {
reply = $.parseJSON(e.data);
var reply = $.parseJSON(e.data);
var content = reply.content;
var msg_type = reply.header.msg_type;
var metadata = reply.metadata;
var callbacks = this.get_callbacks_for_msg(reply.parent_header.msg_id);
if (msg_type !== 'status' && callbacks === undefined) {
// Message not from one of this notebook's cells and there are no
Expand All @@ -360,7 +363,7 @@ var IPython = (function (IPython) {
if (output_types.indexOf(msg_type) >= 0) {
var cb = callbacks['output'];
if (cb !== undefined) {
cb(msg_type, content);
cb(msg_type, content, metadata);
}
} else if (msg_type === 'status') {
if (content.execution_state === 'busy') {
Expand All @@ -374,7 +377,7 @@ var IPython = (function (IPython) {
} else if (msg_type === 'clear_output') {
var cb = callbacks['clear_output'];
if (cb !== undefined) {
cb(content);
cb(content, metadata);
}
};
};
Expand Down
50 changes: 30 additions & 20 deletions IPython/parallel/client/client.py
Expand Up @@ -629,12 +629,16 @@ def _unwrap_exception(self, content):
e.engine_info['engine_id'] = eid
return e

def _extract_metadata(self, header, parent, content):
def _extract_metadata(self, msg):
header = msg['header']
parent = msg['parent_header']
msg_meta = msg['metadata']
content = msg['content']
md = {'msg_id' : parent['msg_id'],
'received' : datetime.now(),
'engine_uuid' : header.get('engine', None),
'follow' : parent.get('follow', []),
'after' : parent.get('after', []),
'engine_uuid' : msg_meta.get('engine', None),
'follow' : msg_meta.get('follow', []),
'after' : msg_meta.get('after', []),
'status' : content['status'],
}

Expand All @@ -643,8 +647,8 @@ def _extract_metadata(self, header, parent, content):

if 'date' in parent:
md['submitted'] = parent['date']
if 'started' in header:
md['started'] = header['started']
if 'started' in msg_meta:
md['started'] = msg_meta['started']
if 'date' in header:
md['completed'] = header['date']
return md
Expand Down Expand Up @@ -717,7 +721,7 @@ def _handle_execute_reply(self, msg):

# construct metadata:
md = self.metadata[msg_id]
md.update(self._extract_metadata(header, parent, content))
md.update(self._extract_metadata(msg))
# is this redundant?
self.metadata[msg_id] = md

Expand Down Expand Up @@ -754,7 +758,7 @@ def _handle_apply_reply(self, msg):

# construct metadata:
md = self.metadata[msg_id]
md.update(self._extract_metadata(header, parent, content))
md.update(self._extract_metadata(msg))
# is this redundant?
self.metadata[msg_id] = md

Expand Down Expand Up @@ -1180,7 +1184,7 @@ def _maybe_raise(self, result):

return result

def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
ident=None):
"""construct and send an apply message via a socket.
Expand All @@ -1193,7 +1197,7 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
# defaults:
args = args if args is not None else []
kwargs = kwargs if kwargs is not None else {}
subheader = subheader if subheader is not None else {}
metadata = metadata if metadata is not None else {}

# validate arguments
if not callable(f) and not isinstance(f, Reference):
Expand All @@ -1202,13 +1206,13 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
raise TypeError("args must be tuple or list, not %s"%type(args))
if not isinstance(kwargs, dict):
raise TypeError("kwargs must be dict, not %s"%type(kwargs))
if not isinstance(subheader, dict):
raise TypeError("subheader must be dict, not %s"%type(subheader))
if not isinstance(metadata, dict):
raise TypeError("metadata must be dict, not %s"%type(metadata))

bufs = util.pack_apply_message(f,args,kwargs)

msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
subheader=subheader, track=track)
metadata=metadata, track=track)

msg_id = msg['header']['msg_id']
self.outstanding.add(msg_id)
Expand All @@ -1224,7 +1228,7 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,

return msg

def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
"""construct and send an execute request via a socket.
"""
Expand All @@ -1233,19 +1237,19 @@ def send_execute_request(self, socket, code, silent=True, subheader=None, ident=
raise RuntimeError("Client cannot be used after its sockets have been closed")

# defaults:
subheader = subheader if subheader is not None else {}
metadata = metadata if metadata is not None else {}

# validate arguments
if not isinstance(code, basestring):
raise TypeError("code must be text, not %s" % type(code))
if not isinstance(subheader, dict):
raise TypeError("subheader must be dict, not %s" % type(subheader))
if not isinstance(metadata, dict):
raise TypeError("metadata must be dict, not %s" % type(metadata))

content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})


msg = self.session.send(socket, "execute_request", content=content, ident=ident,
subheader=subheader)
metadata=metadata)

msg_id = msg['header']['msg_id']
self.outstanding.add(msg_id)
Expand Down Expand Up @@ -1380,7 +1384,7 @@ def get_result(self, indices_or_msg_ids=None, block=None):
return ar

@spin_first
def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
"""Resubmit one or more tasks.
in-flight tasks may not be resubmitted.
Expand Down Expand Up @@ -1518,7 +1522,13 @@ def result_status(self, msg_ids, status_only=True):
rcontent = self.session.unpack(rcontent)

md = self.metadata[msg_id]
md.update(self._extract_metadata(header, parent, rcontent))
md_msg = dict(
content=rcontent,
parent_header=parent,
header=header,
metadata=rec['result_metadata'],
)
md.update(self._extract_metadata(md_msg))
if rec.get('received'):
md['received'] = rec['received']
md.update(iodict)
Expand Down
4 changes: 2 additions & 2 deletions IPython/parallel/client/view.py
Expand Up @@ -1022,10 +1022,10 @@ def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,

after = self._render_dependency(after)
follow = self._render_dependency(follow)
subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)

msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
subheader=subheader)
metadata=metadata)
tracker = None if track is False else msg['tracker']

ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
Expand Down
31 changes: 21 additions & 10 deletions IPython/parallel/controller/hub.py
Expand Up @@ -58,6 +58,7 @@ def empty_record():
return {
'msg_id' : None,
'header' : None,
'metadata' : None,
'content': None,
'buffers': None,
'submitted': None,
Expand All @@ -68,6 +69,7 @@ def empty_record():
'resubmitted': None,
'received': None,
'result_header' : None,
'result_metadata' : None,
'result_content' : None,
'result_buffers' : None,
'queue' : None,
Expand All @@ -85,6 +87,7 @@ def init_record(msg):
'msg_id' : header['msg_id'],
'header' : header,
'content': msg['content'],
'metadata': msg['metadata'],
'buffers': msg['buffers'],
'submitted': header['date'],
'client_uuid' : None,
Expand All @@ -94,6 +97,7 @@ def init_record(msg):
'resubmitted': None,
'received': None,
'result_header' : None,
'result_metadata': None,
'result_content' : None,
'result_buffers' : None,
'queue' : None,
Expand Down Expand Up @@ -655,10 +659,12 @@ def save_queue_result(self, idents, msg):
return
# update record anyway, because the unregistration could have been premature
rheader = msg['header']
md = msg['metadata']
completed = rheader['date']
started = rheader.get('started', None)
started = md.get('started', None)
result = {
'result_header' : rheader,
'result_metadata': md,
'result_content': msg['content'],
'received': datetime.now(),
'started' : started,
Expand Down Expand Up @@ -745,10 +751,11 @@ def save_task_result(self, idents, msg):
self.unassigned.remove(msg_id)

header = msg['header']
engine_uuid = header.get('engine', u'')
md = msg['metadata']
engine_uuid = md.get('engine', u'')
eid = self.by_ident.get(cast_bytes(engine_uuid), None)

status = header.get('status', None)
status = md.get('status', None)

if msg_id in self.pending:
self.log.info("task::task %r finished on %s", msg_id, eid)
Expand All @@ -760,9 +767,10 @@ def save_task_result(self, idents, msg):
if msg_id in self.tasks[eid]:
self.tasks[eid].remove(msg_id)
completed = header['date']
started = header.get('started', None)
started = md.get('started', None)
result = {
'result_header' : header,
'result_metadata': msg['metadata'],
'result_content': msg['content'],
'started' : started,
'completed' : completed,
Expand Down Expand Up @@ -1285,12 +1293,15 @@ def _extract_record(self, rec):
io_dict = {}
for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
io_dict[key] = rec[key]
content = { 'result_content': rec['result_content'],
'header': rec['header'],
'result_header' : rec['result_header'],
'received' : rec['received'],
'io' : io_dict,
}
content = {
'header': rec['header'],
'metadata': rec['metadata'],
'result_metadata': rec['result_metadata'],
'result_header' : rec['result_header'],
'result_content': rec['result_content'],
'received' : rec['received'],
'io' : io_dict,
}
if rec['result_buffers']:
buffers = map(bytes, rec['result_buffers'])
else:
Expand Down

0 comments on commit ea4f608

Please sign in to comment.