
simplify IPython.parallel connections and enable Controller Resume #1471

Merged
merged 6 commits almost 2 years ago

2 participants

Min RK Fernando Perez
Min RK
Owner
minrk commented

Rolls back two-stage connection info, putting more information into the connection files. This makes it easier to use hand-crafted ssh tunnels, as all ports are read from the file, rather than from the reply to registration/connection requests.

It is no longer possible to connect to the Controller without a connection file.

Adding the serialization methods to the connection file also makes it harder for custom serialization to result in a mismatch in configuration between the various objects.
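
As a concrete illustration of the ssh-tunnel point, here is a minimal sketch (my own, not code from this PR; the file path, the channel subset, and forwarding each port to the same local port are all assumptions) that derives a complete set of ssh -L forwardings from the new-style connection file alone:

from __future__ import print_function
import json

# Every channel's port now lives in the connection file itself,
# so the tunnel command can be written down before anything connects.
with open("ipcontroller-client.json") as f:
    cfg = json.load(f)

ports = [cfg[k] for k in ("registration", "control", "mux", "task", "iopub")]
forwards = " ".join("-L %i:127.0.0.1:%i" % (p, p) for p in ports)
print("ssh %s %s" % (forwards, cfg["location"]))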

Min RK
Owner
minrk commented

Sample connection file before change:

{
  "url": "tcp://127.0.0.1:56224", 
  "exec_key": "e257a5bb-154f-41ee-a8aa-c51cea42b54a", 
  "ssh": "", 
  "location": "10.71.0.113"
}

and after:

{
  "control": 62418, 
  "task": 62424, 
  "hb_ping": 62421, 
  "exec_key": "fde6ebb4-8310-4fa1-b97a-dfacbbdaa2e7", 
  "mux": 62420, 
  "hb_pong": 62422, 
  "ssh": "", 
  "registration": 62415, 
  "interface": "tcp://127.0.0.1", 
  "iopub": 62426, 
  "pack": "msgpack.packb", 
  "unpack": "msgpack.unpackb", 
  "location": "192.168.1.188"
}
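
Since the file now stores one interface plus bare port numbers, a consumer reassembles the full zmq URLs itself. A hedged sketch of that step (it mirrors what the Client changes in this PR do; the filename is an assumption, and the sample above is the engine-side file):

from __future__ import print_function
import json

with open("ipcontroller-engine.json") as f:
    cfg = json.load(f)

# 'interface' is e.g. "tcp://127.0.0.1"; each channel key holds a bare port
for channel in ("registration", "control", "mux", "task", "iopub"):
    url = "%s:%i" % (cfg["interface"], cfg[channel])
    print("%s -> %s" % (channel, url))  # e.g. task -> tcp://127.0.0.1:62424
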
Min RK
Owner
minrk commented

This PR now contains a simple first implementation of controller-resume. Engine state is stored to JSON on register/unregister, and loaded on startup if the --restore flag is given.

This should behave as expected as long as the cluster is idle, but is untested for resuming the Controller during active execution.
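
A hedged sketch of the mechanism (the function names are my shorthand for the _save_engine_state/_load_engine_state hooks this PR adds to the Hub; the exact on-disk schema is a guess): the Hub dumps its engine-id-to-UUID table to JSON on every (un)registration, and a controller started with --restore reads it back before accepting new connections.

import json

def save_engine_state(engines, path):
    # persist {engine_id: uuid} so a restarted controller can re-adopt
    # engines that are still alive and heartbeating
    state = dict((str(eid), ec.uuid) for eid, ec in engines.items())
    with open(path, "w") as f:
        json.dump(state, f)

def load_engine_state(path):
    # return {} when there is no saved state (i.e. a fresh start)
    try:
        with open(path) as f:
            saved = json.load(f)
    except IOError:
        return {}
    return dict((int(eid), uuid) for eid, uuid in saved.items())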

Min RK
Owner
minrk commented

This enables resume of the controller, but it also removes explicit identities from the Client.

We need to make a decision on the explicit client identities, because there is a reason to go either way:

  1. In libzmq-2.1, setting an explicit identity results in a 'durable socket', which means the Schedulers act on the assumption that clients never go away, and queue up replies to disconnected clients in memory. For users doing 'fire&forget' task submission, this will result in an apparent memory leak in the Schedulers.
  2. The durable-socket side effect of explicit identities is removed in libzmq-3.1.
  3. Removing explicit identities means that client sockets will get new identities when using the Controller resume introduced here, breaking the propagation of results for tasks submitted prior to a controller crash.

So we have two principal choices:

  1. Continue to use explicit identities, and just state that the apparent memory leak is fixed by upgrading to libzmq-3.1.
  2. Remove explicit identities, and break client-replies across controller restart.

I think 2. makes the most sense, because fire&forget seems more important than preserving interactivity across controller crashes. The code here reflects this.
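
For the record, the difference between the two choices is a single setsockopt call; a minimal pyzmq illustration (the socket roles and the identity value are mine):

import zmq

ctx = zmq.Context.instance()

durable = ctx.socket(zmq.DEALER)
# Under libzmq-2.1, an explicit IDENTITY makes this peer 'durable': ROUTER
# peers will queue replies for the identity even after it disconnects.
durable.setsockopt(zmq.IDENTITY, b"my-client")

transient = ctx.socket(zmq.DEALER)
# With no explicit IDENTITY, each connection gets a fresh random identity:
# nothing is queued for a departed client (no apparent leak), but a
# reconnecting client can no longer receive replies addressed to its old self.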

Min RK
Owner
minrk commented

I should note that the changes here are highly experimental, and shouldn't go into 0.13.

Fernando Perez
Owner
fperez commented

@minrk, do you still think we should hold this one off until after 0.13? I understand if the changes aren't robust yet, but do they introduce any potential problems? If it's only a matter of robustness, we can merge it and flag it explicitly as an experimental feature; that will at least give it some field-testing...

Min RK
Owner
minrk commented

This would break connection-file compatibility between 0.13 and 0.12 clients/clusters, which I know people are relying on right now. But the main reason is that this is a big and tricky enough change that I want it to bake in master for longer than we have between now and 0.13. That goes especially for the controller-resume part, which is far from complete, though it certainly works in some simple cases.

Fernando Perez
Owner
fperez commented

Understood; I'll update the description at the top with a more visible note about this.

Min RK
Owner
minrk commented

Test results for commit 7d7a06b merged into master
Platform: darwin

  • python2.6: Failed, log at https://gist.github.com/2839954 (libraries not available: matplotlib pygments pymongo qt tornado wx wx.aui)
  • python2.7: OK (libraries not available: wx wx.aui zmq)
  • python3.2: OK (libraries not available: matplotlib pymongo qt wx wx.aui)

Not available for testing: python3.1

Min RK
Owner
minrk commented

As we continue to move the goalposts on 0.13, I'm starting to feel like I want to split this one: merge the new connection files into 0.13, and leave only controller resume until after the release. Thoughts on that, @fperez?

Fernando Perez
Owner
fperez commented

I was just thinking about pinging the list for 0.13 plans... We have so much new stuff that it's high time we start settling it down with an eye towards a release... Do you think we'd be able to pull off a 0.13 release before we head out to SciPy?

Min RK
Owner
minrk commented

Not sure about a Final, but at least a beta or RC. We have 70 issues marked for 0.13 and 35 PRs, but I think very few of them actually need to get into 0.13 (0 of mine, for instance, though I would like the file/shell cell magics). So if we switch into stabilization-only mode this week, I think we can make it.

It's a bit of an unfortunate time as we are just now exploring what we can/should be doing with Cell Magics.

We still haven't done the cell-metadata bit, which I think is a blocker, so that we reduce the need to increment the nbformat in the future.

Fernando Perez
Owner
fperez commented

That sounds like a good plan. I'm going to try to finish this lovely report now, and will then ping the list.

minrk added some commits
Min RK minrk Don't set explicit IDENTITY for clients
explicit IDENTITIES imply a potentially undesirable side-effect called 'durable sockets', which should be avoided.
7ea775c
Min RK minrk simplify IPython.parallel connections
Rolls back two-stage connection, putting complete connection info
into the connection files.  This makes it easier to use hand-crafted
ssh tunnels, as all ports are read from the file, rather than
from the reply to registration/connection requests.

It is no longer possible to connect to the Controller without a connection file.

Adding the serialization methods to the connection file also makes it
harder for custom serialization to result in a mismatch in configuration
between the various objects.
a70d89c
Min RK minrk use individual ports, rather than full urls in connection files be9601b
Min RK minrk enables resume of ipcontroller
cleans up EngineConnector and registration messages further, only storing single UUID for each engine (all the stored UUIDs happened to be the same already).

Message spec docs updated to reflect changes to message formats.
456307a
Min RK minrk remove a few dangling asbytes from rebase 5acc971
Min RK minrk IPython.parallel py3compat 2888383
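
For context, commit 456307a also simplifies the registration handshake; a hedged before/after sketch of the request content (the key names are taken from the hub.py diff below, the values are illustrative):

# before: separate identities per engine, which in practice were always equal
old_registration = {
    'queue' : 'a1b2c3...',      # identity of the engine's DEALER socket
    'heartbeat' : 'a1b2c3...',  # identity of the engine's heart
}

# after: a single UUID identifies the engine everywhere
new_registration = {
    'uuid' : 'a1b2c3...',
}
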
Min RK
Owner
minrk commented

Rebased. I don't know why GitHub thinks it cannot be automatically merged, because it certainly can. Presumably this will resolve itself before long.

Fernando Perez
Owner
fperez commented

We should probably discuss this one at the sprint...

Min RK
Owner
minrk commented

Test results for commit 2888383 merged into master
Platform: darwin

  • python2.6: OK (libraries not available: cython matplotlib oct2py pygments pymongo qt rpy2 tornado wx wx.aui)
  • python2.7: OK (libraries not available: oct2py wx wx.aui)
  • python3.2: OK (libraries not available: cython matplotlib oct2py pymongo qt rpy2 wx wx.aui)

Not available for testing: python3.1

Fernando Perez
Owner
fperez commented

This is large and complex, but the code and functionality look very clean; as discussed during the sprint, this is best merged early in this cycle. Running testpr on linux just to be safe; will merge shortly if the report is OK.

Fernando Perez
Owner
fperez commented

Test results for commit 2888383 merged into master
Platform: linux2

  • python2.7: OK
  • python3.2: OK (libraries not available: cython matplotlib oct2py pygments pymongo rpy2 wx wx.aui)

Not available for testing: python2.6, python3.1

Fernando Perez fperez merged commit bff463b
Fernando Perez fperez closed this
Min RK
Owner
minrk commented

I have a feeling this will conflict with Jason's, and I just asked him to rebase already and don't want to do it twice. So I will get that one in once it is rebased, then make sure this still applies, and merge.

Min RK
Owner
minrk commented

Oops, never mind. I'll take care of the metadata rebase if it needs one.

Min RK
Owner
minrk commented

metadata doesn't need a rebase, so all is well.

Fernando Perez
Owner
fperez commented

ok, glad to hear that.

Min RK minrk deleted the branch
141 IPython/parallel/apps/ipcontrollerapp.py
@@ -116,7 +116,10 @@
116 116 select one of the true db backends.
117 117 """),
118 118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
119   - 'reuse existing json connection files')
  119 + 'reuse existing json connection files'),
  120 + 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
  121 + 'Attempt to restore engines from a JSON file. '
  122 + 'For use when resuming a crashed controller'),
120 123 })
121 124
122 125 flags.update(session_flags)
@@ -156,6 +159,10 @@ class IPControllerApp(BaseParallelApplication):
156 159 If False, connection files will be removed on a clean exit.
157 160 """
158 161 )
  162 + restore_engines = Bool(False, config=True,
  163 + help="""Reload engine state from JSON file
  164 + """
  165 + )
159 166 ssh_server = Unicode(u'', config=True,
160 167 help="""ssh url for clients to use when connecting to the Controller
161 168 processes. It should be of the form: [user@]server[:port]. The
@@ -209,21 +216,17 @@ def _use_threads_changed(self, name, old, new):
209 216 def save_connection_dict(self, fname, cdict):
210 217 """save a connection dict to json file."""
211 218 c = self.config
212   - url = cdict['url']
  219 + url = cdict['registration']
213 220 location = cdict['location']
  221 +
214 222 if not location:
215 223 try:
216   - proto,ip,port = split_url(url)
217   - except AssertionError:
218   - pass
219   - else:
220   - try:
221   - location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
222   - except (socket.gaierror, IndexError):
223   - self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
224   - " You may need to specify '--location=<external_ip_address>' to help"
225   - " IPython decide when to connect via loopback.")
226   - location = '127.0.0.1'
  224 + location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
  225 + except (socket.gaierror, IndexError):
  226 + self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
  227 + " You may need to specify '--location=<external_ip_address>' to help"
  228 + " IPython decide when to connect via loopback.")
  229 + location = '127.0.0.1'
227 230 cdict['location'] = location
228 231 fname = os.path.join(self.profile_dir.security_dir, fname)
229 232 self.log.info("writing connection info to %s", fname)
@@ -235,35 +238,51 @@ def load_config_from_json(self):
235 238 """load config from existing json connector files."""
236 239 c = self.config
237 240 self.log.debug("loading config from JSON")
238   - # load from engine config
  241 +
  242 + # load engine config
  243 +
239 244 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
240 245 self.log.info("loading connection info from %s", fname)
241 246 with open(fname) as f:
242   - cfg = json.loads(f.read())
243   - key = cfg['exec_key']
  247 + ecfg = json.loads(f.read())
  248 +
244 249 # json gives unicode, Session.key wants bytes
245   - c.Session.key = key.encode('ascii')
246   - xport,addr = cfg['url'].split('://')
247   - c.HubFactory.engine_transport = xport
248   - ip,ports = addr.split(':')
  250 + c.Session.key = ecfg['exec_key'].encode('ascii')
  251 +
  252 + xport,ip = ecfg['interface'].split('://')
  253 +
249 254 c.HubFactory.engine_ip = ip
250   - c.HubFactory.regport = int(ports)
251   - self.location = cfg['location']
  255 + c.HubFactory.engine_transport = xport
  256 +
  257 + self.location = ecfg['location']
252 258 if not self.engine_ssh_server:
253   - self.engine_ssh_server = cfg['ssh']
  259 + self.engine_ssh_server = ecfg['ssh']
  260 +
254 261 # load client config
  262 +
255 263 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
256 264 self.log.info("loading connection info from %s", fname)
257 265 with open(fname) as f:
258   - cfg = json.loads(f.read())
259   - assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
260   - xport,addr = cfg['url'].split('://')
  266 + ccfg = json.loads(f.read())
  267 +
  268 + for key in ('exec_key', 'registration', 'pack', 'unpack'):
  269 + assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
  270 +
  271 + xport,addr = ccfg['interface'].split('://')
  272 +
261 273 c.HubFactory.client_transport = xport
262   - ip,ports = addr.split(':')
263 274 c.HubFactory.client_ip = ip
264 275 if not self.ssh_server:
265   - self.ssh_server = cfg['ssh']
266   - assert int(ports) == c.HubFactory.regport, "regport mismatch"
  276 + self.ssh_server = ccfg['ssh']
  277 +
  278 + # load port config:
  279 + c.HubFactory.regport = ecfg['registration']
  280 + c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
  281 + c.HubFactory.control = (ccfg['control'], ecfg['control'])
  282 + c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
  283 + c.HubFactory.task = (ccfg['task'], ecfg['task'])
  284 + c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
  285 + c.HubFactory.notifier_port = ccfg['notification']
267 286
268 287 def cleanup_connection_files(self):
269 288 if self.reuse_files:
@@ -314,29 +333,42 @@ def init_hub(self):
314 333 if self.write_connection_files:
315 334 # save to new json config files
316 335 f = self.factory
317   - cdict = {'exec_key' : f.session.key.decode('ascii'),
318   - 'ssh' : self.ssh_server,
319   - 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
320   - 'location' : self.location
321   - }
  336 + base = {
  337 + 'exec_key' : f.session.key.decode('ascii'),
  338 + 'location' : self.location,
  339 + 'pack' : f.session.packer,
  340 + 'unpack' : f.session.unpacker,
  341 + }
  342 +
  343 + cdict = {'ssh' : self.ssh_server}
  344 + cdict.update(f.client_info)
  345 + cdict.update(base)
322 346 self.save_connection_dict(self.client_json_file, cdict)
323   - edict = cdict
324   - edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
325   - edict['ssh'] = self.engine_ssh_server
  347 +
  348 + edict = {'ssh' : self.engine_ssh_server}
  349 + edict.update(f.engine_info)
  350 + edict.update(base)
326 351 self.save_connection_dict(self.engine_json_file, edict)
327 352
  353 + fname = "engines%s.json" % self.cluster_id
  354 + self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
  355 + if self.restore_engines:
  356 + self.factory.hub._load_engine_state()
  357 +
328 358 def init_schedulers(self):
329 359 children = self.children
330 360 mq = import_item(str(self.mq_class))
331 361
332   - hub = self.factory
  362 + f = self.factory
  363 + ident = f.session.bsession
333 364 # disambiguate url, in case of *
334   - monitor_url = disambiguate_url(hub.monitor_url)
  365 + monitor_url = disambiguate_url(f.monitor_url)
335 366 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
336 367 # IOPub relay (in a Process)
337 368 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
338   - q.bind_in(hub.client_info['iopub'])
339   - q.bind_out(hub.engine_info['iopub'])
  369 + q.bind_in(f.client_url('iopub'))
  370 + q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
  371 + q.bind_out(f.engine_url('iopub'))
340 372 q.setsockopt_out(zmq.SUBSCRIBE, b'')
341 373 q.connect_mon(monitor_url)
342 374 q.daemon=True
@@ -344,18 +376,20 @@ def init_schedulers(self):
344 376
345 377 # Multiplexer Queue (in a Process)
346 378 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
347   - q.bind_in(hub.client_info['mux'])
348   - q.setsockopt_in(zmq.IDENTITY, b'mux')
349   - q.bind_out(hub.engine_info['mux'])
  379 + q.bind_in(f.client_url('mux'))
  380 + q.setsockopt_in(zmq.IDENTITY, b'mux_in')
  381 + q.bind_out(f.engine_url('mux'))
  382 + q.setsockopt_out(zmq.IDENTITY, b'mux_out')
350 383 q.connect_mon(monitor_url)
351 384 q.daemon=True
352 385 children.append(q)
353 386
354 387 # Control Queue (in a Process)
355 388 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
356   - q.bind_in(hub.client_info['control'])
357   - q.setsockopt_in(zmq.IDENTITY, b'control')
358   - q.bind_out(hub.engine_info['control'])
  389 + q.bind_in(f.client_url('control'))
  390 + q.setsockopt_in(zmq.IDENTITY, b'control_in')
  391 + q.bind_out(f.engine_url('control'))
  392 + q.setsockopt_out(zmq.IDENTITY, b'control_out')
359 393 q.connect_mon(monitor_url)
360 394 q.daemon=True
361 395 children.append(q)
@@ -368,9 +402,10 @@ def init_schedulers(self):
368 402 self.log.warn("task::using pure DEALER Task scheduler")
369 403 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
370 404 # q.setsockopt_out(zmq.HWM, hub.hwm)
371   - q.bind_in(hub.client_info['task'][1])
372   - q.setsockopt_in(zmq.IDENTITY, b'task')
373   - q.bind_out(hub.engine_info['task'])
  405 + q.bind_in(f.client_url('task'))
  406 + q.setsockopt_in(zmq.IDENTITY, b'task_in')
  407 + q.bind_out(f.engine_url('task'))
  408 + q.setsockopt_out(zmq.IDENTITY, b'task_out')
374 409 q.connect_mon(monitor_url)
375 410 q.daemon=True
376 411 children.append(q)
@@ -379,8 +414,10 @@ def init_schedulers(self):
379 414
380 415 else:
381 416 self.log.info("task::using Python %s Task scheduler"%scheme)
382   - sargs = (hub.client_info['task'][1], hub.engine_info['task'],
383   - monitor_url, disambiguate_url(hub.client_info['notification']))
  417 + sargs = (f.client_url('task'), f.engine_url('task'),
  418 + monitor_url, disambiguate_url(f.client_url('notification')),
  419 + disambiguate_url(f.client_url('registration')),
  420 + )
384 421 kwargs = dict(logname='scheduler', loglevel=self.log_level,
385 422 log_url = self.log_url, config=dict(self.config))
386 423 if 'Process' in self.mq_class:
38 IPython/parallel/apps/ipengineapp.py
@@ -45,7 +45,7 @@
45 45 from IPython.config.configurable import Configurable
46 46
47 47 from IPython.parallel.engine.engine import EngineFactory
48   -from IPython.parallel.util import disambiguate_url
  48 +from IPython.parallel.util import disambiguate_ip_address
49 49
50 50 from IPython.utils.importstring import import_item
51 51 from IPython.utils.py3compat import cast_bytes
@@ -211,24 +211,36 @@ def load_connector_file(self):
211 211 with open(self.url_file) as f:
212 212 d = json.loads(f.read())
213 213
214   - if 'exec_key' in d:
215   - config.Session.key = cast_bytes(d['exec_key'])
216   -
  214 + # allow hand-override of location for disambiguation
  215 + # and ssh-server
217 216 try:
218 217 config.EngineFactory.location
219 218 except AttributeError:
220 219 config.EngineFactory.location = d['location']
221 220
222   - d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223   - try:
224   - config.EngineFactory.url
225   - except AttributeError:
226   - config.EngineFactory.url = d['url']
227   -
228 221 try:
229 222 config.EngineFactory.sshserver
230 223 except AttributeError:
231   - config.EngineFactory.sshserver = d['ssh']
  224 + config.EngineFactory.sshserver = d.get('ssh')
  225 +
  226 + location = config.EngineFactory.location
  227 +
  228 + proto, ip = d['interface'].split('://')
  229 + ip = disambiguate_ip_address(ip)
  230 + d['interface'] = '%s://%s' % (proto, ip)
  231 +
  232 + # DO NOT allow override of basic URLs, serialization, or exec_key
  233 + # JSON file takes top priority there
  234 + config.Session.key = cast_bytes(d['exec_key'])
  235 +
  236 + config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
  237 +
  238 + config.Session.packer = d['pack']
  239 + config.Session.unpacker = d['unpack']
  240 +
  241 + self.log.debug("Config changed:")
  242 + self.log.debug("%r", config)
  243 + self.connection_info = d
232 244
233 245 def bind_kernel(self, **kwargs):
234 246 """Promote engine to listening kernel, accessible to frontends."""
@@ -320,7 +332,9 @@ def init_engine(self):
320 332 # shell_class = import_item(self.master_config.Global.shell_class)
321 333 # print self.config
322 334 try:
323   - self.engine = EngineFactory(config=config, log=self.log)
  335 + self.engine = EngineFactory(config=config, log=self.log,
  336 + connection_info=self.connection_info,
  337 + )
324 338 except:
325 339 self.log.error("Couldn't start the Engine", exc_info=True)
326 340 self.exit(1)
157 IPython/parallel/client/client.py
@@ -217,7 +217,9 @@ class Client(HasTraits):
217 217 Parameters
218 218 ----------
219 219
220   - url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
  220 + url_file : str/unicode; path to ipcontroller-client.json
  221 + This JSON file should contain all the information needed to connect to a cluster,
  222 + and is likely the only argument needed.
221 223 Connection information for the Hub's registration. If a json connector
222 224 file is given, then likely no further configuration is necessary.
223 225 [Default: use profile]
@@ -239,14 +241,6 @@ class Client(HasTraits):
239 241 If specified, this will be relayed to the Session for configuration
240 242 username : str
241 243 set username for the session object
242   - packer : str (import_string) or callable
243   - Can be either the simple keyword 'json' or 'pickle', or an import_string to a
244   - function to serialize messages. Must support same input as
245   - JSON, and output must be bytes.
246   - You can pass a callable directly as `pack`
247   - unpacker : str (import_string) or callable
248   - The inverse of packer. Only necessary if packer is specified as *not* one
249   - of 'json' or 'pickle'.
250 244
251 245 #-------------- ssh related args ----------------
252 246 # These are args for configuring the ssh tunnel to be used
@@ -271,17 +265,6 @@ class Client(HasTraits):
271 265 flag for whether to use paramiko instead of shell ssh for tunneling.
272 266 [default: True on win32, False else]
273 267
274   - ------- exec authentication args -------
275   - If even localhost is untrusted, you can have some protection against
276   - unauthorized execution by signing messages with HMAC digests.
277   - Messages are still sent as cleartext, so if someone can snoop your
278   - loopback traffic this will not protect your privacy, but will prevent
279   - unauthorized execution.
280   -
281   - exec_key : str
282   - an authentication key or file containing a key
283   - default: None
284   -
285 268
286 269 Attributes
287 270 ----------
@@ -378,8 +361,8 @@ def __new__(self, *args, **kw):
378 361 # don't raise on positional args
379 362 return HasTraits.__new__(self, **kw)
380 363
381   - def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
382   - context=None, debug=False, exec_key=None,
  364 + def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
  365 + context=None, debug=False,
383 366 sshserver=None, sshkey=None, password=None, paramiko=None,
384 367 timeout=10, **extra_args
385 368 ):
@@ -391,39 +374,46 @@ def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir
391 374 context = zmq.Context.instance()
392 375 self._context = context
393 376 self._stop_spinning = Event()
  377 +
  378 + if 'url_or_file' in extra_args:
  379 + url_file = extra_args['url_or_file']
  380 + warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
  381 +
  382 + if url_file and util.is_url(url_file):
  383 + raise ValueError("single urls cannot be specified, url-files must be used.")
394 384
395 385 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
  386 +
396 387 if self._cd is not None:
397   - if url_or_file is None:
398   - url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
399   - if url_or_file is None:
  388 + if url_file is None:
  389 + url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
  390 + if url_file is None:
400 391 raise ValueError(
401 392 "I can't find enough information to connect to a hub!"
402   - " Please specify at least one of url_or_file or profile."
  393 + " Please specify at least one of url_file or profile."
403 394 )
404   -
405   - if not util.is_url(url_or_file):
406   - # it's not a url, try for a file
407   - if not os.path.exists(url_or_file):
408   - if self._cd:
409   - url_or_file = os.path.join(self._cd.security_dir, url_or_file)
410   - if not os.path.exists(url_or_file):
411   - raise IOError("Connection file not found: %r" % url_or_file)
412   - with open(url_or_file) as f:
413   - cfg = json.loads(f.read())
414   - else:
415   - cfg = {'url':url_or_file}
  395 +
  396 + with open(url_file) as f:
  397 + cfg = json.load(f)
  398 +
  399 + self._task_scheme = cfg['task_scheme']
416 400
417 401 # sync defaults from args, json:
418 402 if sshserver:
419 403 cfg['ssh'] = sshserver
420   - if exec_key:
421   - cfg['exec_key'] = exec_key
422   - exec_key = cfg['exec_key']
  404 +
423 405 location = cfg.setdefault('location', None)
424   - cfg['url'] = util.disambiguate_url(cfg['url'], location)
425   - url = cfg['url']
426   - proto,addr,port = util.split_url(url)
  406 +
  407 + proto,addr = cfg['interface'].split('://')
  408 + addr = util.disambiguate_ip_address(addr)
  409 + cfg['interface'] = "%s://%s" % (proto, addr)
  410 +
  411 + # turn interface,port into full urls:
  412 + for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
  413 + cfg[key] = cfg['interface'] + ':%i' % cfg[key]
  414 +
  415 + url = cfg['registration']
  416 +
427 417 if location is not None and addr == '127.0.0.1':
428 418 # location specified, and connection is expected to be local
429 419 if location not in LOCAL_IPS and not sshserver:
@@ -448,7 +438,7 @@ def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir
448 438 self._ssh = bool(sshserver or sshkey or password)
449 439 if self._ssh and sshserver is None:
450 440 # default to ssh via localhost
451   - sshserver = url.split('://')[1].split(':')[0]
  441 + sshserver = addr
452 442 if self._ssh and password is None:
453 443 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
454 444 password=False
@@ -457,20 +447,18 @@ def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir
457 447 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
458 448
459 449 # configure and construct the session
460   - if exec_key is not None:
461   - if os.path.isfile(exec_key):
462   - extra_args['keyfile'] = exec_key
463   - else:
464   - exec_key = cast_bytes(exec_key)
465   - extra_args['key'] = exec_key
  450 + extra_args['packer'] = cfg['pack']
  451 + extra_args['unpacker'] = cfg['unpack']
  452 + extra_args['key'] = cast_bytes(cfg['exec_key'])
  453 +
466 454 self.session = Session(**extra_args)
467 455
468 456 self._query_socket = self._context.socket(zmq.DEALER)
469   - self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
  457 +
470 458 if self._ssh:
471   - tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
  459 + tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
472 460 else:
473   - self._query_socket.connect(url)
  461 + self._query_socket.connect(cfg['registration'])
474 462
475 463 self.session.debug = self.debug
476 464
@@ -520,8 +508,9 @@ def _update_engines(self, engines):
520 508 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
521 509 for k,v in engines.iteritems():
522 510 eid = int(k)
  511 + if eid not in self._engines:
  512 + self._ids.append(eid)
523 513 self._engines[eid] = v
524   - self._ids.append(eid)
525 514 self._ids = sorted(self._ids)
526 515 if sorted(self._engines.keys()) != range(len(self._engines)) and \
527 516 self._task_scheme == 'pure' and self._task_socket:
@@ -583,7 +572,7 @@ def _connect(self, sshserver, ssh_kwargs, timeout):
583 572 self._connected=True
584 573
585 574 def connect_socket(s, url):
586   - url = util.disambiguate_url(url, self._config['location'])
  575 + # url = util.disambiguate_url(url, self._config['location'])
587 576 if self._ssh:
588 577 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
589 578 else:
@@ -600,38 +589,28 @@ def connect_socket(s, url):
600 589 idents,msg = self.session.recv(self._query_socket,mode=0)
601 590 if self.debug:
602 591 pprint(msg)
603   - msg = Message(msg)
604   - content = msg.content
605   - self._config['registration'] = dict(content)
606   - if content.status == 'ok':
607   - ident = self.session.bsession
608   - if content.mux:
609   - self._mux_socket = self._context.socket(zmq.DEALER)
610   - self._mux_socket.setsockopt(zmq.IDENTITY, ident)
611   - connect_socket(self._mux_socket, content.mux)
612   - if content.task:
613   - self._task_scheme, task_addr = content.task
614   - self._task_socket = self._context.socket(zmq.DEALER)
615   - self._task_socket.setsockopt(zmq.IDENTITY, ident)
616   - connect_socket(self._task_socket, task_addr)
617   - if content.notification:
618   - self._notification_socket = self._context.socket(zmq.SUB)
619   - connect_socket(self._notification_socket, content.notification)
620   - self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
621   - # if content.query:
622   - # self._query_socket = self._context.socket(zmq.DEALER)
623   - # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
624   - # connect_socket(self._query_socket, content.query)
625   - if content.control:
626   - self._control_socket = self._context.socket(zmq.DEALER)
627   - self._control_socket.setsockopt(zmq.IDENTITY, ident)
628   - connect_socket(self._control_socket, content.control)
629   - if content.iopub:
630   - self._iopub_socket = self._context.socket(zmq.SUB)
631   - self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
632   - self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
633   - connect_socket(self._iopub_socket, content.iopub)
634   - self._update_engines(dict(content.engines))
  592 + content = msg['content']
  593 + # self._config['registration'] = dict(content)
  594 + cfg = self._config
  595 + if content['status'] == 'ok':
  596 + self._mux_socket = self._context.socket(zmq.DEALER)
  597 + connect_socket(self._mux_socket, cfg['mux'])
  598 +
  599 + self._task_socket = self._context.socket(zmq.DEALER)
  600 + connect_socket(self._task_socket, cfg['task'])
  601 +
  602 + self._notification_socket = self._context.socket(zmq.SUB)
  603 + self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
  604 + connect_socket(self._notification_socket, cfg['notification'])
  605 +
  606 + self._control_socket = self._context.socket(zmq.DEALER)
  607 + connect_socket(self._control_socket, cfg['control'])
  608 +
  609 + self._iopub_socket = self._context.socket(zmq.SUB)
  610 + self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
  611 + connect_socket(self._iopub_socket, cfg['iopub'])
  612 +
  613 + self._update_engines(dict(content['engines']))
635 614 else:
636 615 self._connected = False
637 616 raise Exception("Failed to connect!")
@@ -674,7 +653,7 @@ def _register_engine(self, msg):
674 653 """Register a new engine, and update our connection info."""
675 654 content = msg['content']
676 655 eid = content['id']
677   - d = {eid : content['queue']}
  656 + d = {eid : content['uuid']}
678 657 self._update_engines(d)
679 658
680 659 def _unregister_engine(self, msg):
280 IPython/parallel/controller/hub.py
@@ -18,6 +18,8 @@
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
  21 +import json
  22 +import os
21 23 import sys
22 24 import time
23 25 from datetime import datetime
@@ -107,17 +109,16 @@ class EngineConnector(HasTraits):
107 109 """A simple object for accessing the various zmq connections of an object.
108 110 Attributes are:
109 111 id (int): engine ID
110   - uuid (str): uuid (unused?)
111   - queue (str): identity of queue's DEALER socket
112   - registration (str): identity of registration DEALER socket
113   - heartbeat (str): identity of heartbeat DEALER socket
  112 + uuid (unicode): engine UUID
  113 + pending: set of msg_ids
  114 + stallback: DelayedCallback for stalled registration
114 115 """
115   - id=Integer(0)
116   - queue=CBytes()
117   - control=CBytes()
118   - registration=CBytes()
119   - heartbeat=CBytes()
120   - pending=Set()
  116 +
  117 + id = Integer(0)
  118 + uuid = Unicode()
  119 + pending = Set()
  120 + stallback = Instance(ioloop.DelayedCallback)
  121 +
121 122
122 123 _db_shortcuts = {
123 124 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
@@ -131,29 +132,29 @@ class HubFactory(RegistrationFactory):
131 132
132 133 # port-pairs for monitoredqueues:
133 134 hb = Tuple(Integer,Integer,config=True,
134   - help="""DEALER/SUB Port pair for Engine heartbeats""")
  135 + help="""PUB/ROUTER Port pair for Engine heartbeats""")
135 136 def _hb_default(self):
136 137 return tuple(util.select_random_ports(2))
137 138
138 139 mux = Tuple(Integer,Integer,config=True,
139   - help="""Engine/Client Port pair for MUX queue""")
  140 + help="""Client/Engine Port pair for MUX queue""")
140 141
141 142 def _mux_default(self):
142 143 return tuple(util.select_random_ports(2))
143 144
144 145 task = Tuple(Integer,Integer,config=True,
145   - help="""Engine/Client Port pair for Task queue""")
  146 + help="""Client/Engine Port pair for Task queue""")
146 147 def _task_default(self):
147 148 return tuple(util.select_random_ports(2))
148 149
149 150 control = Tuple(Integer,Integer,config=True,
150   - help="""Engine/Client Port pair for Control queue""")
  151 + help="""Client/Engine Port pair for Control queue""")
151 152
152 153 def _control_default(self):
153 154 return tuple(util.select_random_ports(2))
154 155
155 156 iopub = Tuple(Integer,Integer,config=True,
156   - help="""Engine/Client Port pair for IOPub relay""")
  157 + help="""Client/Engine Port pair for IOPub relay""")
157 158
158 159 def _iopub_default(self):
159 160 return tuple(util.select_random_ports(2))
@@ -231,38 +232,77 @@ def start(self):
231 232 self.heartmonitor.start()
232 233 self.log.info("Heartmonitor started")
233 234
  235 + def client_url(self, channel):
  236 + """return full zmq url for a named client channel"""
  237 + return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
  238 +
  239 + def engine_url(self, channel):
  240 + """return full zmq url for a named engine channel"""
  241 + return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
  242 +
234 243 def init_hub(self):
235   - """construct"""
236   - client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
237   - engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
  244 + """construct Hub object"""
238 245
239 246 ctx = self.context
240 247 loop = self.loop
241 248
  249 + try:
  250 + scheme = self.config.TaskScheduler.scheme_name
  251 + except AttributeError:
  252 + from .scheduler import TaskScheduler
  253 + scheme = TaskScheduler.scheme_name.get_default_value()
  254 +
  255 + # build connection dicts
  256 + engine = self.engine_info = {
  257 + 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
  258 + 'registration' : self.regport,
  259 + 'control' : self.control[1],
  260 + 'mux' : self.mux[1],
  261 + 'hb_ping' : self.hb[0],
  262 + 'hb_pong' : self.hb[1],
  263 + 'task' : self.task[1],
  264 + 'iopub' : self.iopub[1],
  265 + }
  266 +
  267 + client = self.client_info = {
  268 + 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
  269 + 'registration' : self.regport,
  270 + 'control' : self.control[0],
  271 + 'mux' : self.mux[0],
  272 + 'task' : self.task[0],
  273 + 'task_scheme' : scheme,
  274 + 'iopub' : self.iopub[0],
  275 + 'notification' : self.notifier_port,
  276 + }
  277 +
  278 + self.log.debug("Hub engine addrs: %s", self.engine_info)
  279 + self.log.debug("Hub client addrs: %s", self.client_info)
  280 +
242 281 # Registrar socket
243 282 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
244   - q.bind(client_iface % self.regport)
245   - self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
  283 + q.bind(self.client_url('registration'))
  284 + self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
246 285 if self.client_ip != self.engine_ip:
247   - q.bind(engine_iface % self.regport)
248   - self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
  286 + q.bind(self.engine_url('registration'))
  287 + self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
249 288
250 289 ### Engine connections ###
251 290
252 291 # heartbeat
253 292 hpub = ctx.socket(zmq.PUB)
254   - hpub.bind(engine_iface % self.hb[0])
  293 + hpub.bind(self.engine_url('hb_ping'))
255 294 hrep = ctx.socket(zmq.ROUTER)
256   - hrep.bind(engine_iface % self.hb[1])
  295 + hrep.bind(self.engine_url('hb_pong'))
257 296 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
258 297 pingstream=ZMQStream(hpub,loop),
259 298 pongstream=ZMQStream(hrep,loop)
260 299 )
261 300
262 301 ### Client connections ###
  302 +
263 303 # Notifier socket
264 304 n = ZMQStream(ctx.socket(zmq.PUB), loop)
265   - n.bind(client_iface%self.notifier_port)
  305 + n.bind(self.client_url('notification'))
266 306
267 307 ### build and launch the queues ###
268 308
@@ -279,35 +319,10 @@ def init_hub(self):
279 319 self.db = import_item(str(db_class))(session=self.session.session,
280 320 config=self.config, log=self.log)
281 321 time.sleep(.25)
282   - try:
283   - scheme = self.config.TaskScheduler.scheme_name
284   - except AttributeError:
285   - from .scheduler import TaskScheduler
286   - scheme = TaskScheduler.scheme_name.get_default_value()
287   - # build connection dicts
288   - self.engine_info = {
289   - 'control' : engine_iface%self.control[1],
290   - 'mux': engine_iface%self.mux[1],
291   - 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
292   - 'task' : engine_iface%self.task[1],
293   - 'iopub' : engine_iface%self.iopub[1],
294   - # 'monitor' : engine_iface%self.mon_port,
295   - }
296   -
297   - self.client_info = {
298   - 'control' : client_iface%self.control[0],
299   - 'mux': client_iface%self.mux[0],
300   - 'task' : (scheme, client_iface%self.task[0]),
301   - 'iopub' : client_iface%self.iopub[0],
302   - 'notification': client_iface%self.notifier_port
303   - }
304   - self.log.debug("Hub engine addrs: %s", self.engine_info)
305   - self.log.debug("Hub client addrs: %s", self.client_info)
306 322
307 323 # resubmit stream
308 324 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
309   - url = util.disambiguate_url(self.client_info['task'][-1])
310   - r.setsockopt(zmq.IDENTITY, self.session.bsession)
  325 + url = util.disambiguate_url(self.client_url('task'))
311 326 r.connect(url)
312 327
313 328 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -335,6 +350,9 @@ class Hub(SessionFactory):
335 350 client_info: dict of zmq connection information for engines to connect
336 351 to the queues.
337 352 """
  353 +
  354 + engine_state_file = Unicode()
  355 +
338 356 # internal data structures:
339 357 ids=Set() # engine IDs
340 358 keytable=Dict()
@@ -382,15 +400,6 @@ def __init__(self, **kwargs):
382 400 super(Hub, self).__init__(**kwargs)
383 401 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
384 402
385   - # validate connection dicts:
386   - for k,v in self.client_info.iteritems():
387   - if k == 'task':
388   - util.validate_url_container(v[1])
389   - else:
390   - util.validate_url_container(v)
391   - # util.validate_url_container(self.client_info)
392   - util.validate_url_container(self.engine_info)
393   -
394 403 # register our callbacks
395 404 self.query.on_recv(self.dispatch_query)
396 405 self.monitor.on_recv(self.dispatch_monitor_traffic)
@@ -425,7 +434,7 @@ def __init__(self, **kwargs):
425 434 self.resubmit.on_recv(lambda msg: None, copy=False)
426 435
427 436 self.log.info("hub::created hub")
428   -
  437 +
429 438 @property
430 439 def _next_id(self):
431 440 """gemerate a new ID.
@@ -440,7 +449,7 @@ def _next_id(self):
440 449 # while newid in self.ids or newid in incoming:
441 450 # newid += 1
442 451 # return newid
443   -
  452 +
444 453 #-----------------------------------------------------------------------------
445 454 # message validation
446 455 #-----------------------------------------------------------------------------
@@ -556,11 +565,11 @@ def handle_heart_failure(self, heart):
556 565 triggers unregistration"""
557 566 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
558 567 eid = self.hearts.get(heart, None)
559   - queue = self.engines[eid].queue
  568 + uuid = self.engines[eid].uuid
560 569 if eid is None or self.keytable[eid] in self.dead_engines:
561 570 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
562 571 else:
563   - self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
  572 + self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
564 573
565 574 #----------------------- MUX Queue Traffic ------------------------------
566 575
@@ -585,7 +594,7 @@ def save_queue_request(self, idents, msg):
585 594 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
586 595 # Unicode in records
587 596 record['engine_uuid'] = queue_id.decode('ascii')
588   - record['client_uuid'] = client_id.decode('ascii')
  597 + record['client_uuid'] = msg['header']['session']
589 598 record['queue'] = 'mux'
590 599
591 600 try:
@@ -677,7 +686,7 @@ def save_task_request(self, idents, msg):
677 686 return
678 687 record = init_record(msg)
679 688
680   - record['client_uuid'] = client_id.decode('ascii')
  689 + record['client_uuid'] = msg['header']['session']
681 690 record['queue'] = 'task'
682 691 header = msg['header']
683 692 msg_id = header['msg_id']
@@ -865,11 +874,10 @@ def connection_request(self, client_id, msg):
865 874 """Reply with connection addresses for clients."""
866 875 self.log.info("client::client %r connected", client_id)
867 876 content = dict(status='ok')
868   - content.update(self.client_info)
869 877 jsonable = {}
870 878 for k,v in self.keytable.iteritems():
871 879 if v not in self.dead_engines:
872   - jsonable[str(k)] = v.decode('ascii')
  880 + jsonable[str(k)] = v
873 881 content['engines'] = jsonable
874 882 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
875 883
@@ -877,48 +885,37 @@ def register_engine(self, reg, msg):
877 885 """Register a new engine."""
878 886 content = msg['content']
879 887 try:
880   - queue = cast_bytes(content['queue'])
  888 + uuid = content['uuid']
881 889 except KeyError:
882 890 self.log.error("registration::queue not specified", exc_info=True)
883 891 return
884   - heart = content.get('heartbeat', None)
885   - if heart:
886   - heart = cast_bytes(heart)
887   - """register a new engine, and create the socket(s) necessary"""
  892 +
888 893 eid = self._next_id
889   - # print (eid, queue, reg, heart)
890 894
891   - self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
  895 + self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
892 896
893 897 content = dict(id=eid,status='ok')
894   - content.update(self.engine_info)
895 898 # check if requesting available IDs:
896   - if queue in self.by_ident:
897   - try:
898   - raise KeyError("queue_id %r in use" % queue)
899   - except:
900   - content = error.wrap_exception()
901   - self.log.error("queue_id %r in use", queue, exc_info=True)
902   - elif heart in self.hearts: # need to check unique hearts?
  899 + if cast_bytes(uuid) in self.by_ident:
903 900 try:
904   - raise KeyError("heart_id %r in use" % heart)
  901 + raise KeyError("uuid %r in use" % uuid)
905 902 except:
906   - self.log.error("heart_id %r in use", heart, exc_info=True)
907 903 content = error.wrap_exception()
  904 + self.log.error("uuid %r in use", uuid, exc_info=True)
908 905 else:
909   - for h, pack in self.incoming_registrations.iteritems():
910   - if heart == h:
  906 + for h, ec in self.incoming_registrations.iteritems():
  907 + if uuid == h:
911 908 try:
912   - raise KeyError("heart_id %r in use" % heart)
  909 + raise KeyError("heart_id %r in use" % uuid)
913 910 except:
914   - self.log.error("heart_id %r in use", heart, exc_info=True)
  911 + self.log.error("heart_id %r in use", uuid, exc_info=True)
915 912 content = error.wrap_exception()
916 913 break
917   - elif queue == pack[1]:
  914 + elif uuid == ec.uuid:
918 915 try:
919   - raise KeyError("queue_id %r in use" % queue)
  916 + raise KeyError("uuid %r in use" % uuid)
920 917 except:
921   - self.log.error("queue_id %r in use", queue, exc_info=True)
  918 + self.log.error("uuid %r in use", uuid, exc_info=True)
922 919 content = error.wrap_exception()
923 920 break
924 921
@@ -926,18 +923,21 @@ def register_engine(self, reg, msg):
926 923 content=content,
927 924 ident=reg)
928 925
  926 + heart = cast_bytes(uuid)
  927 +
929 928 if content['status'] == 'ok':
930 929 if heart in self.heartmonitor.hearts:
931 930 # already beating
932   - self.incoming_registrations[heart] = (eid,queue,reg[0],None)
  931 + self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
933 932 self.finish_registration(heart)
934 933 else:
935 934 purge = lambda : self._purge_stalled_registration(heart)
936 935 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
937 936 dc.start()
938   - self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
  937 + self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
939 938 else:
940 939 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
  940 +
941 941 return eid
942 942
943 943 def unregister_engine(self, ident, msg):
@@ -950,7 +950,7 @@ def unregister_engine(self, ident, msg):
950 950 self.log.info("registration::unregister_engine(%r)", eid)
951 951 # print (eid)
952 952 uuid = self.keytable[eid]
953   - content=dict(id=eid, queue=uuid.decode('ascii'))
  953 + content=dict(id=eid, uuid=uuid)
954 954 self.dead_engines.add(uuid)
955 955 # self.ids.remove(eid)
956 956 # uuid = self.keytable.pop(eid)
@@ -963,6 +963,8 @@ def unregister_engine(self, ident, msg):
963 963 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
964 964 dc.start()
965 965 ############## TODO: HANDLE IT ################
  966 +
  967 + self._save_engine_state()
966 968
967 969 if self.notifier:
968 970 self.session.send(self.notifier, "unregistration_notification", content=content)
@@ -1001,36 +1003,97 @@ def finish_registration(self, heart):
1001 1003 """Second half of engine registration, called after our HeartMonitor
1002 1004 has received a beat from the Engine's Heart."""
1003 1005 try:
1004   - (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
  1006 + ec = self.incoming_registrations.pop(heart)
1005 1007 except KeyError:
1006 1008 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1007 1009 return
1008   - self.log.info("registration::finished registering engine %i:%r", eid, queue)
1009   - if purge is not None:
1010   - purge.stop()
1011   - control = queue
  1010 + self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
  1011 + if ec.stallback is not None:
  1012 + ec.stallback.stop()
  1013 + eid = ec.id
1012 1014 self.ids.add(eid)
1013   - self.keytable[eid] = queue
1014   - self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1015   - control=control, heartbeat=heart)
1016   - self.by_ident[queue] = eid
  1015 + self.keytable[eid] = ec.uuid
  1016 + self.engines[eid] = ec
  1017 + self.by_ident[cast_bytes(ec.uuid)] = ec.id
1017 1018 self.queues[eid] = list()
1018 1019 self.tasks[eid] = list()
1019 1020 self.completed[eid] = list()
1020 1021 self.hearts[heart] = eid
1021   - content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
  1022 + content = dict(id=eid, uuid=self.engines[eid].uuid)
1022 1023 if self.notifier:
1023 1024 self.session.send(self.notifier, "registration_notification", content=content)
1024 1025 self.log.info("engine::Engine Connected: %i", eid)
  1026 +
  1027 + self._save_engine_state()
1025 1028
1026 1029 def _purge_stalled_registration(self, heart):
1027 1030 if heart in self.incoming_registrations:
1028   - eid = self.incoming_registrations.pop(heart)[0]
1029   - self.log.info("registration::purging stalled registration: %i", eid)
  1031 + ec = self.incoming_registrations.pop(heart)
  1032 + self.log.info("registration::purging stalled registration: %i", ec.id)
1030 1033 else:
1031 1034 pass
1032 1035
1033 1036 #-------------------------------------------------------------------------
  1037 + # Engine State