Skip to content

Commit

Permalink
Fixing code to assume msg_type and msg_id are top-level.
Browse files Browse the repository at this point in the history
* I have gone through and looked for instances of ['msg_type'] and
  ['msg_id'] and tried to make sure that I added ['header'] so
  pull the values out of the header.
* But there are many cases where I can't tell if the dict is the
  full message or the header already. This is especially true
  of the msg_id in the parallel db parts of the code.
* Tests pass, but this is scary.
  • Loading branch information
ellisonbg committed Jul 21, 2011
1 parent bbe178a commit 1bc3aac
Show file tree
Hide file tree
Showing 13 changed files with 29 additions and 34 deletions.
2 changes: 1 addition & 1 deletion IPython/frontend/qt/base_frontend_mixin.py
Expand Up @@ -96,7 +96,7 @@ def _dispatch(self, msg):
""" Calls the frontend handler associated with the message type of the
given message.
"""
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
handler = getattr(self, '_handle_' + msg_type, None)
if handler:
handler(msg)
Expand Down
6 changes: 3 additions & 3 deletions IPython/frontend/qt/kernelmanager.py
Expand Up @@ -66,7 +66,7 @@ def call_handlers(self, msg):
self.message_received.emit(msg)

# Emit signals for specialized message types.
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
signal = getattr(self, msg_type, None)
if signal:
signal.emit(msg)
Expand Down Expand Up @@ -122,7 +122,7 @@ def call_handlers(self, msg):
# Emit the generic signal.
self.message_received.emit(msg)
# Emit signals for specialized message types.
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
signal = getattr(self, msg_type + '_received', None)
if signal:
signal.emit(msg)
Expand Down Expand Up @@ -155,7 +155,7 @@ def call_handlers(self, msg):
self.message_received.emit(msg)

# Emit signals for specialized message types.
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
if msg_type == 'input_request':
self.input_requested.emit(msg)

Expand Down
8 changes: 4 additions & 4 deletions IPython/parallel/client/client.py
Expand Up @@ -670,7 +670,7 @@ def _flush_notifications(self):
while msg is not None:
if self.debug:
pprint(msg)
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
handler = self._notification_handlers.get(msg_type, None)
if handler is None:
raise Exception("Unhandled message type: %s"%msg.msg_type)
Expand All @@ -684,7 +684,7 @@ def _flush_results(self, sock):
while msg is not None:
if self.debug:
pprint(msg)
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
handler = self._queue_handlers.get(msg_type, None)
if handler is None:
raise Exception("Unhandled message type: %s"%msg.msg_type)
Expand Down Expand Up @@ -729,7 +729,7 @@ def _flush_iopub(self, sock):
msg_id = parent['msg_id']
content = msg['content']
header = msg['header']
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']

# init metadata:
md = self.metadata[msg_id]
Expand Down Expand Up @@ -994,7 +994,7 @@ def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None,
msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
subheader=subheader, track=track)

msg_id = msg['msg_id']
msg_id = msg['header']['msg_id']
self.outstanding.add(msg_id)
if ident:
# possibly routed to a specific engine
Expand Down
4 changes: 2 additions & 2 deletions IPython/parallel/client/view.py
Expand Up @@ -523,7 +523,7 @@ def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, tra
ident=ident)
if track:
trackers.append(msg['tracker'])
msg_ids.append(msg['msg_id'])
msg_ids.append(msg['header']['msg_id'])
tracker = None if track is False else zmq.MessageTracker(*trackers)
ar = AsyncResult(self.client, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)
if block:
Expand Down Expand Up @@ -980,7 +980,7 @@ def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
subheader=subheader)
tracker = None if track is False else msg['tracker']

ar = AsyncResult(self.client, msg['msg_id'], fname=f.__name__, targets=None, tracker=tracker)
ar = AsyncResult(self.client, msg['header']['msg_id'], fname=f.__name__, targets=None, tracker=tracker)

if block:
try:
Expand Down
4 changes: 2 additions & 2 deletions IPython/parallel/controller/hub.py
Expand Up @@ -494,7 +494,7 @@ def dispatch_query(self, msg):
return
# print client_id, header, parent, content
#switch on message type:
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
self.log.info("client::client %r requested %r"%(client_id, msg_type))
handler = self.query_handlers.get(msg_type, None)
try:
Expand Down Expand Up @@ -791,7 +791,7 @@ def save_iopub_message(self, topics, msg):
self.log.error("iopub::invalid IOPub message: %r"%msg)
return
msg_id = parent['msg_id']
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
content = msg['content']

# ensure msg_id is in db
Expand Down
2 changes: 1 addition & 1 deletion IPython/parallel/controller/scheduler.py
Expand Up @@ -216,7 +216,7 @@ def dispatch_notification(self, msg):
self.log.warn("task::Unauthorized message from: %r"%idents)
return

msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']

handler = self._notification_handlers.get(msg_type, None)
if handler is None:
Expand Down
6 changes: 3 additions & 3 deletions IPython/parallel/engine/kernelstarter.py
Expand Up @@ -44,7 +44,7 @@ def dispatch_request(self, raw_msg):
except:
print ("bad msg: %s"%msg)

msgtype = msg['msg_type']
msgtype = msg['header']['msg_type']
handler = self.handlers.get(msgtype, None)
if handler is None:
self.downstream.send_multipart(raw_msg, copy=False)
Expand All @@ -58,7 +58,7 @@ def dispatch_reply(self, raw_msg):
except:
print ("bad msg: %s"%msg)

msgtype = msg['msg_type']
msgtype = msg['header']['msg_type']
handler = self.handlers.get(msgtype, None)
if handler is None:
self.upstream.send_multipart(raw_msg, copy=False)
Expand Down Expand Up @@ -227,4 +227,4 @@ def make_starter(up_addr, down_addr, *args, **kwargs):
starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
starter.start()
loop.start()


12 changes: 6 additions & 6 deletions IPython/parallel/engine/streamkernel.py
Expand Up @@ -150,7 +150,7 @@ def abort_queue(self, stream):

self.log.info("Aborting:")
self.log.info(str(msg))
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
# self.reply_socket.send(ident,zmq.SNDMORE)
Expand Down Expand Up @@ -205,9 +205,9 @@ def dispatch_control(self, msg):
header = msg['header']
msg_id = header['msg_id']

handler = self.control_handlers.get(msg['msg_type'], None)
handler = self.control_handlers.get(msg['header']['msg_type'], None)
if handler is None:
self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['header']['msg_type'])
else:
handler(self.control_stream, idents, msg)

Expand Down Expand Up @@ -386,14 +386,14 @@ def dispatch_queue(self, stream, msg):
if self.check_aborted(msg_id):
self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
reply_type = msg['msg_type'].split('_')[0] + '_reply'
reply_type = msg['header']['msg_type'].split('_')[0] + '_reply'
status = {'status' : 'aborted'}
reply_msg = self.session.send(stream, reply_type, subheader=status,
content=status, parent=msg, ident=idents)
return
handler = self.shell_handlers.get(msg['msg_type'], None)
handler = self.shell_handlers.get(msg['header']['msg_type'], None)
if handler is None:
self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['header']['msg_type'])
else:
handler(stream, idents, msg)

Expand Down
4 changes: 2 additions & 2 deletions IPython/parallel/tests/test_db.py
Expand Up @@ -56,8 +56,8 @@ def load_records(self, n=1):
msg = self.session.msg('apply_request', content=dict(a=5))
msg['buffers'] = []
rec = init_record(msg)
msg_ids.append(msg['msg_id'])
self.db.add_record(msg['msg_id'], rec)
msg_ids.append(msg['header']['msg_id'])
self.db.add_record(msg['header']['msg_id'], rec)
return msg_ids

def test_add_record(self):
Expand Down
6 changes: 3 additions & 3 deletions IPython/zmq/ipkernel.py
Expand Up @@ -133,11 +133,11 @@ def do_one_iteration(self):
# Print some info about this message and leave a '--->' marker, so it's
# easier to trace visually the message chain when debugging. Each
# handler prints its message at the end.
self.log.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***')
self.log.debug('\n*** MESSAGE TYPE:'+str(msg['header']['msg_type'])+'***')
self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')

# Find and call actual handler for message
handler = self.handlers.get(msg['msg_type'], None)
handler = self.handlers.get(msg['header']['msg_type'], None)
if handler is None:
self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
else:
Expand Down Expand Up @@ -375,7 +375,7 @@ def _abort_queue(self):
"Unexpected missing message part."

self.log.debug("Aborting:\n"+str(Message(msg)))
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
reply_msg = self.session.send(self.shell_socket, reply_type,
{'status' : 'aborted'}, msg, ident=ident)
Expand Down
2 changes: 1 addition & 1 deletion IPython/zmq/pykernel.py
Expand Up @@ -190,7 +190,7 @@ def _abort_queue(self):
else:
assert ident is not None, "Missing message part."
self.log.debug("Aborting: %s"%Message(msg))
msg_type = msg['msg_type']
msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
self.log.debug(Message(reply_msg))
Expand Down
3 changes: 0 additions & 3 deletions IPython/zmq/session.py
Expand Up @@ -359,9 +359,7 @@ def msg(self, msg_type, content=None, parent=None, subheader=None):
"""
msg = {}
msg['header'] = self.msg_header(msg_type)
msg['msg_id'] = msg['header']['msg_id']
msg['parent_header'] = {} if parent is None else extract_header(parent)
msg['msg_type'] = msg_type
msg['content'] = {} if content is None else content
sub = {} if subheader is None else subheader
msg['header'].update(sub)
Expand Down Expand Up @@ -651,7 +649,6 @@ def unpack_message(self, msg_list, content=True, copy=True):
if not len(msg_list) >= minlen:
raise TypeError("malformed message, must have at least %i elements"%minlen)
message['header'] = self.unpack(msg_list[1])
message['msg_type'] = message['header']['msg_type']
message['parent_header'] = self.unpack(msg_list[2])
if content:
message['content'] = self.unpack(msg_list[3])
Expand Down
4 changes: 1 addition & 3 deletions IPython/zmq/tests/test_session.py
Expand Up @@ -37,10 +37,8 @@ def test_msg(self):
self.assertTrue(isinstance(msg['content'],dict))
self.assertTrue(isinstance(msg['header'],dict))
self.assertTrue(isinstance(msg['parent_header'],dict))
self.assertEquals(msg['msg_type'], 'execute')
self.assertEquals(msg['header']['msg_type'], 'execute')



def test_args(self):
"""initialization arguments for Session"""
s = self.session
Expand Down

0 comments on commit 1bc3aac

Please sign in to comment.