Skip to content
This repository

Add a metadata attribute to messages #2051

Merged
merged 10 commits into from over 1 year ago

2 participants

Jason Grout Min RK
Jason Grout

We'd like to easily add metadata to messages. Adding an optional top-level metadata attribute to messages seems like a natural solution.

With this pull request, we can add metadata to a session using a context handler like:

@contextmanager
def session_metadata(metadata):
    # flush any messages waiting in buffers
    sys.stdout.flush()
    sys.stderr.flush()

    session = sys.stdout.session
    old_metadata = session.metadata
    new_metadata = old_metadata.copy()
    new_metadata.update(metadata)
    session.metadata = new_metadata
    try:
        yield None
    finally:
        sys.stdout.flush()
        sys.stderr.flush()
        session.metadata = old_metadata

with session_metadata({'location': 4}):
     print 'hello world' # goes to location 4

(is there a better way to get the session object than to get it from sys.stdout?)

Old Description

We'd like to attach some metadata to stdout and stderr messages, for example, indicating where
the frontend should put the output. It would be really convenient if sys.stdout and sys.stderr (which I think is
IPython.zmq.iostream.OutStream, right?) had a 'metadata' attribute that could be set and would be added to each outgoing message. This would also change the messaging spec to add a 'metadata' attribute to the
content dict for stream messages (just like display_data messages already have).

@contextmanager
def stream_metadata(metadata):
    sys.stdout.flush()
    stdout_metadata = sys.stdout.metadata
    old_stdout_metadata = stdout_metadata.copy()
    stdout_metadata.update(metadata)
    sys.stdout.set_metadata(stdout_metadata)

    sys.stderr.flush()
    stderr_metadata = sys.stderr.metadata
    old_stderr_metadata = stderr_metadata.copy()
    stderr_metadata.update(metadata)
    sys.stderr.set_metadata(stderr_metadata)
    try:
        yield None
    finally:
        sys.stdout.set_metadata(old_stdout_metadata)
        sys.stdout.set_metadata(old_stderr_metadata)

with stream_metadata({'location': 4}):
     print 'hello world' # goes to location 4
Jason Grout

Of course, docs need to change as well, and there may be other places in the codebase that would need to be changed. I'm willing to do those, but I want to make sure that you guys are good with the idea before putting in that time.

Min RK
Owner
minrk commented June 28, 2012

I think this is sensible, let's hear what @ellisonbg has to say.

Jason Grout

As we experiment with this, we realize that we also need to add this sort of information on display_data messages, pyout messages, etc. Maybe it would be cleaner to add a metadata attribute to the session object, and anytime the session sends a message (on the iopub channel?), it adds the metadata to the message. Should this sort of iopub-level metadata be added to the header instead of the content, though?

Jason Grout

Looking into things even more, it seems like if we just add the capability to define a default subheader in the session object, that might do all that we want.

Jason Grout

If we did put the metadata in the subheader, we'd also need to modify kernel.js to pass the header information (or maybe just a header metadata attribute) to callbacks, in the Kernel.execute function.

Jason Grout

I just added some commits which implement a configurable subheader object and also pass the header to javascript callbacks.

Jason Grout

Okay, here's a third idea, since having the metadata in the header just seems awkward. For one, the header lives on as parent_headers for possibly a long time. Secondly, an application (like callbacks in javascript) could care about metadata, but would probably rarely care about message ids or session uuids, so there seems to be a logical distinction between metadata/subheader and the header. In a sense, the header deals with routing, while metadata deals with the content.

Instead, how about adding a (optional?) metadata attribute to the basic message spec, so messages would look like {header: {...}, parent_header: {...}, metadata: {...}, content: {...}}? Things like the javascript callbacks would get the message content and the metadata.

Min RK
Owner
minrk commented June 30, 2012

I like the notion of metadata being a fundamental message property. Since we already put it in the content for some messages, I would say we should extend that to metadata being an optional key in the content dict of all message types. I'm not 100% sure about the API for this, at the Session level or otherwise.

Min RK
Owner
minrk commented June 30, 2012

Further thinking out loud:

There are three options:

  1. metadata in the header (possibly via subheader). I don't like this, because I don't think most library code should be using headers for anything. The content should be everything you need to know about the message itself, and the headers should only be used for low-level routing of handlers. But this is exactly what I did in the parallel code for message introspection without unpacking content.
  2. metadata as a fourth top-level component of all messages. On some level, this is the cleanest, but most handlers really only need the 'content' of the message after the Application has used the headers to figure out what handlers to call. So this would mean we are always passing the whole Message around, or we are always passing two dicts (content and metadata) around.
  3. metadata as an optional key in all content dicts. This has the benefit of extending what we already have, rather than changing it. But we don't actually use the metadata key from displaypub for anything, so there's not a high cost to change.

Personally, I'm thinking 2. is the best choice right now, as @jasongrout proposed.

Jason Grout

I'm liking the idea of a metadata top-level element more. The distinction from the content is elegant, in my mind. I think we may be to the point where a post to ipython-dev would make sense to get a wider audience; I'll post right now.

Jason Grout

I think the patches above add a top-level metadata attribute to messages in the right place. When the design decision has been made, I can squash the commits to make the history make more sense.

It'd be nice if github kept track of old versions of the description, so we could change it, but not lose context of the comments at the top.

Jason Grout

I added a few more commits to start working towards having metadata a mandatory element of messages. There might still be parts where I'm missing things (for example, the javascript code that creates messages?)

I'm stopping working on this until a decision is made about a design direction, though, since the source changes now seem to be more and more involved.

Min RK
Owner
minrk commented July 20, 2012

After SciPy discussion with enthought/enaml, we are definitely going with your metadata approach. I will check the JS to see if there's anything more that needs to be done.

One thing I would do is totally remove the subheader, and put all uses of that into metadata. Since I believe that 100% of subheader code is written by me, I am happy to do that.

Jason Grout

Great! Who is enaml?

Min RK
Owner
minrk commented July 20, 2012

https://github.com/enthought/enaml

Chris Colbert is the main developer here.

Min RK
Owner
minrk commented July 20, 2012

Can you add the few commits from my branch?

They remove subheader entirely, using metadata instead, and make sure the notebook works as well.

@ellisonbg can you weigh in on this one?

Jason Grout

Sure. Can you just submit a pull request to my branch to make it easy?

Min RK
Owner
minrk commented July 20, 2012

I tried to, but it didn't let me. Perhaps you have pull requests disabled for your branch?

You can still do git pull minrk stream-metadata the old-fashioned way.

Jason Grout

It didn't let you? That's very odd---what did it say? I don't even know if there is a way to disable pull requests on a branch.

I know I can pull, but I thought it would be good to learn the ins and outs of github better. If you don't want to diagnose this further, though, I can just pull. I just did it in a test branch and it merged just fine.

Jason Grout

(so just let me know if you want to try to figure github out better, or if you want me to just pull and get on with coding.)

Min RK
Owner
minrk commented July 20, 2012

I honestly have no idea why. But in the list of available pull targets in the New PR UI, which includes dozens of IPython forks, your fork is not listed. I think just give it a pull, and maybe it's a GitHub bug that will work itself out magically.

Jason Grout

huh, interesting. Okay, I pulled and pushed.

Jason Grout

Also, are there any other implementations of the spec that need to be changed? qtconsole or anything like that?

Min RK
Owner
minrk commented July 20, 2012

The Session object is the only implementation, shared by all IPython applications. The subheader was only used at the application level in IPython.parallel, so there shouldn't be anything to change anywhere else.

Jason Grout

Great!

Min RK
Owner
minrk commented July 20, 2012

This actually needs a rebase, due to a cleanup PR earlier in the month. It's just on the blocks of assertEquals in test_session in one commit.

If you want to do that rebase, go ahead. I also made a rebased version of this branch at minrk/metadata-rebased, so you can just use my version in-place with:

git fetch minrk
git reset minrk/metadata-rebased --hard
git push -f
Jason Grout

If you've already done the work, great! I just pushed the rebased branch.

It also probably wouldn't hurt to squash some of my commits down, since my early commits were just exploring ideas.

Min RK
Owner
minrk commented July 20, 2012

Since this PR is owned by you, you would have to do the squashing. You are welcome to do a git rebase -i if you like. But otherwise, I think this can go in as-is (as soon as I confirm passing with test_pr)

Jason Grout

Well, it's a question about the style of the project---do you want the thinking process, the scaffolding, as it were, or do you want a nice finished set of commits. If you think it can go in as-is, that's great. If you want me to clean it up a bit, let me know.

Min RK
Owner
minrk commented July 20, 2012

We definitely don't require that each commit be final - cleaning up a PR is usually up to the style preferences of the committer (I tend to try to squash my own typos), but not enforced unless a big mess is made.

Min RK
Owner
minrk commented July 20, 2012

Test results for commit 58134b4 merged into master
Platform: darwin

  • python2.6: OK (libraries not available: cython matplotlib oct2py pygments pymongo qt rpy2 tornado wx wx.aui)
  • python2.7: OK (libraries not available: oct2py wx wx.aui)
  • python3.2: OK (libraries not available: cython matplotlib oct2py pymongo qt rpy2 wx wx.aui)

Not available for testing: python3.1

Min RK minrk merged commit ea4f608 into from July 20, 2012
Min RK minrk closed this July 20, 2012
Jason Grout

I guess we were working past each other. I just cleaned things up and force pushed again, but I think it was after you merged, so hopefully things didn't get messed up.

Jason Grout

I'm glad it's in! Now we can run the Sage cell server off of ipython master rather than our own custom branch.

Min RK
Owner
minrk commented July 20, 2012

Sorry about that - we are working through our outstanding PRs, and I wanted this one in before others of mine that might have caused this to need another rebase.

Thanks for the work!

Takafumi Arakaki tkf referenced this pull request in tkf/emacs-ipython-notebook August 06, 2012
Closed

Adapt to the new IPython messaging protocol #46

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
19  IPython/frontend/html/notebook/static/js/kernel.js
@@ -43,6 +43,7 @@ var IPython = (function (IPython) {
43 43
                 session : this.session_id,
44 44
                 msg_type : msg_type
45 45
             },
  46
+            metadata : {},
46 47
             content : content,
47 48
             parent_header : {}
48 49
         };
@@ -220,21 +221,21 @@ var IPython = (function (IPython) {
220 221
         //  'set_next_input': set_next_input_callback
221 222
         // }
222 223
         //
223  
-        // The execute_reply_callback will be passed the content object of the execute_reply
  224
+        // The execute_reply_callback will be passed the content and metadata objects of the execute_reply
224 225
         // message documented here:
225 226
         //
226 227
         // http://ipython.org/ipython-doc/dev/development/messaging.html#execute
227 228
         //
228 229
         // The output_callback will be passed msg_type ('stream','display_data','pyout','pyerr')
229  
-        // of the output and the content object of the PUB/SUB channel that contains the
  230
+        // of the output and the content and metadata objects of the PUB/SUB channel that contains the
230 231
         // output:
231 232
         //
232 233
         // http://ipython.org/ipython-doc/dev/development/messaging.html#messages-on-the-pub-sub-socket
233 234
         //
234 235
         // The clear_output_callback will be passed a content object that contains
235  
-        // stdout, stderr and other fields that are booleans.
  236
+        // stdout, stderr and other fields that are booleans, as well as the metadata object.
236 237
         //
237  
-        // The set_next_input_callback will bepassed the text that should become the next
  238
+        // The set_next_input_callback will be passed the text that should become the next
238 239
         // input cell.
239 240
 
240 241
         var content = {
@@ -313,12 +314,13 @@ var IPython = (function (IPython) {
313 314
         reply = $.parseJSON(e.data);
314 315
         var header = reply.header;
315 316
         var content = reply.content;
  317
+        var metadata = reply.metadata;
316 318
         var msg_type = header.msg_type;
317 319
         var callbacks = this.get_callbacks_for_msg(reply.parent_header.msg_id);
318 320
         if (callbacks !== undefined) {
319 321
             var cb = callbacks[msg_type];
320 322
             if (cb !== undefined) {
321  
-                cb(content);
  323
+                cb(content, metadata);
322 324
             }
323 325
         };
324 326
 
@@ -347,9 +349,10 @@ var IPython = (function (IPython) {
347 349
 
348 350
 
349 351
     Kernel.prototype._handle_iopub_reply = function (e) {
350  
-        reply = $.parseJSON(e.data);
  352
+        var reply = $.parseJSON(e.data);
351 353
         var content = reply.content;
352 354
         var msg_type = reply.header.msg_type;
  355
+        var metadata = reply.metadata;
353 356
         var callbacks = this.get_callbacks_for_msg(reply.parent_header.msg_id);
354 357
         if (msg_type !== 'status' && callbacks === undefined) {
355 358
             // Message not from one of this notebook's cells and there are no
@@ -360,7 +363,7 @@ var IPython = (function (IPython) {
360 363
         if (output_types.indexOf(msg_type) >= 0) {
361 364
             var cb = callbacks['output'];
362 365
             if (cb !== undefined) {
363  
-                cb(msg_type, content);
  366
+                cb(msg_type, content, metadata);
364 367
             }
365 368
         } else if (msg_type === 'status') {
366 369
             if (content.execution_state === 'busy') {
@@ -374,7 +377,7 @@ var IPython = (function (IPython) {
374 377
         } else if (msg_type === 'clear_output') {
375 378
             var cb = callbacks['clear_output'];
376 379
             if (cb !== undefined) {
377  
-                cb(content);
  380
+                cb(content, metadata);
378 381
             }
379 382
         };
380 383
     };
50  IPython/parallel/client/client.py
@@ -650,12 +650,16 @@ def _unwrap_exception(self, content):
650 650
             e.engine_info['engine_id'] = eid
651 651
         return e
652 652
 
653  
-    def _extract_metadata(self, header, parent, content):
  653
+    def _extract_metadata(self, msg):
  654
+        header = msg['header']
  655
+        parent = msg['parent_header']
  656
+        msg_meta = msg['metadata']
  657
+        content = msg['content']
654 658
         md = {'msg_id' : parent['msg_id'],
655 659
               'received' : datetime.now(),
656  
-              'engine_uuid' : header.get('engine', None),
657  
-              'follow' : parent.get('follow', []),
658  
-              'after' : parent.get('after', []),
  660
+              'engine_uuid' : msg_meta.get('engine', None),
  661
+              'follow' : msg_meta.get('follow', []),
  662
+              'after' : msg_meta.get('after', []),
659 663
               'status' : content['status'],
660 664
             }
661 665
 
@@ -664,8 +668,8 @@ def _extract_metadata(self, header, parent, content):
664 668
 
665 669
         if 'date' in parent:
666 670
             md['submitted'] = parent['date']
667  
-        if 'started' in header:
668  
-            md['started'] = header['started']
  671
+        if 'started' in msg_meta:
  672
+            md['started'] = msg_meta['started']
669 673
         if 'date' in header:
670 674
             md['completed'] = header['date']
671 675
         return md
@@ -738,7 +742,7 @@ def _handle_execute_reply(self, msg):
738 742
 
739 743
         # construct metadata:
740 744
         md = self.metadata[msg_id]
741  
-        md.update(self._extract_metadata(header, parent, content))
  745
+        md.update(self._extract_metadata(msg))
742 746
         # is this redundant?
743 747
         self.metadata[msg_id] = md
744 748
         
@@ -775,7 +779,7 @@ def _handle_apply_reply(self, msg):
775 779
 
776 780
         # construct metadata:
777 781
         md = self.metadata[msg_id]
778  
-        md.update(self._extract_metadata(header, parent, content))
  782
+        md.update(self._extract_metadata(msg))
779 783
         # is this redundant?
780 784
         self.metadata[msg_id] = md
781 785
 
@@ -1201,7 +1205,7 @@ def _maybe_raise(self, result):
1201 1205
 
1202 1206
         return result
1203 1207
 
1204  
-    def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
  1208
+    def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1205 1209
                             ident=None):
1206 1210
         """construct and send an apply message via a socket.
1207 1211
 
@@ -1214,7 +1218,7 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
1214 1218
         # defaults:
1215 1219
         args = args if args is not None else []
1216 1220
         kwargs = kwargs if kwargs is not None else {}
1217  
-        subheader = subheader if subheader is not None else {}
  1221
+        metadata = metadata if metadata is not None else {}
1218 1222
 
1219 1223
         # validate arguments
1220 1224
         if not callable(f) and not isinstance(f, Reference):
@@ -1223,13 +1227,13 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
1223 1227
             raise TypeError("args must be tuple or list, not %s"%type(args))
1224 1228
         if not isinstance(kwargs, dict):
1225 1229
             raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1226  
-        if not isinstance(subheader, dict):
1227  
-            raise TypeError("subheader must be dict, not %s"%type(subheader))
  1230
+        if not isinstance(metadata, dict):
  1231
+            raise TypeError("metadata must be dict, not %s"%type(metadata))
1228 1232
 
1229 1233
         bufs = util.pack_apply_message(f,args,kwargs)
1230 1234
 
1231 1235
         msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1232  
-                            subheader=subheader, track=track)
  1236
+                            metadata=metadata, track=track)
1233 1237
 
1234 1238
         msg_id = msg['header']['msg_id']
1235 1239
         self.outstanding.add(msg_id)
@@ -1245,7 +1249,7 @@ def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None,
1245 1249
 
1246 1250
         return msg
1247 1251
 
1248  
-    def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
  1252
+    def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1249 1253
         """construct and send an execute request via a socket.
1250 1254
 
1251 1255
         """
@@ -1254,19 +1258,19 @@ def send_execute_request(self, socket, code, silent=True, subheader=None, ident=
1254 1258
             raise RuntimeError("Client cannot be used after its sockets have been closed")
1255 1259
         
1256 1260
         # defaults:
1257  
-        subheader = subheader if subheader is not None else {}
  1261
+        metadata = metadata if metadata is not None else {}
1258 1262
 
1259 1263
         # validate arguments
1260 1264
         if not isinstance(code, basestring):
1261 1265
             raise TypeError("code must be text, not %s" % type(code))
1262  
-        if not isinstance(subheader, dict):
1263  
-            raise TypeError("subheader must be dict, not %s" % type(subheader))
  1266
+        if not isinstance(metadata, dict):
  1267
+            raise TypeError("metadata must be dict, not %s" % type(metadata))
1264 1268
         
1265 1269
         content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1266 1270
 
1267 1271
 
1268 1272
         msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1269  
-                            subheader=subheader)
  1273
+                            metadata=metadata)
1270 1274
 
1271 1275
         msg_id = msg['header']['msg_id']
1272 1276
         self.outstanding.add(msg_id)
@@ -1401,7 +1405,7 @@ def get_result(self, indices_or_msg_ids=None, block=None):
1401 1405
         return ar
1402 1406
 
1403 1407
     @spin_first
1404  
-    def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
  1408
+    def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1405 1409
         """Resubmit one or more tasks.
1406 1410
 
1407 1411
         in-flight tasks may not be resubmitted.
@@ -1539,7 +1543,13 @@ def result_status(self, msg_ids, status_only=True):
1539 1543
                     rcontent = self.session.unpack(rcontent)
1540 1544
 
1541 1545
                 md = self.metadata[msg_id]
1542  
-                md.update(self._extract_metadata(header, parent, rcontent))
  1546
+                md_msg = dict(
  1547
+                    content=rcontent,
  1548
+                    parent_header=parent,
  1549
+                    header=header,
  1550
+                    metadata=rec['result_metadata'],
  1551
+                )
  1552
+                md.update(self._extract_metadata(md_msg))
1543 1553
                 if rec.get('received'):
1544 1554
                     md['received'] = rec['received']
1545 1555
                 md.update(iodict)
4  IPython/parallel/client/view.py
@@ -1022,10 +1022,10 @@ def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
1022 1022
 
1023 1023
         after = self._render_dependency(after)
1024 1024
         follow = self._render_dependency(follow)
1025  
-        subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
  1025
+        metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1026 1026
 
1027 1027
         msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1028  
-                                subheader=subheader)
  1028
+                                metadata=metadata)
1029 1029
         tracker = None if track is False else msg['tracker']
1030 1030
 
1031 1031
         ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker)
31  IPython/parallel/controller/hub.py
@@ -56,6 +56,7 @@ def empty_record():
56 56
     return {
57 57
         'msg_id' : None,
58 58
         'header' : None,
  59
+        'metadata' : None,
59 60
         'content': None,
60 61
         'buffers': None,
61 62
         'submitted': None,
@@ -66,6 +67,7 @@ def empty_record():
66 67
         'resubmitted': None,
67 68
         'received': None,
68 69
         'result_header' : None,
  70
+        'result_metadata' : None,
69 71
         'result_content' : None,
70 72
         'result_buffers' : None,
71 73
         'queue' : None,
@@ -83,6 +85,7 @@ def init_record(msg):
83 85
         'msg_id' : header['msg_id'],
84 86
         'header' : header,
85 87
         'content': msg['content'],
  88
+        'metadata': msg['metadata'],
86 89
         'buffers': msg['buffers'],
87 90
         'submitted': header['date'],
88 91
         'client_uuid' : None,
@@ -92,6 +95,7 @@ def init_record(msg):
92 95
         'resubmitted': None,
93 96
         'received': None,
94 97
         'result_header' : None,
  98
+        'result_metadata': None,
95 99
         'result_content' : None,
96 100
         'result_buffers' : None,
97 101
         'queue' : None,
@@ -646,10 +650,12 @@ def save_queue_result(self, idents, msg):
646 650
             return
647 651
         # update record anyway, because the unregistration could have been premature
648 652
         rheader = msg['header']
  653
+        md = msg['metadata']
649 654
         completed = rheader['date']
650  
-        started = rheader.get('started', None)
  655
+        started = md.get('started', None)
651 656
         result = {
652 657
             'result_header' : rheader,
  658
+            'result_metadata': md,
653 659
             'result_content': msg['content'],
654 660
             'received': datetime.now(),
655 661
             'started' : started,
@@ -736,10 +742,11 @@ def save_task_result(self, idents, msg):
736 742
             self.unassigned.remove(msg_id)
737 743
 
738 744
         header = msg['header']
739  
-        engine_uuid = header.get('engine', u'')
  745
+        md = msg['metadata']
  746
+        engine_uuid = md.get('engine', u'')
740 747
         eid = self.by_ident.get(cast_bytes(engine_uuid), None)
741 748
         
742  
-        status = header.get('status', None)
  749
+        status = md.get('status', None)
743 750
 
744 751
         if msg_id in self.pending:
745 752
             self.log.info("task::task %r finished on %s", msg_id, eid)
@@ -751,9 +758,10 @@ def save_task_result(self, idents, msg):
751 758
                 if msg_id in self.tasks[eid]:
752 759
                     self.tasks[eid].remove(msg_id)
753 760
             completed = header['date']
754  
-            started = header.get('started', None)
  761
+            started = md.get('started', None)
755 762
             result = {
756 763
                 'result_header' : header,
  764
+                'result_metadata': msg['metadata'],
757 765
                 'result_content': msg['content'],
758 766
                 'started' : started,
759 767
                 'completed' : completed,
@@ -1221,12 +1229,15 @@ def _extract_record(self, rec):
1221 1229
         io_dict = {}
1222 1230
         for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1223 1231
                 io_dict[key] = rec[key]
1224  
-        content = { 'result_content': rec['result_content'],
1225  
-                            'header': rec['header'],
1226  
-                            'result_header' : rec['result_header'],
1227  
-                            'received' : rec['received'],
1228  
-                            'io' : io_dict,
1229  
-                          }
  1232
+        content = { 
  1233
+            'header': rec['header'],
  1234
+            'metadata': rec['metadata'],
  1235
+            'result_metadata': rec['result_metadata'],
  1236
+            'result_header' : rec['result_header'],
  1237
+            'result_content': rec['result_content'],
  1238
+            'received' : rec['received'],
  1239
+            'io' : io_dict,
  1240
+        }
1230 1241
         if rec['result_buffers']:
1231 1242
             buffers = map(bytes, rec['result_buffers'])
1232 1243
         else:
31  IPython/parallel/controller/scheduler.py
@@ -129,12 +129,14 @@ def leastload(loads):
129 129
 
130 130
 class Job(object):
131 131
     """Simple container for a job"""
132  
-    def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout):
  132
+    def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
  133
+                    targets, after, follow, timeout):
133 134
         self.msg_id = msg_id
134 135
         self.raw_msg = raw_msg
135 136
         self.idents = idents
136 137
         self.msg = msg
137 138
         self.header = header
  139
+        self.metadata = metadata
138 140
         self.targets = targets
139 141
         self.after = after
140 142
         self.follow = follow
@@ -327,13 +329,13 @@ def handle_stranded_tasks(self, engine):
327 329
                 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
328 330
             except:
329 331
                 content = error.wrap_exception()
330  
-            # build fake header
331  
-            header = dict(
332  
-                status='error',
  332
+            # build fake metadata
  333
+            md = dict(
  334
+                status=u'error',
333 335
                 engine=engine,
334 336
                 date=datetime.now(),
335 337
             )
336  
-            msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
  338
+            msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
337 339
             raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
338 340
             # and dispatch it
339 341
             self.dispatch_result(raw_reply)
@@ -365,20 +367,21 @@ def dispatch_submission(self, raw_msg):
365 367
         self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
366 368
 
367 369
         header = msg['header']
  370
+        md = msg['metadata']
368 371
         msg_id = header['msg_id']
369 372
         self.all_ids.add(msg_id)
370 373
 
371 374
         # get targets as a set of bytes objects
372 375
         # from a list of unicode objects
373  
-        targets = header.get('targets', [])
  376
+        targets = md.get('targets', [])
374 377
         targets = map(cast_bytes, targets)
375 378
         targets = set(targets)
376 379
 
377  
-        retries = header.get('retries', 0)
  380
+        retries = md.get('retries', 0)
378 381
         self.retries[msg_id] = retries
379 382
 
380 383
         # time dependencies
381  
-        after = header.get('after', None)
  384
+        after = md.get('after', None)
382 385
         if after:
383 386
             after = Dependency(after)
384 387
             if after.all:
@@ -402,10 +405,10 @@ def dispatch_submission(self, raw_msg):
402 405
             after = MET
403 406
 
404 407
         # location dependencies
405  
-        follow = Dependency(header.get('follow', []))
  408
+        follow = Dependency(md.get('follow', []))
406 409
 
407 410
         # turn timeouts into datetime objects:
408  
-        timeout = header.get('timeout', None)
  411
+        timeout = md.get('timeout', None)
409 412
         if timeout:
410 413
             # cast to float, because jsonlib returns floats as decimal.Decimal,
411 414
             # which timedelta does not accept
@@ -413,7 +416,7 @@ def dispatch_submission(self, raw_msg):
413 416
 
414 417
         job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
415 418
                  header=header, targets=targets, after=after, follow=follow,
416  
-                 timeout=timeout,
  419
+                 timeout=timeout, metadata=md,
417 420
         )
418 421
 
419 422
         # validate and reduce dependencies:
@@ -585,10 +588,10 @@ def dispatch_result(self, raw_msg):
585 588
             self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
586 589
             return
587 590
 
588  
-        header = msg['header']
  591
+        md = msg['metadata']
589 592
         parent = msg['parent_header']
590  
-        if header.get('dependencies_met', True):
591  
-            success = (header['status'] == 'ok')
  593
+        if md.get('dependencies_met', True):
  594
+            success = (md['status'] == 'ok')
592 595
             msg_id = parent['msg_id']
593 596
             retries = self.retries[msg_id]
594 597
             if not success and retries > 0:
6  IPython/parallel/controller/sqlitedb.py
@@ -109,6 +109,7 @@ class SQLiteDB(BaseDB):
109 109
     # the ordered list of column names
110 110
     _keys = List(['msg_id' ,
111 111
             'header' ,
  112
+            'metadata',
112 113
             'content',
113 114
             'buffers',
114 115
             'submitted',
@@ -119,6 +120,7 @@ class SQLiteDB(BaseDB):
119 120
             'resubmitted',
120 121
             'received',
121 122
             'result_header' ,
  123
+            'result_metadata',
122 124
             'result_content' ,
123 125
             'result_buffers' ,
124 126
             'queue' ,
@@ -131,6 +133,7 @@ class SQLiteDB(BaseDB):
131 133
     # sqlite datatypes for checking that db is current format
132 134
     _types = Dict({'msg_id' : 'text' ,
133 135
             'header' : 'dict text',
  136
+            'metadata' : 'dict text',
134 137
             'content' : 'dict text',
135 138
             'buffers' : 'bufs blob',
136 139
             'submitted' : 'timestamp',
@@ -141,6 +144,7 @@ class SQLiteDB(BaseDB):
141 144
             'resubmitted' : 'text',
142 145
             'received' : 'timestamp',
143 146
             'result_header' : 'dict text',
  147
+            'result_metadata' : 'dict text',
144 148
             'result_content' : 'dict text',
145 149
             'result_buffers' : 'bufs blob',
146 150
             'queue' : 'text',
@@ -240,6 +244,7 @@ def _init_db(self):
240 244
         self._db.execute("""CREATE TABLE IF NOT EXISTS %s
241 245
                 (msg_id text PRIMARY KEY,
242 246
                 header dict text,
  247
+                metadata dict text,
243 248
                 content dict text,
244 249
                 buffers bufs blob,
245 250
                 submitted timestamp,
@@ -250,6 +255,7 @@ def _init_db(self):
250 255
                 resubmitted text,
251 256
                 received timestamp,
252 257
                 result_header dict text,
  258
+                result_metadata dict text,
253 259
                 result_content dict text,
254 260
                 result_buffers bufs blob,
255 261
                 queue text,
37  IPython/zmq/ipkernel.py
@@ -218,9 +218,9 @@ def dispatch_shell(self, stream, msg):
218 218
             # is it safe to assume a msg_id will not be resubmitted?
219 219
             reply_type = msg_type.split('_')[0] + '_reply'
220 220
             status = {'status' : 'aborted'}
221  
-            sub = {'engine' : self.ident}
222  
-            sub.update(status)
223  
-            reply_msg = self.session.send(stream, reply_type, subheader=sub,
  221
+            md = {'engine' : self.ident}
  222
+            md.update(status)
  223
+            reply_msg = self.session.send(stream, reply_type, metadata=md,
224 224
                         content=status, parent=msg, ident=idents)
225 225
             return
226 226
         
@@ -293,13 +293,16 @@ def record_ports(self, ports):
293 293
     # Kernel request handlers
294 294
     #---------------------------------------------------------------------------
295 295
     
296  
-    def _make_subheader(self):
297  
-        """init subheader dict, for execute/apply_reply"""
298  
-        return {
  296
+    def _make_metadata(self, other=None):
  297
+        """init metadata dict, for execute/apply_reply"""
  298
+        new_md = {
299 299
             'dependencies_met' : True,
300 300
             'engine' : self.ident,
301 301
             'started': datetime.now(),
302 302
         }
  303
+        if other:
  304
+            new_md.update(other)
  305
+        return new_md
303 306
     
304 307
     def _publish_pyin(self, code, parent, execution_count):
305 308
         """Publish the code request on the pyin stream."""
@@ -333,7 +336,7 @@ def execute_request(self, stream, ident, parent):
333 336
             self.log.error("%s", parent)
334 337
             return
335 338
         
336  
-        sub = self._make_subheader()
  339
+        md = self._make_metadata(parent['metadata'])
337 340
 
338 341
         shell = self.shell # we'll need this a lot here
339 342
 
@@ -425,13 +428,13 @@ def execute_request(self, stream, ident, parent):
425 428
         # Send the reply.
426 429
         reply_content = json_clean(reply_content)
427 430
         
428  
-        sub['status'] = reply_content['status']
  431
+        md['status'] = reply_content['status']
429 432
         if reply_content['status'] == 'error' and \
430 433
                         reply_content['ename'] == 'UnmetDependency':
431  
-                sub['dependencies_met'] = False
  434
+                md['dependencies_met'] = False
432 435
 
433 436
         reply_msg = self.session.send(stream, u'execute_reply',
434  
-                                      reply_content, parent, subheader=sub,
  437
+                                      reply_content, parent, metadata=md,
435 438
                                       ident=ident)
436 439
         
437 440
         self.log.debug("%s", reply_msg)
@@ -543,7 +546,7 @@ def apply_request(self, stream, ident, parent):
543 546
         # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
544 547
         # self.iopub_socket.send(pyin_msg)
545 548
         # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
546  
-        sub = self._make_subheader()
  549
+        md = self._make_metadata(parent['metadata'])
547 550
         try:
548 551
             working = shell.user_ns
549 552
 
@@ -589,19 +592,19 @@ def apply_request(self, stream, ident, parent):
589 592
             result_buf = []
590 593
 
591 594
             if reply_content['ename'] == 'UnmetDependency':
592  
-                sub['dependencies_met'] = False
  595
+                md['dependencies_met'] = False
593 596
         else:
594 597
             reply_content = {'status' : 'ok'}
595 598
 
596 599
         # put 'ok'/'error' status in header, for scheduler introspection:
597  
-        sub['status'] = reply_content['status']
  600
+        md['status'] = reply_content['status']
598 601
 
599 602
         # flush i/o
600 603
         sys.stdout.flush()
601 604
         sys.stderr.flush()
602 605
         
603 606
         reply_msg = self.session.send(stream, u'apply_reply', reply_content,
604  
-                    parent=parent, ident=ident,buffers=result_buf, subheader=sub)
  607
+                    parent=parent, ident=ident,buffers=result_buf, metadata=md)
605 608
 
606 609
         self._publish_status(u'idle', parent)
607 610
 
@@ -672,9 +675,9 @@ def _abort_queue(self, stream):
672 675
             reply_type = msg_type.split('_')[0] + '_reply'
673 676
 
674 677
             status = {'status' : 'aborted'}
675  
-            sub = {'engine' : self.ident}
676  
-            sub.update(status)
677  
-            reply_msg = self.session.send(stream, reply_type, subheader=sub,
  678
+            md = {'engine' : self.ident}
  679
+            md.update(status)
  680
+            reply_msg = self.session.send(stream, reply_type, meatadata=md,
678 681
                         content=status, parent=msg, ident=idents)
679 682
             self.log.debug("%s", reply_msg)
680 683
             # We need to wait a bit for requests to come in. This can probably
46  IPython/zmq/session.py
@@ -49,7 +49,7 @@
49 49
 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 50
 from IPython.utils.py3compat import str_to_bytes
51 51
 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52  
-                                        DottedObjectName, CUnicode)
  52
+                                        DottedObjectName, CUnicode, Dict)
53 53
 
54 54
 #-----------------------------------------------------------------------------
55 55
 # utility functions
@@ -292,6 +292,9 @@ def _session_changed(self, name, old, new):
292 292
     username = Unicode(os.environ.get('USER',u'username'), config=True,
293 293
         help="""Username for the Session. Default is your system username.""")
294 294
 
  295
+    metadata = Dict({}, config=True,
  296
+        help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
  297
+
295 298
     # message signature related traits:
296 299
     
297 300
     key = CBytes(b'', config=True,
@@ -402,7 +405,7 @@ def _check_packers(self):
402 405
     def msg_header(self, msg_type):
403 406
         return msg_header(self.msg_id, msg_type, self.username, self.session)
404 407
 
405  
-    def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
  408
+    def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
406 409
         """Return the nested message dict.
407 410
 
408 411
         This format is different from what is sent over the wire. The
@@ -416,8 +419,9 @@ def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
416 419
         msg['msg_type'] = header['msg_type']
417 420
         msg['parent_header'] = {} if parent is None else extract_header(parent)
418 421
         msg['content'] = {} if content is None else content
419  
-        sub = {} if subheader is None else subheader
420  
-        msg['header'].update(sub)
  422
+        msg['metadata'] = self.metadata.copy()
  423
+        if metadata is not None:
  424
+            msg['metadata'].update(metadata)
421 425
         return msg
422 426
 
423 427
     def sign(self, msg_list):
@@ -451,7 +455,7 @@ def serialize(self, msg, ident=None):
451 455
         -------
452 456
         msg_list : list
453 457
             The list of bytes objects to be sent with the format:
454  
-            [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
  458
+            [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_metadata,p_content,
455 459
              buffer1,buffer2,...]. In this list, the p_* entities are
456 460
             the packed or serialized versions, so if JSON is used, these
457 461
             are utf8 encoded JSON strings.
@@ -472,7 +476,8 @@ def serialize(self, msg, ident=None):
472 476
 
473 477
         real_message = [self.pack(msg['header']),
474 478
                         self.pack(msg['parent_header']),
475  
-                        content
  479
+                        self.pack(msg['metadata']),
  480
+                        content,
476 481
         ]
477 482
 
478 483
         to_send = []
@@ -492,7 +497,7 @@ def serialize(self, msg, ident=None):
492 497
         return to_send
493 498
 
494 499
     def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
495  
-             buffers=None, subheader=None, track=False, header=None):
  500
+             buffers=None, track=False, header=None, metadata=None):
496 501
         """Build and send a message via stream or socket.
497 502
 
498 503
         The message format used by this function internally is as follows:
@@ -516,20 +521,20 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
516 521
         content : dict or None
517 522
             The content of the message (ignored if msg_or_type is a message).
518 523
         header : dict or None
519  
-            The header dict for the message (ignores if msg_to_type is a message).
  524
+            The header dict for the message (ignored if msg_to_type is a message).
520 525
         parent : Message or dict or None
521 526
             The parent or parent header describing the parent of this message
522 527
             (ignored if msg_or_type is a message).
523 528
         ident : bytes or list of bytes
524 529
             The zmq.IDENTITY routing path.
525  
-        subheader : dict or None
526  
-            Extra header keys for this message's header (ignored if msg_or_type
527  
-            is a message).
  530
+        metadata : dict or None
  531
+            The metadata describing the message
528 532
         buffers : list or None
529 533
             The already-serialized buffers to be appended to the message.
530 534
         track : bool
531 535
             Whether to track.  Only for use with Sockets, because ZMQStream
532 536
             objects cannot track messages.
  537
+            
533 538
 
534 539
         Returns
535 540
         -------
@@ -553,7 +558,7 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
553 558
             msg = msg_or_type
554 559
         else:
555 560
             msg = self.msg(msg_or_type, content=content, parent=parent,
556  
-                           subheader=subheader, header=header)
  561
+                           header=header, metadata=metadata)
557 562
 
558 563
         buffers = [] if buffers is None else buffers
559 564
         to_send = self.serialize(msg, ident)
@@ -596,7 +601,7 @@ def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
596 601
             The ZMQ stream or socket to use for sending the message.
597 602
         msg_list : list
598 603
             The serialized list of messages to send. This only includes the
599  
-            [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
  604
+            [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
600 605
             the message.
601 606
         ident : ident or list
602 607
             A single ident or a list of idents to use in sending.
@@ -694,7 +699,7 @@ def unserialize(self, msg_list, content=True, copy=True):
694 699
         -----------
695 700
         msg_list : list of bytes or Message objects
696 701
             The list of message parts of the form [HMAC,p_header,p_parent,
697  
-            p_content,buffer1,buffer2,...].
  702
+            p_metadata,p_content,buffer1,buffer2,...].
698 703
         content : bool (True)
699 704
             Whether to unpack the content dict (True), or leave it packed
700 705
             (False).
@@ -708,7 +713,7 @@ def unserialize(self, msg_list, content=True, copy=True):
708 713
             The nested message dict with top-level keys [header, parent_header,
709 714
             content, buffers].
710 715
         """
711  
-        minlen = 4
  716
+        minlen = 5
712 717
         message = {}
713 718
         if not copy:
714 719
             for i in range(minlen):
@@ -720,9 +725,9 @@ def unserialize(self, msg_list, content=True, copy=True):
720 725
             if signature in self.digest_history:
721 726
                 raise ValueError("Duplicate Signature: %r"%signature)
722 727
             self.digest_history.add(signature)
723  
-            check = self.sign(msg_list[1:4])
  728
+            check = self.sign(msg_list[1:5])
724 729
             if not signature == check:
725  
-                raise ValueError("Invalid Signature: %r"%signature)
  730
+                raise ValueError("Invalid Signature: %r" % signature)
726 731
         if not len(msg_list) >= minlen:
727 732
             raise TypeError("malformed message, must have at least %i elements"%minlen)
728 733
         header = self.unpack(msg_list[1])
@@ -730,12 +735,13 @@ def unserialize(self, msg_list, content=True, copy=True):
730 735
         message['msg_id'] = header['msg_id']
731 736
         message['msg_type'] = header['msg_type']
732 737
         message['parent_header'] = self.unpack(msg_list[2])
  738
+        message['metadata'] = self.unpack(msg_list[3])
733 739
         if content:
734  
-            message['content'] = self.unpack(msg_list[3])
  740
+            message['content'] = self.unpack(msg_list[4])
735 741
         else:
736  
-            message['content'] = msg_list[3]
  742
+            message['content'] = msg_list[4]
737 743
 
738  
-        message['buffers'] = msg_list[4:]
  744
+        message['buffers'] = msg_list[5:]
739 745
         return message
740 746
 
741 747
 def test_msg2obj():
10  IPython/zmq/tests/test_session.py
@@ -47,10 +47,11 @@ class TestSession(SessionTestCase):
47 47
     def test_msg(self):
48 48
         """message format"""
49 49
         msg = self.session.msg('execute')
50  
-        thekeys = set('header parent_header content msg_type msg_id'.split())
  50
+        thekeys = set('header parent_header metadata content msg_type msg_id'.split())
51 51
         s = set(msg.keys())
52 52
         self.assertEqual(s, thekeys)
53 53
         self.assertTrue(isinstance(msg['content'],dict))
  54
+        self.assertTrue(isinstance(msg['metadata'],dict))
54 55
         self.assertTrue(isinstance(msg['header'],dict))
55 56
         self.assertTrue(isinstance(msg['parent_header'],dict))
56 57
         self.assertTrue(isinstance(msg['msg_id'],str))
@@ -69,6 +70,7 @@ def test_serialize(self):
69 70
         self.assertEqual(new_msg['header'],msg['header'])
70 71
         self.assertEqual(new_msg['content'],msg['content'])
71 72
         self.assertEqual(new_msg['parent_header'],msg['parent_header'])
  73
+        self.assertEqual(new_msg['metadata'],msg['metadata'])
72 74
         # ensure floats don't come out as Decimal:
73 75
         self.assertEqual(type(new_msg['content']['b']),type(new_msg['content']['b']))
74 76
 
@@ -85,6 +87,7 @@ def test_send(self):
85 87
         self.assertEqual(new_msg['header'],msg['header'])
86 88
         self.assertEqual(new_msg['content'],msg['content'])
87 89
         self.assertEqual(new_msg['parent_header'],msg['parent_header'])
  90
+        self.assertEqual(new_msg['metadata'],msg['metadata'])
88 91
         self.assertEqual(new_msg['buffers'],[b'bar'])
89 92
 
90 93
         socket.data = []
@@ -92,9 +95,10 @@ def test_send(self):
92 95
         content = msg['content']
93 96
         header = msg['header']
94 97
         parent = msg['parent_header']
  98
+        metadata = msg['metadata']
95 99
         msg_type = header['msg_type']
96 100
         self.session.send(socket, None, content=content, parent=parent,
97  
-            header=header, ident=b'foo', buffers=[b'bar'])
  101
+            header=header, metadata=metadata, ident=b'foo', buffers=[b'bar'])
98 102
         ident, msg_list = self.session.feed_identities(socket.data)
99 103
         new_msg = self.session.unserialize(msg_list)
100 104
         self.assertEqual(ident[0], b'foo')
@@ -102,6 +106,7 @@ def test_send(self):
102 106
         self.assertEqual(new_msg['msg_type'],msg['msg_type'])
103 107
         self.assertEqual(new_msg['header'],msg['header'])
104 108
         self.assertEqual(new_msg['content'],msg['content'])
  109
+        self.assertEqual(new_msg['metadata'],msg['metadata'])
105 110
         self.assertEqual(new_msg['parent_header'],msg['parent_header'])
106 111
         self.assertEqual(new_msg['buffers'],[b'bar'])
107 112
 
@@ -114,6 +119,7 @@ def test_send(self):
114 119
         self.assertEqual(new_msg['msg_type'],msg['msg_type'])
115 120
         self.assertEqual(new_msg['header'],msg['header'])
116 121
         self.assertEqual(new_msg['content'],msg['content'])
  122
+        self.assertEqual(new_msg['metadata'],msg['metadata'])
117 123
         self.assertEqual(new_msg['parent_header'],msg['parent_header'])
118 124
         self.assertEqual(new_msg['buffers'],[b'bar'])
119 125
 
6  docs/source/development/messaging.txt
@@ -81,7 +81,7 @@ representation of all the data, we can communicate with such clients.
81 81
 General Message Format
82 82
 ======================
83 83
 
84  
-A message is defined by the following three-dictionary structure::
  84
+A message is defined by the following four-dictionary structure::
85 85
 
86 86
     {
87 87
       # The message header contains a pair of unique identifiers for the
@@ -105,6 +105,9 @@ A message is defined by the following three-dictionary structure::
105 105
       # The actual content of the message must be a dict, whose structure
106 106
       # depends on the message type.
107 107
       'content' : dict,
  108
+      
  109
+      # Any metadata associated with the message.
  110
+      'metadata' : dict,
108 111
     }
109 112
 
110 113
    
@@ -127,6 +130,7 @@ messages upon deserialization to the following form for convenience::
127 130
       'msg_type' : str,
128 131
       'parent_header' : dict,
129 132
       'content' : dict,
  133
+      'metadata' : dict,
130 134
     }
131 135
 
132 136
 All messages sent to or received by any IPython process should have this
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.