Skip to content
This repository

Sessionwork #579

Merged
merged 9 commits into from over 2 years ago

2 participants

Brian E. Granger Min RK
Brian E. Granger
Owner

This refactors and adds tests to IPython.zmq.session.Session. The tests can be run with nosetests IPython.zmq.tests.test_session.

  • Renamed the unpack_message method to unserialize.
  • Removed msg_id and msg_type from the top-level message dict (put into headers) throughout the code base.
  • Added tests.
  • Updated the docstrings for Session.
Min RK
Owner
minrk commented July 14, 2011

One thing you might do, since now you are doing many nested dict requests that weren't there before, is add a few msg_id = msg['header']['msg_id'], so you aren't asking for an element of an element of a dict more than you need to.

Brian E. Granger
Owner
added some commits July 13, 2011
Brian E. Granger Fixing code to assume msg_type and msg_id are top-level.
* I have gone through and looked for instances of ['msg_type'] and
  ['msg_id'] and tried to make sure that I added ['header'] so
  pull the values out of the header.
* But there are many cases where I can't tell if the dict is the
  full message or the header already. This is especially true
  of the msg_id in the parallel db parts of the code.
* Tests pass, but this is scary.
36064ac
Brian E. Granger Renaming unpack_message to unserialize and updating docstrings. 6ceaba5
Brian E. Granger Fixing docstrings and a few more places for msg_id/msg_type. 1d3fec8
Brian E. Granger Adding tests for zmq.session. 28a146d
Brian E. Granger More tests for Session.send/recv. 88dae27
Brian E. Granger Fixing messaging docs to reflect msg_type in header. 12aef93
Brian E. Granger Adding temp refs to msg_type to prevent nested dict gets. ff292f9
Brian E. Granger Fixing bug in ipkernel.py related to msg_type refactoring. c4eb828
Brian E. Granger
Owner

This branch should not be merged until after the 0.11 release.

Brian E. Granger
Owner

@minrk: with 0.11 out, can you review this to make sure it is ready to go into master. The htmlnotebook branch is based on this and I would to have this merged so I can follow master again.

Min RK
Owner

It hasn't changed since I last looked it over, and I just read it over again, so I think it should be fine. Merging this should probably trigger creating the 0.11 maintenance branch.

Brian E. Granger ellisonbg merged commit 260396c into from August 09, 2011
Brian E. Granger ellisonbg closed this August 09, 2011
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 9 unique commits by 1 author.

Jul 14, 2011
Brian E. Granger Fixing code to assume msg_type and msg_id are top-level.
* I have gone through and looked for instances of ['msg_type'] and
  ['msg_id'] and tried to make sure that I added ['header'] so
  pull the values out of the header.
* But there are many cases where I can't tell if the dict is the
  full message or the header already. This is especially true
  of the msg_id in the parallel db parts of the code.
* Tests pass, but this is scary.
36064ac
Brian E. Granger Renaming unpack_message to unserialize and updating docstrings. 6ceaba5
Brian E. Granger Fixing docstrings and a few more places for msg_id/msg_type. 1d3fec8
Brian E. Granger Adding tests for zmq.session. 28a146d
Brian E. Granger More tests for Session.send/recv. 88dae27
Brian E. Granger Fixing messaging docs to reflect msg_type in header. 12aef93
Brian E. Granger Adding temp refs to msg_type to prevent nested dict gets. ff292f9
Brian E. Granger Fixing bug in ipkernel.py related to msg_type refactoring. c4eb828
Brian E. Granger Fixing another bug in msg_type refactoring. 260396c
This page is out of date. Refresh to see the latest.
2  IPython/frontend/qt/base_frontend_mixin.py
@@ -96,7 +96,7 @@ def _dispatch(self, msg):
96 96
         """ Calls the frontend handler associated with the message type of the 
97 97
             given message.
98 98
         """
99  
-        msg_type = msg['msg_type']
  99
+        msg_type = msg['header']['msg_type']
100 100
         handler = getattr(self, '_handle_' + msg_type, None)
101 101
         if handler:
102 102
             handler(msg)
6  IPython/frontend/qt/kernelmanager.py
@@ -66,7 +66,7 @@ def call_handlers(self, msg):
66 66
         self.message_received.emit(msg)
67 67
         
68 68
         # Emit signals for specialized message types.
69  
-        msg_type = msg['msg_type']
  69
+        msg_type = msg['header']['msg_type']
70 70
         signal = getattr(self, msg_type, None)
71 71
         if signal:
72 72
             signal.emit(msg)
@@ -122,7 +122,7 @@ def call_handlers(self, msg):
122 122
         # Emit the generic signal.
123 123
         self.message_received.emit(msg)
124 124
         # Emit signals for specialized message types.
125  
-        msg_type = msg['msg_type']
  125
+        msg_type = msg['header']['msg_type']
126 126
         signal = getattr(self, msg_type + '_received', None)
127 127
         if signal:
128 128
             signal.emit(msg)
@@ -155,7 +155,7 @@ def call_handlers(self, msg):
155 155
         self.message_received.emit(msg)
156 156
         
157 157
         # Emit signals for specialized message types.
158  
-        msg_type = msg['msg_type']
  158
+        msg_type = msg['header']['msg_type']
159 159
         if msg_type == 'input_request':
160 160
             self.input_requested.emit(msg)
161 161
 
8  IPython/parallel/client/client.py
@@ -670,7 +670,7 @@ def _flush_notifications(self):
670 670
         while msg is not None:
671 671
             if self.debug:
672 672
                 pprint(msg)
673  
-            msg_type = msg['msg_type']
  673
+            msg_type = msg['header']['msg_type']
674 674
             handler = self._notification_handlers.get(msg_type, None)
675 675
             if handler is None:
676 676
                 raise Exception("Unhandled message type: %s"%msg.msg_type)
@@ -684,7 +684,7 @@ def _flush_results(self, sock):
684 684
         while msg is not None:
685 685
             if self.debug:
686 686
                 pprint(msg)
687  
-            msg_type = msg['msg_type']
  687
+            msg_type = msg['header']['msg_type']
688 688
             handler = self._queue_handlers.get(msg_type, None)
689 689
             if handler is None:
690 690
                 raise Exception("Unhandled message type: %s"%msg.msg_type)
@@ -729,7 +729,7 @@ def _flush_iopub(self, sock):
729 729
             msg_id = parent['msg_id']
730 730
             content = msg['content']
731 731
             header = msg['header']
732  
-            msg_type = msg['msg_type']
  732
+            msg_type = msg['header']['msg_type']
733 733
             
734 734
             # init metadata:
735 735
             md = self.metadata[msg_id]
@@ -994,7 +994,7 @@ def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None,
994 994
         msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
995 995
                             subheader=subheader, track=track)
996 996
         
997  
-        msg_id = msg['msg_id']
  997
+        msg_id = msg['header']['msg_id']
998 998
         self.outstanding.add(msg_id)
999 999
         if ident:
1000 1000
             # possibly routed to a specific engine
4  IPython/parallel/client/view.py
@@ -523,7 +523,7 @@ def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, tra
523 523
                                     ident=ident)
524 524
             if track:
525 525
                 trackers.append(msg['tracker'])
526  
-            msg_ids.append(msg['msg_id'])
  526
+            msg_ids.append(msg['header']['msg_id'])
527 527
         tracker = None if track is False else zmq.MessageTracker(*trackers)
528 528
         ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
529 529
         if block:
@@ -980,7 +980,7 @@ def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
980 980
                                 subheader=subheader)
981 981
         tracker = None if track is False else msg['tracker']
982 982
         
983  
-        ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
  983
+        ar = AsyncResult(self.client, msg['header']['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
984 984
         
985 985
         if block:
986 986
             try:
20  IPython/parallel/controller/hub.py
@@ -485,7 +485,7 @@ def dispatch_query(self, msg):
485 485
             return
486 486
         client_id = idents[0]
487 487
         try:
488  
-            msg = self.session.unpack_message(msg, content=True)
  488
+            msg = self.session.unserialize(msg, content=True)
489 489
         except Exception:
490 490
             content = error.wrap_exception()
491 491
             self.log.error("Bad Query Message: %r"%msg, exc_info=True)
@@ -494,7 +494,7 @@ def dispatch_query(self, msg):
494 494
             return
495 495
         # print client_id, header, parent, content
496 496
         #switch on message type:
497  
-        msg_type = msg['msg_type']
  497
+        msg_type = msg['header']['msg_type']
498 498
         self.log.info("client::client %r requested %r"%(client_id, msg_type))
499 499
         handler = self.query_handlers.get(msg_type, None)
500 500
         try:
@@ -550,7 +550,7 @@ def save_queue_request(self, idents, msg):
550 550
             return
551 551
         queue_id, client_id = idents[:2]
552 552
         try:
553  
-            msg = self.session.unpack_message(msg)
  553
+            msg = self.session.unserialize(msg)
554 554
         except Exception:
555 555
             self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
556 556
             return
@@ -597,7 +597,7 @@ def save_queue_result(self, idents, msg):
597 597
             
598 598
         client_id, queue_id = idents[:2]
599 599
         try:
600  
-            msg = self.session.unpack_message(msg)
  600
+            msg = self.session.unserialize(msg)
601 601
         except Exception:
602 602
             self.log.error("queue::engine %r sent invalid message to %r: %r"%(
603 603
                     queue_id,client_id, msg), exc_info=True)
@@ -647,7 +647,7 @@ def save_task_request(self, idents, msg):
647 647
         client_id = idents[0]
648 648
         
649 649
         try:
650  
-            msg = self.session.unpack_message(msg)
  650
+            msg = self.session.unserialize(msg)
651 651
         except Exception:
652 652
             self.log.error("task::client %r sent invalid task message: %r"%(
653 653
                     client_id, msg), exc_info=True)
@@ -697,7 +697,7 @@ def save_task_result(self, idents, msg):
697 697
         """save the result of a completed task."""
698 698
         client_id = idents[0]
699 699
         try:
700  
-            msg = self.session.unpack_message(msg)
  700
+            msg = self.session.unserialize(msg)
701 701
         except Exception:
702 702
             self.log.error("task::invalid task result message send to %r: %r"%(
703 703
                     client_id, msg), exc_info=True)
@@ -744,7 +744,7 @@ def save_task_result(self, idents, msg):
744 744
     
745 745
     def save_task_destination(self, idents, msg):
746 746
         try:
747  
-            msg = self.session.unpack_message(msg, content=True)
  747
+            msg = self.session.unserialize(msg, content=True)
748 748
         except Exception:
749 749
             self.log.error("task::invalid task tracking message", exc_info=True)
750 750
             return
@@ -781,7 +781,7 @@ def save_iopub_message(self, topics, msg):
781 781
         """save an iopub message into the db"""
782 782
         # print (topics)
783 783
         try:
784  
-            msg = self.session.unpack_message(msg, content=True)
  784
+            msg = self.session.unserialize(msg, content=True)
785 785
         except Exception:
786 786
             self.log.error("iopub::invalid IOPub message", exc_info=True)
787 787
             return
@@ -791,7 +791,7 @@ def save_iopub_message(self, topics, msg):
791 791
             self.log.error("iopub::invalid IOPub message: %r"%msg)
792 792
             return
793 793
         msg_id = parent['msg_id']
794  
-        msg_type = msg['msg_type']
  794
+        msg_type = msg['header']['msg_type']
795 795
         content = msg['content']
796 796
         
797 797
         # ensure msg_id is in db
@@ -1165,7 +1165,7 @@ def finish(reply):
1165 1165
                 msg = self.session.msg(header['msg_type'])
1166 1166
                 msg['content'] = rec['content']
1167 1167
                 msg['header'] = header
1168  
-                msg['msg_id'] = rec['msg_id']
  1168
+                msg['header']['msg_id'] = rec['msg_id']
1169 1169
                 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1170 1170
 
1171 1171
         finish(dict(status='ok'))
8  IPython/parallel/controller/scheduler.py
@@ -211,12 +211,12 @@ def dispatch_notification(self, msg):
211 211
             self.log.warn("task::Invalid Message: %r",msg)
212 212
             return
213 213
         try:
214  
-            msg = self.session.unpack_message(msg)
  214
+            msg = self.session.unserialize(msg)
215 215
         except ValueError:
216 216
             self.log.warn("task::Unauthorized message from: %r"%idents)
217 217
             return
218 218
         
219  
-        msg_type = msg['msg_type']
  219
+        msg_type = msg['header']['msg_type']
220 220
         
221 221
         handler = self._notification_handlers.get(msg_type, None)
222 222
         if handler is None:
@@ -307,7 +307,7 @@ def dispatch_submission(self, raw_msg):
307 307
         self.notifier_stream.flush()
308 308
         try:
309 309
             idents, msg = self.session.feed_identities(raw_msg, copy=False)
310  
-            msg = self.session.unpack_message(msg, content=False, copy=False)
  310
+            msg = self.session.unserialize(msg, content=False, copy=False)
311 311
         except Exception:
312 312
             self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
313 313
             return
@@ -515,7 +515,7 @@ def dispatch_result(self, raw_msg):
515 515
         """dispatch method for result replies"""
516 516
         try:
517 517
             idents,msg = self.session.feed_identities(raw_msg, copy=False)
518  
-            msg = self.session.unpack_message(msg, content=False, copy=False)
  518
+            msg = self.session.unserialize(msg, content=False, copy=False)
519 519
             engine = idents[0]
520 520
             try:
521 521
                 idx = self.targets.index(engine)
2  IPython/parallel/engine/engine.py
@@ -90,7 +90,7 @@ def complete_registration(self, msg):
90 90
         loop = self.loop
91 91
         identity = self.bident
92 92
         idents,msg = self.session.feed_identities(msg)
93  
-        msg = Message(self.session.unpack_message(msg))
  93
+        msg = Message(self.session.unserialize(msg))
94 94
         
95 95
         if msg.content.status == 'ok':
96 96
             self.id = int(msg.content.id)
10  IPython/parallel/engine/kernelstarter.py
@@ -40,11 +40,11 @@ def start(self):
40 40
     def dispatch_request(self, raw_msg):
41 41
         idents, msg = self.session.feed_identities()
42 42
         try:
43  
-            msg = self.session.unpack_message(msg, content=False)
  43
+            msg = self.session.unserialize(msg, content=False)
44 44
         except:
45 45
             print ("bad msg: %s"%msg)
46 46
         
47  
-        msgtype = msg['msg_type']
  47
+        msgtype = msg['header']['msg_type']
48 48
         handler = self.handlers.get(msgtype, None)
49 49
         if handler is None:
50 50
             self.downstream.send_multipart(raw_msg, copy=False)
@@ -54,11 +54,11 @@ def dispatch_request(self, raw_msg):
54 54
     def dispatch_reply(self, raw_msg):
55 55
         idents, msg = self.session.feed_identities()
56 56
         try:
57  
-            msg = self.session.unpack_message(msg, content=False)
  57
+            msg = self.session.unserialize(msg, content=False)
58 58
         except:
59 59
             print ("bad msg: %s"%msg)
60 60
         
61  
-        msgtype = msg['msg_type']
  61
+        msgtype = msg['header']['msg_type']
62 62
         handler = self.handlers.get(msgtype, None)
63 63
         if handler is None:
64 64
             self.upstream.send_multipart(raw_msg, copy=False)
@@ -227,4 +227,4 @@ def make_starter(up_addr, down_addr, *args, **kwargs):
227 227
     starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
228 228
     starter.start()
229 229
     loop.start()
230  
-    
  230
+    
20  IPython/parallel/engine/streamkernel.py
@@ -150,7 +150,7 @@ def abort_queue(self, stream):
150 150
                 
151 151
             self.log.info("Aborting:")
152 152
             self.log.info(str(msg))
153  
-            msg_type = msg['msg_type']
  153
+            msg_type = msg['header']['msg_type']
154 154
             reply_type = msg_type.split('_')[0] + '_reply'
155 155
             # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
156 156
             # self.reply_socket.send(ident,zmq.SNDMORE)
@@ -195,7 +195,7 @@ def shutdown_request(self, stream, ident, parent):
195 195
     def dispatch_control(self, msg):
196 196
         idents,msg = self.session.feed_identities(msg, copy=False)
197 197
         try:
198  
-            msg = self.session.unpack_message(msg, content=True, copy=False)
  198
+            msg = self.session.unserialize(msg, content=True, copy=False)
199 199
         except:
200 200
             self.log.error("Invalid Message", exc_info=True)
201 201
             return
@@ -204,10 +204,11 @@ def dispatch_control(self, msg):
204 204
         
205 205
         header = msg['header']
206 206
         msg_id = header['msg_id']
207  
-        
208  
-        handler = self.control_handlers.get(msg['msg_type'], None)
  207
+        msg_type = header['msg_type']
  208
+
  209
+        handler = self.control_handlers.get(msg_type, None)
209 210
         if handler is None:
210  
-            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
  211
+            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
211 212
         else:
212 213
             handler(self.control_stream, idents, msg)
213 214
     
@@ -373,7 +374,7 @@ def dispatch_queue(self, stream, msg):
373 374
         self.control_stream.flush()
374 375
         idents,msg = self.session.feed_identities(msg, copy=False)
375 376
         try:
376  
-            msg = self.session.unpack_message(msg, content=True, copy=False)
  377
+            msg = self.session.unserialize(msg, content=True, copy=False)
377 378
         except:
378 379
             self.log.error("Invalid Message", exc_info=True)
379 380
             return
@@ -383,17 +384,18 @@ def dispatch_queue(self, stream, msg):
383 384
         
384 385
         header = msg['header']
385 386
         msg_id = header['msg_id']
  387
+        msg_type = msg['header']['msg_type']
386 388
         if self.check_aborted(msg_id):
387 389
             self.aborted.remove(msg_id)
388 390
             # is it safe to assume a msg_id will not be resubmitted?
389  
-            reply_type = msg['msg_type'].split('_')[0] + '_reply'
  391
+            reply_type = msg_type.split('_')[0] + '_reply'
390 392
             status = {'status' : 'aborted'}
391 393
             reply_msg = self.session.send(stream, reply_type, subheader=status,
392 394
                         content=status, parent=msg, ident=idents)
393 395
             return
394  
-        handler = self.shell_handlers.get(msg['msg_type'], None)
  396
+        handler = self.shell_handlers.get(msg_type, None)
395 397
         if handler is None:
396  
-            self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
  398
+            self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
397 399
         else:
398 400
             handler(stream, idents, msg)
399 401
     
5  IPython/parallel/tests/test_db.py
@@ -56,8 +56,9 @@ def load_records(self, n=1):
56 56
             msg = self.session.msg('apply_request', content=dict(a=5))
57 57
             msg['buffers'] = []
58 58
             rec = init_record(msg)
59  
-            msg_ids.append(msg['msg_id'])
60  
-            self.db.add_record(msg['msg_id'], rec)
  59
+            msg_id = msg['header']['msg_id']
  60
+            msg_ids.append(msg_id)
  61
+            self.db.add_record(msg_id, rec)
61 62
         return msg_ids
62 63
     
63 64
     def test_add_record(self):
10  IPython/zmq/ipkernel.py
@@ -124,7 +124,9 @@ def do_one_iteration(self):
124 124
         ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
125 125
         if msg is None:
126 126
             return
127  
-        
  127
+
  128
+        msg_type = msg['header']['msg_type']
  129
+
128 130
         # This assert will raise in versions of zeromq 2.0.7 and lesser.
129 131
         # We now require 2.0.8 or above, so we can uncomment for safety.
130 132
         # print(ident,msg, file=sys.__stdout__)
@@ -133,11 +135,11 @@ def do_one_iteration(self):
133 135
         # Print some info about this message and leave a '--->' marker, so it's
134 136
         # easier to trace visually the message chain when debugging.  Each
135 137
         # handler prints its message at the end.
136  
-        self.log.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***')
  138
+        self.log.debug('\n*** MESSAGE TYPE:'+str(msg_type)+'***')
137 139
         self.log.debug('   Content: '+str(msg['content'])+'\n   --->\n   ')
138 140
 
139 141
         # Find and call actual handler for message
140  
-        handler = self.handlers.get(msg['msg_type'], None)
  142
+        handler = self.handlers.get(msg_type, None)
141 143
         if handler is None:
142 144
             self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
143 145
         else:
@@ -375,7 +377,7 @@ def _abort_queue(self):
375 377
                        "Unexpected missing message part."
376 378
 
377 379
             self.log.debug("Aborting:\n"+str(Message(msg)))
378  
-            msg_type = msg['msg_type']
  380
+            msg_type = msg['header']['msg_type']
379 381
             reply_type = msg_type.split('_')[0] + '_reply'
380 382
             reply_msg = self.session.send(self.shell_socket, reply_type,
381 383
                     {'status' : 'aborted'}, msg, ident=ident)
2  IPython/zmq/pykernel.py
@@ -190,7 +190,7 @@ def _abort_queue(self):
190 190
             else:
191 191
                 assert ident is not None, "Missing message part."
192 192
             self.log.debug("Aborting: %s"%Message(msg))
193  
-            msg_type = msg['msg_type']
  193
+            msg_type = msg['header']['msg_type']
194 194
             reply_type = msg_type.split('_')[0] + '_reply'
195 195
             reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
196 196
             self.log.debug(Message(reply_msg))
108  IPython/zmq/session.py
@@ -244,7 +244,7 @@ def _unpacker_changed(self, name, old, new):
244 244
     def _session_default(self):
245 245
         return bytes(uuid.uuid4())
246 246
     
247  
-    username = Unicode(os.environ.get('USER','username'), config=True,
  247
+    username = Unicode(os.environ.get('USER',u'username'), config=True,
248 248
         help="""Username for the Session. Default is your system username.""")
249 249
     
250 250
     # message signature related traits:
@@ -350,18 +350,16 @@ def _check_packers(self):
350 350
     def msg_header(self, msg_type):
351 351
         return msg_header(self.msg_id, msg_type, self.username, self.session)
352 352
 
353  
-    def msg(self, msg_type, content=None, parent=None, subheader=None):
  353
+    def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
354 354
         """Return the nested message dict.
355 355
 
356 356
         This format is different from what is sent over the wire. The
357  
-        self.serialize method converts this nested message dict to the wire
358  
-        format, which uses a message list.
  357
+        serialize/unserialize methods converts this nested message dict to the wire
  358
+        format, which is a list of message parts.
359 359
         """
360 360
         msg = {}
361  
-        msg['header'] = self.msg_header(msg_type)
362  
-        msg['msg_id'] = msg['header']['msg_id']
  361
+        msg['header'] = self.msg_header(msg_type) if header is None else header
363 362
         msg['parent_header'] = {} if parent is None else extract_header(parent)
364  
-        msg['msg_type'] = msg_type
365 363
         msg['content'] = {} if content is None else content
366 364
         sub = {} if subheader is None else subheader
367 365
         msg['header'].update(sub)
@@ -385,6 +383,10 @@ def sign(self, msg_list):
385 383
     def serialize(self, msg, ident=None):
386 384
         """Serialize the message components to bytes.
387 385
 
  386
+        This is roughly the inverse of unserialize. The serialize/unserialize
  387
+        methods work with full message lists, whereas pack/unpack work with
  388
+        the individual message parts in the message list.
  389
+
388 390
         Parameters
389 391
         ----------
390 392
         msg : dict or Message
@@ -434,8 +436,8 @@ def serialize(self, msg, ident=None):
434 436
 
435 437
         return to_send
436 438
         
437  
-    def send(self, stream, msg_or_type, content=None, parent=None, ident=None, 
438  
-                                    buffers=None, subheader=None, track=False):
  439
+    def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
  440
+             buffers=None, subheader=None, track=False, header=None):
439 441
         """Build and send a message via stream or socket.
440 442
 
441 443
         The message format used by this function internally is as follows:
@@ -443,37 +445,42 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
443 445
         [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
444 446
          buffer1,buffer2,...] 
445 447
 
446  
-        The self.serialize method converts the nested message dict into this
  448
+        The serialize/unserialize methods convert the nested message dict into this
447 449
         format.
448 450
 
449 451
         Parameters
450 452
         ----------
451 453
         
452 454
         stream : zmq.Socket or ZMQStream
453  
-            the socket-like object used to send the data
  455
+            The socket-like object used to send the data.
454 456
         msg_or_type : str or Message/dict
455 457
             Normally, msg_or_type will be a msg_type unless a message is being 
456  
-            sent more than once.
  458
+            sent more than once. If a header is supplied, this can be set to
  459
+            None and the msg_type will be pulled from the header.
457 460
         
458 461
         content : dict or None
459  
-            the content of the message (ignored if msg_or_type is a message)
  462
+            The content of the message (ignored if msg_or_type is a message).
  463
+        header : dict or None
  464
+            The header dict for the message (ignores if msg_to_type is a message).
460 465
         parent : Message or dict or None
461  
-            the parent or parent header describing the parent of this message
  466
+            The parent or parent header describing the parent of this message
  467
+            (ignored if msg_or_type is a message).
462 468
         ident : bytes or list of bytes
463  
-            the zmq.IDENTITY routing path
  469
+            The zmq.IDENTITY routing path.
464 470
         subheader : dict or None
465  
-            extra header keys for this message's header
  471
+            Extra header keys for this message's header (ignored if msg_or_type
  472
+            is a message).
466 473
         buffers : list or None
467  
-            the already-serialized buffers to be appended to the message
  474
+            The already-serialized buffers to be appended to the message.
468 475
         track : bool
469  
-            whether to track.  Only for use with Sockets, 
470  
-            because ZMQStream objects cannot track messages.
  476
+            Whether to track.  Only for use with Sockets, because ZMQStream
  477
+            objects cannot track messages.
471 478
         
472 479
         Returns
473 480
         -------
474  
-        msg : message dict
475  
-            the constructed message
476  
-        (msg,tracker) : (message dict, MessageTracker)
  481
+        msg : dict
  482
+            The constructed message.
  483
+        (msg,tracker) : (dict, MessageTracker)
477 484
             if track=True, then a 2-tuple will be returned, 
478 485
             the first element being the constructed
479 486
             message, and the second being the MessageTracker
@@ -486,12 +493,13 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
486 493
             raise TypeError("ZMQStream cannot track messages")
487 494
         
488 495
         if isinstance(msg_or_type, (Message, dict)):
489  
-            # we got a Message, not a msg_type
490  
-            # don't build a new Message
  496
+            # We got a Message or message dict, not a msg_type so don't
  497
+            # build a new Message.
491 498
             msg = msg_or_type
492 499
         else:
493  
-            msg = self.msg(msg_or_type, content, parent, subheader)
494  
-        
  500
+            msg = self.msg(msg_or_type, content=content, parent=parent, 
  501
+                           subheader=subheader, header=header)
  502
+
495 503
         buffers = [] if buffers is None else buffers
496 504
         to_send = self.serialize(msg, ident)
497 505
         flag = 0
@@ -521,7 +529,7 @@ def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
521 529
         msg['tracker'] = tracker
522 530
         
523 531
         return msg
524  
-    
  532
+
525 533
     def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
526 534
         """Send a raw message via ident path.
527 535
 
@@ -543,7 +551,7 @@ def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
543 551
             ident = [ident]
544 552
         if ident is not None:
545 553
             to_send.extend(ident)
546  
-            
  554
+
547 555
         to_send.append(DELIM)
548 556
         to_send.append(self.sign(msg_list))
549 557
         to_send.extend(msg_list)
@@ -578,7 +586,7 @@ def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
578 586
         # invalid large messages can cause very expensive string comparisons
579 587
         idents, msg_list = self.feed_identities(msg_list, copy)
580 588
         try:
581  
-            return idents, self.unpack_message(msg_list, content=content, copy=copy)
  589
+            return idents, self.unserialize(msg_list, content=content, copy=copy)
582 590
         except Exception as e:
583 591
             print (idents, msg_list)
584 592
             # TODO: handle it
@@ -600,10 +608,12 @@ def feed_identities(self, msg_list, copy=True):
600 608
         
601 609
         Returns
602 610
         -------
603  
-        (idents,msg_list) : two lists
604  
-            idents will always be a list of bytes - the indentity prefix
605  
-            msg_list will be a list of bytes or Messages, unchanged from input
606  
-            msg_list should be unpackable via self.unpack_message at this point.
  611
+        (idents, msg_list) : two lists
  612
+            idents will always be a list of bytes, each of which is a ZMQ
  613
+            identity. msg_list will be a list of bytes or zmq.Messages of the
  614
+            form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
  615
+            should be unpackable/unserializable via self.unserialize at this
  616
+            point.
607 617
         """
608 618
         if copy:
609 619
             idx = msg_list.index(DELIM)
@@ -619,21 +629,30 @@ def feed_identities(self, msg_list, copy=True):
619 629
             idents, msg_list = msg_list[:idx], msg_list[idx+1:]
620 630
             return [m.bytes for m in idents], msg_list
621 631
     
622  
-    def unpack_message(self, msg_list, content=True, copy=True):
623  
-        """Return a message object from the format
624  
-        sent by self.send.
625  
-        
  632
+    def unserialize(self, msg_list, content=True, copy=True):
  633
+        """Unserialize a msg_list to a nested message dict.
  634
+
  635
+        This is roughly the inverse of serialize. The serialize/unserialize
  636
+        methods work with full message lists, whereas pack/unpack work with
  637
+        the individual message parts in the message list.
  638
+
626 639
         Parameters:
627 640
         -----------
628  
-        
  641
+        msg_list : list of bytes or Message objects
  642
+            The list of message parts of the form [HMAC,p_header,p_parent,
  643
+            p_content,buffer1,buffer2,...].
629 644
         content : bool (True)
630  
-            whether to unpack the content dict (True), 
631  
-            or leave it serialized (False)
632  
-        
  645
+            Whether to unpack the content dict (True), or leave it packed
  646
+            (False).
633 647
         copy : bool (True)
634  
-            whether to return the bytes (True), 
635  
-            or the non-copying Message object in each place (False)
636  
-        
  648
+            Whether to return the bytes (True), or the non-copying Message
  649
+            object in each place (False).
  650
+
  651
+        Returns
  652
+        -------
  653
+        msg : dict
  654
+            The nested message dict with top-level keys [header, parent_header,
  655
+            content, buffers].
637 656
         """
638 657
         minlen = 4
639 658
         message = {}
@@ -651,7 +670,6 @@ def unpack_message(self, msg_list, content=True, copy=True):
651 670
         if not len(msg_list) >= minlen:
652 671
             raise TypeError("malformed message, must have at least %i elements"%minlen)
653 672
         message['header'] = self.unpack(msg_list[1])
654  
-        message['msg_type'] = message['header']['msg_type']
655 673
         message['parent_header'] = self.unpack(msg_list[2])
656 674
         if content:
657 675
             message['content'] = self.unpack(msg_list[3])
84  IPython/zmq/tests/test_session.py
@@ -26,37 +26,102 @@ def setUp(self):
26 26
         BaseZMQTestCase.setUp(self)
27 27
         self.session = ss.Session()
28 28
 
  29
+
  30
+class MockSocket(zmq.Socket):
  31
+
  32
+    def __init__(self, *args, **kwargs):
  33
+        super(MockSocket,self).__init__(*args,**kwargs)
  34
+        self.data = []
  35
+
  36
+    def send_multipart(self, msgparts, *args, **kwargs):
  37
+        self.data.extend(msgparts)
  38
+
  39
+    def send(self, part, *args, **kwargs):
  40
+        self.data.append(part)
  41
+
  42
+    def recv_multipart(self, *args, **kwargs):
  43
+        return self.data
  44
+
29 45
 class TestSession(SessionTestCase):
30 46
     
31 47
     def test_msg(self):
32 48
         """message format"""
33 49
         msg = self.session.msg('execute')
34  
-        thekeys = set('header msg_id parent_header msg_type content'.split())
  50
+        thekeys = set('header parent_header content'.split())
35 51
         s = set(msg.keys())
36 52
         self.assertEquals(s, thekeys)
37 53
         self.assertTrue(isinstance(msg['content'],dict))
38 54
         self.assertTrue(isinstance(msg['header'],dict))
39 55
         self.assertTrue(isinstance(msg['parent_header'],dict))
40  
-        self.assertEquals(msg['msg_type'], 'execute')
41  
-        
42  
-            
43  
-    
  56
+        self.assertEquals(msg['header']['msg_type'], 'execute')
  57
+
  58
+    def test_serialize(self):
  59
+        msg = self.session.msg('execute',content=dict(a=10))
  60
+        msg_list = self.session.serialize(msg, ident=b'foo')
  61
+        ident, msg_list = self.session.feed_identities(msg_list)
  62
+        new_msg = self.session.unserialize(msg_list)
  63
+        self.assertEquals(ident[0], b'foo')
  64
+        self.assertEquals(new_msg['header'],msg['header'])
  65
+        self.assertEquals(new_msg['content'],msg['content'])
  66
+        self.assertEquals(new_msg['parent_header'],msg['parent_header'])
  67
+
  68
+    def test_send(self):
  69
+        socket = MockSocket(zmq.Context.instance(),zmq.PAIR)
  70
+
  71
+        msg = self.session.msg('execute', content=dict(a=10))
  72
+        self.session.send(socket, msg, ident=b'foo', buffers=[b'bar'])
  73
+        ident, msg_list = self.session.feed_identities(socket.data)
  74
+        new_msg = self.session.unserialize(msg_list)
  75
+        self.assertEquals(ident[0], b'foo')
  76
+        self.assertEquals(new_msg['header'],msg['header'])
  77
+        self.assertEquals(new_msg['content'],msg['content'])
  78
+        self.assertEquals(new_msg['parent_header'],msg['parent_header'])        
  79
+        self.assertEquals(new_msg['buffers'],[b'bar'])
  80
+
  81
+        socket.data = []
  82
+
  83
+        content = msg['content']
  84
+        header = msg['header']
  85
+        parent = msg['parent_header']
  86
+        msg_type = header['msg_type']
  87
+        self.session.send(socket, None, content=content, parent=parent,
  88
+            header=header, ident=b'foo', buffers=[b'bar'])
  89
+        ident, msg_list = self.session.feed_identities(socket.data)
  90
+        new_msg = self.session.unserialize(msg_list)
  91
+        self.assertEquals(ident[0], b'foo')
  92
+        self.assertEquals(new_msg['header'],msg['header'])
  93
+        self.assertEquals(new_msg['content'],msg['content'])
  94
+        self.assertEquals(new_msg['parent_header'],msg['parent_header'])        
  95
+        self.assertEquals(new_msg['buffers'],[b'bar'])
  96
+
  97
+        socket.data = []
  98
+
  99
+        self.session.send(socket, msg, ident=b'foo', buffers=[b'bar'])
  100
+        ident, new_msg = self.session.recv(socket)
  101
+        self.assertEquals(ident[0], b'foo')
  102
+        self.assertEquals(new_msg['header'],msg['header'])
  103
+        self.assertEquals(new_msg['content'],msg['content'])
  104
+        self.assertEquals(new_msg['parent_header'],msg['parent_header'])        
  105
+        self.assertEquals(new_msg['buffers'],[b'bar'])
  106
+
  107
+        socket.close()
  108
+
44 109
     def test_args(self):
45 110
         """initialization arguments for Session"""
46 111
         s = self.session
47 112
         self.assertTrue(s.pack is ss.default_packer)
48 113
         self.assertTrue(s.unpack is ss.default_unpacker)
49  
-        self.assertEquals(s.username, os.environ.get('USER', 'username'))
  114
+        self.assertEquals(s.username, os.environ.get('USER', u'username'))
50 115
         
51 116
         s = ss.Session()
52  
-        self.assertEquals(s.username, os.environ.get('USER', 'username'))
  117
+        self.assertEquals(s.username, os.environ.get('USER', u'username'))
53 118
         
54 119
         self.assertRaises(TypeError, ss.Session, pack='hi')
55 120
         self.assertRaises(TypeError, ss.Session, unpack='hi')
56 121
         u = str(uuid.uuid4())
57  
-        s = ss.Session(username='carrot', session=u)
  122
+        s = ss.Session(username=u'carrot', session=u)
58 123
         self.assertEquals(s.session, u)
59  
-        self.assertEquals(s.username, 'carrot')
  124
+        self.assertEquals(s.username, u'carrot')
60 125
         
61 126
     def test_tracking(self):
62 127
         """test tracking messages"""
@@ -109,3 +174,4 @@ def test_feed_identities(self):
109 174
         content = dict(code='whoda',stuff=object())
110 175
         themsg = self.session.msg('execute',content=content)
111 176
         pmsg = theids
  177
+
12  docs/source/development/messaging.txt
@@ -101,18 +101,18 @@ generic structure::
101 101
       # collaborative settings where multiple users may be interacting with the
102 102
       # same kernel simultaneously, so that frontends can label the various
103 103
       # messages in a meaningful way.
104  
-      'header' : { 'msg_id' : uuid,
105  
-                   'username' : str,
106  
-           'session' : uuid
  104
+      'header' : {
  105
+                    'msg_id' : uuid,
  106
+                    'username' : str,
  107
+                    'session' : uuid
  108
+                    # All recognized message type strings are listed below.
  109
+                    'msg_type' : str,
107 110
          },
108 111
 
109 112
       # In a chain of messages, the header from the parent is copied so that
110 113
       # clients can track where messages come from.
111 114
       'parent_header' : dict,
112 115
 
113  
-      # All recognized message type strings are listed below.
114  
-      'msg_type' : str,
115  
-
116 116
       # The actual content of the message must be a dict, whose structure
117 117
       # depends on the message type.x
118 118
       'content' : dict,
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.