diff --git a/IPython/frontend/html/notebook/static/js/kernel.js b/IPython/frontend/html/notebook/static/js/kernel.js
index b466b66dee5..77292ba4730 100644
--- a/IPython/frontend/html/notebook/static/js/kernel.js
+++ b/IPython/frontend/html/notebook/static/js/kernel.js
@@ -43,6 +43,7 @@ var IPython = (function (IPython) {
session : this.session_id,
msg_type : msg_type
},
+ metadata : {},
content : content,
parent_header : {}
};
@@ -220,21 +221,21 @@ var IPython = (function (IPython) {
// 'set_next_input': set_next_input_callback
// }
//
- // The execute_reply_callback will be passed the content object of the execute_reply
+ // The execute_reply_callback will be passed the content and metadata objects of the execute_reply
// message documented here:
//
// http://ipython.org/ipython-doc/dev/development/messaging.html#execute
//
// The output_callback will be passed msg_type ('stream','display_data','pyout','pyerr')
- // of the output and the content object of the PUB/SUB channel that contains the
+ // of the output and the content and metadata objects of the PUB/SUB channel that contains the
// output:
//
// http://ipython.org/ipython-doc/dev/development/messaging.html#messages-on-the-pub-sub-socket
//
// The clear_output_callback will be passed a content object that contains
- // stdout, stderr and other fields that are booleans.
+ // stdout, stderr and other fields that are booleans, as well as the metadata object.
//
- // The set_next_input_callback will bepassed the text that should become the next
+ // The set_next_input_callback will be passed the text that should become the next
// input cell.
var content = {
@@ -313,12 +314,13 @@ var IPython = (function (IPython) {
reply = $.parseJSON(e.data);
var header = reply.header;
var content = reply.content;
+ var metadata = reply.metadata;
var msg_type = header.msg_type;
var callbacks = this.get_callbacks_for_msg(reply.parent_header.msg_id);
if (callbacks !== undefined) {
var cb = callbacks[msg_type];
if (cb !== undefined) {
- cb(content);
+ cb(content, metadata);
}
};
@@ -347,9 +349,10 @@ var IPython = (function (IPython) {
Kernel.prototype._handle_iopub_reply = function (e) {
- reply = $.parseJSON(e.data);
+ var reply = $.parseJSON(e.data);
var content = reply.content;
var msg_type = reply.header.msg_type;
+ var metadata = reply.metadata;
var callbacks = this.get_callbacks_for_msg(reply.parent_header.msg_id);
if (msg_type !== 'status' && callbacks === undefined) {
// Message not from one of this notebook's cells and there are no
@@ -360,7 +363,7 @@ var IPython = (function (IPython) {
if (output_types.indexOf(msg_type) >= 0) {
var cb = callbacks['output'];
if (cb !== undefined) {
- cb(msg_type, content);
+ cb(msg_type, content, metadata);
}
} else if (msg_type === 'status') {
if (content.execution_state === 'busy') {
@@ -374,7 +377,7 @@ var IPython = (function (IPython) {
} else if (msg_type === 'clear_output') {
var cb = callbacks['clear_output'];
if (cb !== undefined) {
- cb(content);
+ cb(content, metadata);
}
};
};
diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index a61036ea8b6..9e679181bb3 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -629,12 +629,16 @@ def _unwrap_exception(self, content):
e.engine_info['engine_id'] = eid
return e
- def _extract_metadata(self, header, parent, content):
+ def _extract_metadata(self, msg):
+ header = msg['header']
+ parent = msg['parent_header']
+ msg_meta = msg['metadata']
+ content = msg['content']
md = {'msg_id' : parent['msg_id'],
'received' : datetime.now(),
- 'engine_uuid' : header.get('engine', None),
- 'follow' : parent.get('follow', []),
- 'after' : parent.get('after', []),
+ 'engine_uuid' : msg_meta.get('engine', None),
+ 'follow' : msg_meta.get('follow', []),
+ 'after' : msg_meta.get('after', []),
'status' : content['status'],
}
@@ -643,8 +647,8 @@ def _extract_metadata(self, header, parent, content):
if 'date' in parent:
md['submitted'] = parent['date']
- if 'started' in header:
- md['started'] = header['started']
+ if 'started' in msg_meta:
+ md['started'] = msg_meta['started']
if 'date' in header:
md['completed'] = header['date']
return md
@@ -717,7 +721,7 @@ def _handle_execute_reply(self, msg):
# construct metadata:
md = self.metadata[msg_id]
- md.update(self._extract_metadata(header, parent, content))
+ md.update(self._extract_metadata(msg))
# is this redundant?
self.metadata[msg_id] = md
@@ -754,7 +758,7 @@ def _handle_apply_reply(self, msg):
# construct metadata:
md = self.metadata[msg_id]
- md.update(self._extract_metadata(header, parent, content))
+ md.update(self._extract_metadata(msg))
# is this redundant?
self.metadata[msg_id] = md
@@ -1180,7 +1184,7 @@ def _maybe_raise(self, result):
return result
- def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
+ def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
ident=None):
"""construct and send an apply message via a socket.
@@ -1193,7 +1197,7 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
# defaults:
args = args if args is not None else []
kwargs = kwargs if kwargs is not None else {}
- subheader = subheader if subheader is not None else {}
+ metadata = metadata if metadata is not None else {}
# validate arguments
if not callable(f) and not isinstance(f, Reference):
@@ -1202,13 +1206,13 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
raise TypeError("args must be tuple or list, not %s"%type(args))
if not isinstance(kwargs, dict):
raise TypeError("kwargs must be dict, not %s"%type(kwargs))
- if not isinstance(subheader, dict):
- raise TypeError("subheader must be dict, not %s"%type(subheader))
+ if not isinstance(metadata, dict):
+ raise TypeError("metadata must be dict, not %s"%type(metadata))
bufs = util.pack_apply_message(f,args,kwargs)
msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
- subheader=subheader, track=track)
+ metadata=metadata, track=track)
msg_id = msg['header']['msg_id']
self.outstanding.add(msg_id)
@@ -1224,7 +1228,7 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
return msg
- def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
+ def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
"""construct and send an execute request via a socket.
"""
@@ -1233,19 +1237,19 @@ def send_execute_request(self, socket, code, silent=True, subheader=None, ident=
raise RuntimeError("Client cannot be used after its sockets have been closed")
# defaults:
- subheader = subheader if subheader is not None else {}
+ metadata = metadata if metadata is not None else {}
# validate arguments
if not isinstance(code, basestring):
raise TypeError("code must be text, not %s" % type(code))
- if not isinstance(subheader, dict):
- raise TypeError("subheader must be dict, not %s" % type(subheader))
+ if not isinstance(metadata, dict):
+ raise TypeError("metadata must be dict, not %s" % type(metadata))
content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
msg = self.session.send(socket, "execute_request", content=content, ident=ident,
- subheader=subheader)
+ metadata=metadata)
msg_id = msg['header']['msg_id']
self.outstanding.add(msg_id)
@@ -1380,7 +1384,7 @@ def get_result(self, indices_or_msg_ids=None, block=None):
return ar
@spin_first
- def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
+ def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
"""Resubmit one or more tasks.
in-flight tasks may not be resubmitted.
@@ -1518,7 +1522,13 @@ def result_status(self, msg_ids, status_only=True):
rcontent = self.session.unpack(rcontent)
md = self.metadata[msg_id]
- md.update(self._extract_metadata(header, parent, rcontent))
+ md_msg = dict(
+ content=rcontent,
+ parent_header=parent,
+ header=header,
+ metadata=rec['result_metadata'],
+ )
+ md.update(self._extract_metadata(md_msg))
if rec.get('received'):
md['received'] = rec['received']
md.update(iodict)
diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py
index 5d047dac811..add9cfeaf21 100644
--- a/IPython/parallel/client/view.py
+++ b/IPython/parallel/client/view.py
@@ -1022,10 +1022,10 @@ def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
after = self._render_dependency(after)
follow = self._render_dependency(follow)
- subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
+ metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
- subheader=subheader)
+ metadata=metadata)
tracker = None if track is False else msg['tracker']
ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index 49cdd6780fe..5782a5ecf0d 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -58,6 +58,7 @@ def empty_record():
return {
'msg_id' : None,
'header' : None,
+ 'metadata' : None,
'content': None,
'buffers': None,
'submitted': None,
@@ -68,6 +69,7 @@ def empty_record():
'resubmitted': None,
'received': None,
'result_header' : None,
+ 'result_metadata' : None,
'result_content' : None,
'result_buffers' : None,
'queue' : None,
@@ -85,6 +87,7 @@ def init_record(msg):
'msg_id' : header['msg_id'],
'header' : header,
'content': msg['content'],
+ 'metadata': msg['metadata'],
'buffers': msg['buffers'],
'submitted': header['date'],
'client_uuid' : None,
@@ -94,6 +97,7 @@ def init_record(msg):
'resubmitted': None,
'received': None,
'result_header' : None,
+ 'result_metadata': None,
'result_content' : None,
'result_buffers' : None,
'queue' : None,
@@ -655,10 +659,12 @@ def save_queue_result(self, idents, msg):
return
# update record anyway, because the unregistration could have been premature
rheader = msg['header']
+ md = msg['metadata']
completed = rheader['date']
- started = rheader.get('started', None)
+ started = md.get('started', None)
result = {
'result_header' : rheader,
+ 'result_metadata': md,
'result_content': msg['content'],
'received': datetime.now(),
'started' : started,
@@ -745,10 +751,11 @@ def save_task_result(self, idents, msg):
self.unassigned.remove(msg_id)
header = msg['header']
- engine_uuid = header.get('engine', u'')
+ md = msg['metadata']
+ engine_uuid = md.get('engine', u'')
eid = self.by_ident.get(cast_bytes(engine_uuid), None)
- status = header.get('status', None)
+ status = md.get('status', None)
if msg_id in self.pending:
self.log.info("task::task %r finished on %s", msg_id, eid)
@@ -760,9 +767,10 @@ def save_task_result(self, idents, msg):
if msg_id in self.tasks[eid]:
self.tasks[eid].remove(msg_id)
completed = header['date']
- started = header.get('started', None)
+ started = md.get('started', None)
result = {
'result_header' : header,
+ 'result_metadata': msg['metadata'],
'result_content': msg['content'],
'started' : started,
'completed' : completed,
@@ -1285,12 +1293,15 @@ def _extract_record(self, rec):
io_dict = {}
for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
io_dict[key] = rec[key]
- content = { 'result_content': rec['result_content'],
- 'header': rec['header'],
- 'result_header' : rec['result_header'],
- 'received' : rec['received'],
- 'io' : io_dict,
- }
+ content = {
+ 'header': rec['header'],
+ 'metadata': rec['metadata'],
+ 'result_metadata': rec['result_metadata'],
+ 'result_header' : rec['result_header'],
+ 'result_content': rec['result_content'],
+ 'received' : rec['received'],
+ 'io' : io_dict,
+ }
if rec['result_buffers']:
buffers = map(bytes, rec['result_buffers'])
else:
diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index 39e23657a55..c0c8f1c8b64 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -129,12 +129,14 @@ def leastload(loads):
class Job(object):
"""Simple container for a job"""
- def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout):
+ def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
+ targets, after, follow, timeout):
self.msg_id = msg_id
self.raw_msg = raw_msg
self.idents = idents
self.msg = msg
self.header = header
+ self.metadata = metadata
self.targets = targets
self.after = after
self.follow = follow
@@ -349,13 +351,13 @@ def handle_stranded_tasks(self, engine):
raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
except:
content = error.wrap_exception()
- # build fake header
- header = dict(
- status='error',
+ # build fake metadata
+ md = dict(
+ status=u'error',
engine=engine,
date=datetime.now(),
)
- msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
+ msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
# and dispatch it
self.dispatch_result(raw_reply)
@@ -387,20 +389,21 @@ def dispatch_submission(self, raw_msg):
self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
header = msg['header']
+ md = msg['metadata']
msg_id = header['msg_id']
self.all_ids.add(msg_id)
# get targets as a set of bytes objects
# from a list of unicode objects
- targets = header.get('targets', [])
+ targets = md.get('targets', [])
targets = map(cast_bytes, targets)
targets = set(targets)
- retries = header.get('retries', 0)
+ retries = md.get('retries', 0)
self.retries[msg_id] = retries
# time dependencies
- after = header.get('after', None)
+ after = md.get('after', None)
if after:
after = Dependency(after)
if after.all:
@@ -424,10 +427,10 @@ def dispatch_submission(self, raw_msg):
after = MET
# location dependencies
- follow = Dependency(header.get('follow', []))
+ follow = Dependency(md.get('follow', []))
# turn timeouts into datetime objects:
- timeout = header.get('timeout', None)
+ timeout = md.get('timeout', None)
if timeout:
# cast to float, because jsonlib returns floats as decimal.Decimal,
# which timedelta does not accept
@@ -435,7 +438,7 @@ def dispatch_submission(self, raw_msg):
job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
header=header, targets=targets, after=after, follow=follow,
- timeout=timeout,
+ timeout=timeout, metadata=md,
)
# validate and reduce dependencies:
@@ -607,10 +610,10 @@ def dispatch_result(self, raw_msg):
self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
return
- header = msg['header']
+ md = msg['metadata']
parent = msg['parent_header']
- if header.get('dependencies_met', True):
- success = (header['status'] == 'ok')
+ if md.get('dependencies_met', True):
+ success = (md['status'] == 'ok')
msg_id = parent['msg_id']
retries = self.retries[msg_id]
if not success and retries > 0:
diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py
index 2b017d40a8a..36f015dac69 100644
--- a/IPython/parallel/controller/sqlitedb.py
+++ b/IPython/parallel/controller/sqlitedb.py
@@ -109,6 +109,7 @@ class SQLiteDB(BaseDB):
# the ordered list of column names
_keys = List(['msg_id' ,
'header' ,
+ 'metadata',
'content',
'buffers',
'submitted',
@@ -119,6 +120,7 @@ class SQLiteDB(BaseDB):
'resubmitted',
'received',
'result_header' ,
+ 'result_metadata',
'result_content' ,
'result_buffers' ,
'queue' ,
@@ -131,6 +133,7 @@ class SQLiteDB(BaseDB):
# sqlite datatypes for checking that db is current format
_types = Dict({'msg_id' : 'text' ,
'header' : 'dict text',
+ 'metadata' : 'dict text',
'content' : 'dict text',
'buffers' : 'bufs blob',
'submitted' : 'timestamp',
@@ -141,6 +144,7 @@ class SQLiteDB(BaseDB):
'resubmitted' : 'text',
'received' : 'timestamp',
'result_header' : 'dict text',
+ 'result_metadata' : 'dict text',
'result_content' : 'dict text',
'result_buffers' : 'bufs blob',
'queue' : 'text',
@@ -240,6 +244,7 @@ def _init_db(self):
self._db.execute("""CREATE TABLE IF NOT EXISTS %s
(msg_id text PRIMARY KEY,
header dict text,
+ metadata dict text,
content dict text,
buffers bufs blob,
submitted timestamp,
@@ -250,6 +255,7 @@ def _init_db(self):
resubmitted text,
received timestamp,
result_header dict text,
+ result_metadata dict text,
result_content dict text,
result_buffers bufs blob,
queue text,
diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py
index a0853f764e2..21b521d278b 100755
--- a/IPython/zmq/ipkernel.py
+++ b/IPython/zmq/ipkernel.py
@@ -218,9 +218,9 @@ def dispatch_shell(self, stream, msg):
# is it safe to assume a msg_id will not be resubmitted?
reply_type = msg_type.split('_')[0] + '_reply'
status = {'status' : 'aborted'}
- sub = {'engine' : self.ident}
- sub.update(status)
- reply_msg = self.session.send(stream, reply_type, subheader=sub,
+ md = {'engine' : self.ident}
+ md.update(status)
+ reply_msg = self.session.send(stream, reply_type, metadata=md,
content=status, parent=msg, ident=idents)
return
@@ -293,13 +293,16 @@ def record_ports(self, ports):
# Kernel request handlers
#---------------------------------------------------------------------------
- def _make_subheader(self):
- """init subheader dict, for execute/apply_reply"""
- return {
+ def _make_metadata(self, other=None):
+ """init metadata dict, for execute/apply_reply"""
+ new_md = {
'dependencies_met' : True,
'engine' : self.ident,
'started': datetime.now(),
}
+ if other:
+ new_md.update(other)
+ return new_md
def _publish_pyin(self, code, parent, execution_count):
"""Publish the code request on the pyin stream."""
@@ -333,7 +336,7 @@ def execute_request(self, stream, ident, parent):
self.log.error("%s", parent)
return
- sub = self._make_subheader()
+ md = self._make_metadata(parent['metadata'])
shell = self.shell # we'll need this a lot here
@@ -425,13 +428,13 @@ def execute_request(self, stream, ident, parent):
# Send the reply.
reply_content = json_clean(reply_content)
- sub['status'] = reply_content['status']
+ md['status'] = reply_content['status']
if reply_content['status'] == 'error' and \
reply_content['ename'] == 'UnmetDependency':
- sub['dependencies_met'] = False
+ md['dependencies_met'] = False
reply_msg = self.session.send(stream, u'execute_reply',
- reply_content, parent, subheader=sub,
+ reply_content, parent, metadata=md,
ident=ident)
self.log.debug("%s", reply_msg)
@@ -543,7 +546,7 @@ def apply_request(self, stream, ident, parent):
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
# self.iopub_socket.send(pyin_msg)
# self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
- sub = self._make_subheader()
+ md = self._make_metadata(parent['metadata'])
try:
working = shell.user_ns
@@ -589,19 +592,19 @@ def apply_request(self, stream, ident, parent):
result_buf = []
if reply_content['ename'] == 'UnmetDependency':
- sub['dependencies_met'] = False
+ md['dependencies_met'] = False
else:
reply_content = {'status' : 'ok'}
# put 'ok'/'error' status in header, for scheduler introspection:
- sub['status'] = reply_content['status']
+ md['status'] = reply_content['status']
# flush i/o
sys.stdout.flush()
sys.stderr.flush()
reply_msg = self.session.send(stream, u'apply_reply', reply_content,
- parent=parent, ident=ident,buffers=result_buf, subheader=sub)
+ parent=parent, ident=ident,buffers=result_buf, metadata=md)
self._publish_status(u'idle', parent)
@@ -672,9 +675,9 @@ def _abort_queue(self, stream):
reply_type = msg_type.split('_')[0] + '_reply'
status = {'status' : 'aborted'}
- sub = {'engine' : self.ident}
- sub.update(status)
- reply_msg = self.session.send(stream, reply_type, subheader=sub,
+ md = {'engine' : self.ident}
+ md.update(status)
+ reply_msg = self.session.send(stream, reply_type, meatadata=md,
content=status, parent=msg, ident=idents)
self.log.debug("%s", reply_msg)
# We need to wait a bit for requests to come in. This can probably
diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py
index 4cf1ab230f2..26569fe466c 100644
--- a/IPython/zmq/session.py
+++ b/IPython/zmq/session.py
@@ -49,7 +49,7 @@
from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
from IPython.utils.py3compat import str_to_bytes
from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
- DottedObjectName, CUnicode)
+ DottedObjectName, CUnicode, Dict)
#-----------------------------------------------------------------------------
# utility functions
@@ -296,6 +296,9 @@ def _session_changed(self, name, old, new):
username = Unicode(os.environ.get('USER',u'username'), config=True,
help="""Username for the Session. Default is your system username.""")
+ metadata = Dict({}, config=True,
+ help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
+
# message signature related traits:
key = CBytes(b'', config=True,
@@ -406,7 +409,7 @@ def _check_packers(self):
def msg_header(self, msg_type):
return msg_header(self.msg_id, msg_type, self.username, self.session)
- def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
+ def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
"""Return the nested message dict.
This format is different from what is sent over the wire. The
@@ -420,8 +423,9 @@ def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
msg['msg_type'] = header['msg_type']
msg['parent_header'] = {} if parent is None else extract_header(parent)
msg['content'] = {} if content is None else content
- sub = {} if subheader is None else subheader
- msg['header'].update(sub)
+ msg['metadata'] = self.metadata.copy()
+ if metadata is not None:
+ msg['metadata'].update(metadata)
return msg
def sign(self, msg_list):
@@ -455,7 +459,7 @@ def serialize(self, msg, ident=None):
-------
msg_list : list
The list of bytes objects to be sent with the format:
- [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
+ [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_metadata,p_content,
buffer1,buffer2,...]. In this list, the p_* entities are
the packed or serialized versions, so if JSON is used, these
are utf8 encoded JSON strings.
@@ -476,7 +480,8 @@ def serialize(self, msg, ident=None):
real_message = [self.pack(msg['header']),
self.pack(msg['parent_header']),
- content
+ self.pack(msg['metadata']),
+ content,
]
to_send = []
@@ -496,7 +501,7 @@ def serialize(self, msg, ident=None):
return to_send
def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
- buffers=None, subheader=None, track=False, header=None):
+ buffers=None, track=False, header=None, metadata=None):
"""Build and send a message via stream or socket.
The message format used by this function internally is as follows:
@@ -520,20 +525,20 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
content : dict or None
The content of the message (ignored if msg_or_type is a message).
header : dict or None
- The header dict for the message (ignores if msg_to_type is a message).
+ The header dict for the message (ignored if msg_to_type is a message).
parent : Message or dict or None
The parent or parent header describing the parent of this message
(ignored if msg_or_type is a message).
ident : bytes or list of bytes
The zmq.IDENTITY routing path.
- subheader : dict or None
- Extra header keys for this message's header (ignored if msg_or_type
- is a message).
+ metadata : dict or None
+ The metadata describing the message
buffers : list or None
The already-serialized buffers to be appended to the message.
track : bool
Whether to track. Only for use with Sockets, because ZMQStream
objects cannot track messages.
+
Returns
-------
@@ -557,7 +562,7 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
msg = msg_or_type
else:
msg = self.msg(msg_or_type, content=content, parent=parent,
- subheader=subheader, header=header)
+ header=header, metadata=metadata)
buffers = [] if buffers is None else buffers
to_send = self.serialize(msg, ident)
@@ -600,7 +605,7 @@ def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
The ZMQ stream or socket to use for sending the message.
msg_list : list
The serialized list of messages to send. This only includes the
- [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
+ [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
the message.
ident : ident or list
A single ident or a list of idents to use in sending.
@@ -698,7 +703,7 @@ def unserialize(self, msg_list, content=True, copy=True):
-----------
msg_list : list of bytes or Message objects
The list of message parts of the form [HMAC,p_header,p_parent,
- p_content,buffer1,buffer2,...].
+ p_metadata,p_content,buffer1,buffer2,...].
content : bool (True)
Whether to unpack the content dict (True), or leave it packed
(False).
@@ -712,7 +717,7 @@ def unserialize(self, msg_list, content=True, copy=True):
The nested message dict with top-level keys [header, parent_header,
content, buffers].
"""
- minlen = 4
+ minlen = 5
message = {}
if not copy:
for i in range(minlen):
@@ -724,9 +729,9 @@ def unserialize(self, msg_list, content=True, copy=True):
if signature in self.digest_history:
raise ValueError("Duplicate Signature: %r"%signature)
self.digest_history.add(signature)
- check = self.sign(msg_list[1:4])
+ check = self.sign(msg_list[1:5])
if not signature == check:
- raise ValueError("Invalid Signature: %r"%signature)
+ raise ValueError("Invalid Signature: %r" % signature)
if not len(msg_list) >= minlen:
raise TypeError("malformed message, must have at least %i elements"%minlen)
header = self.unpack(msg_list[1])
@@ -734,12 +739,13 @@ def unserialize(self, msg_list, content=True, copy=True):
message['msg_id'] = header['msg_id']
message['msg_type'] = header['msg_type']
message['parent_header'] = self.unpack(msg_list[2])
+ message['metadata'] = self.unpack(msg_list[3])
if content:
- message['content'] = self.unpack(msg_list[3])
+ message['content'] = self.unpack(msg_list[4])
else:
- message['content'] = msg_list[3]
+ message['content'] = msg_list[4]
- message['buffers'] = msg_list[4:]
+ message['buffers'] = msg_list[5:]
return message
def test_msg2obj():
diff --git a/IPython/zmq/tests/test_session.py b/IPython/zmq/tests/test_session.py
index 5a440e6aa0a..accc1798068 100644
--- a/IPython/zmq/tests/test_session.py
+++ b/IPython/zmq/tests/test_session.py
@@ -47,10 +47,11 @@ class TestSession(SessionTestCase):
def test_msg(self):
"""message format"""
msg = self.session.msg('execute')
- thekeys = set('header parent_header content msg_type msg_id'.split())
+ thekeys = set('header parent_header metadata content msg_type msg_id'.split())
s = set(msg.keys())
self.assertEqual(s, thekeys)
self.assertTrue(isinstance(msg['content'],dict))
+ self.assertTrue(isinstance(msg['metadata'],dict))
self.assertTrue(isinstance(msg['header'],dict))
self.assertTrue(isinstance(msg['parent_header'],dict))
self.assertTrue(isinstance(msg['msg_id'],str))
@@ -69,6 +70,7 @@ def test_serialize(self):
self.assertEqual(new_msg['header'],msg['header'])
self.assertEqual(new_msg['content'],msg['content'])
self.assertEqual(new_msg['parent_header'],msg['parent_header'])
+ self.assertEqual(new_msg['metadata'],msg['metadata'])
# ensure floats don't come out as Decimal:
self.assertEqual(type(new_msg['content']['b']),type(new_msg['content']['b']))
@@ -85,6 +87,7 @@ def test_send(self):
self.assertEqual(new_msg['header'],msg['header'])
self.assertEqual(new_msg['content'],msg['content'])
self.assertEqual(new_msg['parent_header'],msg['parent_header'])
+ self.assertEqual(new_msg['metadata'],msg['metadata'])
self.assertEqual(new_msg['buffers'],[b'bar'])
socket.data = []
@@ -92,9 +95,10 @@ def test_send(self):
content = msg['content']
header = msg['header']
parent = msg['parent_header']
+ metadata = msg['metadata']
msg_type = header['msg_type']
self.session.send(socket, None, content=content, parent=parent,
- header=header, ident=b'foo', buffers=[b'bar'])
+ header=header, metadata=metadata, ident=b'foo', buffers=[b'bar'])
ident, msg_list = self.session.feed_identities(socket.data)
new_msg = self.session.unserialize(msg_list)
self.assertEqual(ident[0], b'foo')
@@ -102,6 +106,7 @@ def test_send(self):
self.assertEqual(new_msg['msg_type'],msg['msg_type'])
self.assertEqual(new_msg['header'],msg['header'])
self.assertEqual(new_msg['content'],msg['content'])
+ self.assertEqual(new_msg['metadata'],msg['metadata'])
self.assertEqual(new_msg['parent_header'],msg['parent_header'])
self.assertEqual(new_msg['buffers'],[b'bar'])
@@ -114,6 +119,7 @@ def test_send(self):
self.assertEqual(new_msg['msg_type'],msg['msg_type'])
self.assertEqual(new_msg['header'],msg['header'])
self.assertEqual(new_msg['content'],msg['content'])
+ self.assertEqual(new_msg['metadata'],msg['metadata'])
self.assertEqual(new_msg['parent_header'],msg['parent_header'])
self.assertEqual(new_msg['buffers'],[b'bar'])
diff --git a/docs/source/development/messaging.txt b/docs/source/development/messaging.txt
index ab45c64fd13..d723b890c81 100644
--- a/docs/source/development/messaging.txt
+++ b/docs/source/development/messaging.txt
@@ -81,7 +81,7 @@ representation of all the data, we can communicate with such clients.
General Message Format
======================
-A message is defined by the following three-dictionary structure::
+A message is defined by the following four-dictionary structure::
{
# The message header contains a pair of unique identifiers for the
@@ -105,6 +105,9 @@ A message is defined by the following three-dictionary structure::
# The actual content of the message must be a dict, whose structure
# depends on the message type.
'content' : dict,
+
+ # Any metadata associated with the message.
+ 'metadata' : dict,
}
@@ -127,6 +130,7 @@ messages upon deserialization to the following form for convenience::
'msg_type' : str,
'parent_header' : dict,
'content' : dict,
+ 'metadata' : dict,
}
All messages sent to or received by any IPython process should have this