
unordered iteration of AsyncMapResults (+ a couple fixes) #920

Merged
merged 11 commits into from over 2 years ago

2 participants

Min RK Fernando Perez
Min RK
Owner

Feature:

  • AsyncMapResults now support unordered iteration, for when your map is inhomogeneous and you want the quick results first (includes new tests).

Fix:

  • fixed '*' as an ip argument in the kernel/qtconsole and ipcontroller; it was not always getting converted to a connectable interface.

Docs and Examples:

  • added a couple more parallel examples, based on recent discussions (monitoring engine stdout/err, more advanced handling of results as they come in, AsyncMapResult iteration).
  • merged in some cleanup / fixes to the parallel docs from my SciPy 2011 tutorial.
  • moved parallel doc figures into figs subdir
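The ordered vs. unordered distinction this PR introduces can be previewed with the standard library alone. The sketch below uses `concurrent.futures` as a stand-in for a cluster; `slow_task` and the timings are illustrative, not code from the PR:

```python
# Sketch of the ordered/unordered distinction, analogous to AsyncMapResult
# iteration with ordered=True vs. ordered=False.
import concurrent.futures
import time

def slow_task(delay):
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]  # an inhomogeneous workload

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    # ordered: results come back in submission order (like ordered=True)
    ordered = list(pool.map(slow_task, delays))

    # unordered: first-finished results come first (like ordered=False)
    futures = [pool.submit(slow_task, d) for d in delays]
    unordered = [f.result() for f in concurrent.futures.as_completed(futures)]

print(ordered)    # submission order: [0.3, 0.1, 0.2]
print(unordered)  # fastest first, e.g. [0.1, 0.2, 0.3]
```

With an inhomogeneous map, the unordered form lets a consumer start working on the quick results while the slow ones are still running.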
added some commits October 21, 2011
Min RK move parallel doc figures into 'figs' subdir
also fix a few broken links after some renames in the examples dir.
ad7b07b
Min RK fix --ip='*' argument in various apps
It wasn't properly converted to localhost/0.0.0.0 everywhere, causing errors on startup.

Fixed in KernelApp, KernelManager (affects QtConsole), and ipcontroller
cde501a
Min RK update parallel docs with some changes from scipy tutorial
also added a couple more parallel examples:

* iopubwatcher for watching stdout/stderr of a cluster
* itermapresult for iterating through AsyncMapResults without waiting for all to complete
* customresults for advanced handling of results not provided by AsyncResult objects
0cd2b7a
Min RK add unordered iteration to AsyncMapResults
only exposed explicitly on LoadBalancedView.map
0d15755
Min RK add '/' separator to keys in SSH engine dict
to prevent confusion with nodenames that end in numbers in log output.
79a50b9
Min RK add deprecation warning for renamed engine/controller_launcher config
These have been renamed to add _class, which makes it clearer that they
are class names/paths.  This allows 0.11-style specification of the
names to work, but with a warning that the old name is deprecated.
79c1f62
Min RK expand engine/controller_launcher_class helpstring and docs
also fix foo_launcher -> foo_launcher_class in parallel docs

The names for these have been changed in 0.12
ad13d53
Min RK
Owner

I also made some further adjustments to docs and helpstrings, based on recent user discussion on-list, including a deprecation warning for the name-change from foo_launcher -> foo_launcher_class.

Min RK add early shutdown detection, and public-ip message to ipcluster/ipengine

When engines fail to connect, the error message will describe the
most likely cause (the controller not having been instructed to listen on a public interface). A similar message is included in ipcluster, but due to restrictions, its trigger is purely time-based.
b85092a
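The time-based trigger described above can be sketched with `threading.Timer` standing in for Tornado's `DelayedCallback`. `EngineWatcher` and its attributes are illustrative names, not IPython's:

```python
# Minimal sketch of time-based early-shutdown detection: if the engines
# survive an initial window, assume they connected; if they stop inside
# the window, warn that they most likely failed to connect.
import threading

class EngineWatcher:
    def __init__(self, early_shutdown=30):
        # seconds to wait before assuming the engines came up fine
        self.early_shutdown = early_shutdown
        self.warned = False

    def start(self):
        # after the window passes, disable the early-exit warning
        threading.Timer(self.early_shutdown, self.engines_started_ok).start()

    def engines_started_ok(self):
        self.early_shutdown = 0

    def engines_stopped(self):
        if self.early_shutdown:
            # stopped inside the window: probably a connection failure
            self.warned = True
```

The real implementation in ipcluster additionally prints the "--ip='*'" hint and stops the remaining launchers.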
IPython/parallel/apps/ipclusterapp.py
@@ -303,13 +321,42 @@ class IPClusterEngines(BaseParallelApplication):
         )
         return launcher
 
+    def engines_started_okay(self):
Fernando Perez Owner
fperez added a note October 27, 2011

I'd name this just ..._ok for brevity (it also reads better in writing to me, but keep in mind I'm not a native speaker).

Fernando Perez fperez commented on the diff October 27, 2011
IPython/parallel/apps/ipcontrollerapp.py
@@ -287,13 +287,15 @@ class IPControllerApp(BaseParallelApplication):
         mq = import_item(str(self.mq_class))
         
         hub = self.factory
-        # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
+        # disambiguate url, in case of *
+        monitor_url = disambiguate_url(hub.monitor_url)
+        # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
Fernando Perez Owner
fperez added a note October 27, 2011

was this line meant to be left commented out?

Min RK Owner
minrk added a note October 27, 2011

yes, it's an artefact and a reminder to use inproc instead of tcp for communication between the schedulers and the Hub. We can only do that once we bump the pyzmq dependency to 2.1.9 (I think, possibly 2.1.7), which started using Context.instance() in thread devices; that is necessary for inproc support (inproc could also be called in-context, because the context, not the process, is the boundary that matters).

IPython/parallel/engine/engine.py
@@ -214,6 +214,14 @@ class EngineFactory(RegistrationFactory):
 
     def abort(self):
         self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
+        if '127' in self.url:
Fernando Perez Owner
fperez added a note October 27, 2011

Don't you mean if self.url.startswith('127.')? This will trigger on a perfectly valid IP like 128.32.52.127.

Min RK Owner
minrk added a note October 27, 2011

Good call, though it's better to show this message more often than less. I will make the change.
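The false positive Fernando points out is easy to reproduce. `looks_local_bad` and `looks_local_good` below are hypothetical helpers for illustration, not the actual engine code:

```python
# Why substring matching on IP addresses is too loose: '127' occurs inside
# perfectly routable addresses, while startswith('127.') only matches the
# loopback network.
def looks_local_bad(url):
    return '127' in url

def looks_local_good(url):
    # strip an optional scheme like tcp:// before testing the host part
    host = url.split('://')[-1]
    return host.startswith('127.')

print(looks_local_bad('tcp://128.32.52.127:5555'))   # True -- false positive
print(looks_local_good('tcp://128.32.52.127:5555'))  # False
print(looks_local_good('tcp://127.0.0.1:5555'))      # True
```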

docs/examples/parallel/itermapresult.py
@@ -0,0 +1,52 @@
+"""Example of iteration through AsyncMapResult, without waiting for all results
Fernando Perez Owner
fperez added a note October 27, 2011

I think a slightly more detailed description here of what the example does would be useful. Most people will only read the docstring, so they need a bit more 'meat' in there to decide whether the example explains what they're looking for.

Min RK Owner
minrk added a note October 27, 2011

Sure, I'll add a paragraph about heterogeneous workloads, etc.

Fernando Perez
Owner

This looks awesome, and I only noticed one small thing that could be a bug. Once that's fixed, this must definitely go in.

If you do it, remember to push a fresh build of the docs, as there's a bunch of great new material here.

Thanks a ton, excellent work!

added some commits October 28, 2011
Min RK expand itermapresult docstring 2fad3ac
Min RK cleanup per review by @fperez
* '127' in url -> url.startswith('127.')
* engines_started_okay -> engines_started_ok
fe3786a
Min RK
Owner

comments should be addressed

Fernando Perez
Owner

@minrk, I got an error in the new test; running with

iptest -vvsx IPython.parallel

I got

======================================================================
ERROR: test_map_ordered (IPython.parallel.tests.test_lbview.TestLoadBalancedView)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/parallel/tests/test_lbview.py", line 92, in test_map_ordered
    astheycame = list(amr)
  File "<string>", line 2, in __getattr__
  File "/home/fperez/usr/lib/python2.6/site-packages/IPython/parallel/client/asyncresult.py", line 37, in check_ready
    raise error.TimeoutError("result not ready")
TimeoutError: result not ready

----------------------------------------------------------------------
Ran 61 tests in 33.994s

Thoughts?

Min RK AsyncResult.__getattr__ shouldn't raise TimeoutError
This causes problems for things that use hasattr, e.g. list(ar) checking
for `__length_hint__`.

tests added for getattr/getitem behavior
d1c1aaa
Min RK
Owner

Issue is unique to 2.6, and digging reveals it's a bug in the getattr code. In 2.6, list(amr) asks for __length_hint__, whereas 2.7 does not, and the getattr code checked the ready state, raising a TimeoutError. I've fixed it so that it only raises AttributeError or returns metadata. This means that asking for a metadata key by attr (ar.engine_id) will never raise a TimeoutError, it will raise an AttributeError if the metadata is not ready, whereas asking for ar['engine_id'] will raise TimeoutError if it is not available. It made sense to me before, to have getattr and getitem behave the same, but I realize now that raising errors other than AttributeError in getattr is a bad idea.

Tests for getattr and getitem behavior, and explicit test for calling list(amr) have been added to test_asyncresult.
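The failure mode generalizes beyond Python 2.6's `list()`: any protocol that probes an optional attribute expects a missing attribute to surface as `AttributeError` and nothing else. A minimal Python 3 sketch using `hasattr` (the class names are illustrative):

```python
# hasattr() only swallows AttributeError; any other exception raised from
# __getattr__ propagates out, which is why AsyncResult.__getattr__ must not
# raise TimeoutError.
class Broken:
    def __getattr__(self, name):
        raise TimeoutError("result not ready")   # wrong error type

class Fixed:
    def __getattr__(self, name):
        raise AttributeError(name)

print(hasattr(Fixed(), "engine_id"))    # False -- AttributeError is swallowed
try:
    hasattr(Broken(), "engine_id")      # TimeoutError propagates out
except TimeoutError as e:
    print("hasattr blew up:", e)
```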

Fernando Perez
Owner

Looks good now, tested on 2.6 and 2.7; merging. Thanks for the great work!

Fernando Perez fperez merged commit 854f3d9 into from October 29, 2011
Fernando Perez fperez closed this October 29, 2011
Fernando Perez fperez referenced this pull request from a commit January 10, 2012
Commit has since been removed from the repository and is no longer available.

Showing 11 unique commits by 1 author.

Oct 21, 2011
Min RK move parallel doc figures into 'figs' subdir
also fix a few broken links after some renames in the examples dir.
ad7b07b
Min RK fix --ip='*' argument in various apps
It wasn't properly converted to localhost/0.0.0.0 everywhere, causing errors on startup.

Fixed in KernelApp, KernelManager (affects QtConsole), and ipcontroller
cde501a
Min RK update parallel docs with some changes from scipy tutorial
also added a couple more parallel examples:

* iopubwatcher for watching stdout/stderr of a cluster
* itermapresult for iterating through AsyncMapResults without waiting for all to complete
* customresults for advanced handling of results not provided by AsyncResult objects
0cd2b7a
Min RK add unordered iteration to AsyncMapResults
only exposed explicitly on LoadBalancedView.map
0d15755
Oct 23, 2011
Min RK add '/' separator to keys in SSH engine dict
to prevent confusion with nodenames that end in numbers in log output.
79a50b9
Min RK add deprecation warning for renamed engine/controller_launcher config
These have been renamed to add _class, which makes it clearer that they
are class names/paths.  This allows 0.11-style specification of the
names to work, but with a warning that the old name is deprecated.
79c1f62
Min RK expand engine/controller_launcher_class helpstring and docs
also fix foo_launcher -> foo_launcher_class in parallel docs

The names for these have been changed in 0.12
ad13d53
Oct 26, 2011
Min RK add early shutdown detection, and public-ip message to ipcluster/ipengine

When engines fail to connect, the error message will describe the
most likely cause (the controller not having been instructed to listen on a public interface). A similar message is included in ipcluster, but due to restrictions, its trigger is purely time-based.
b85092a
Oct 28, 2011
Min RK expand itermapresult docstring 2fad3ac
Min RK cleanup per review by @fperez
* '127' in url -> url.startswith('127.')
* engines_started_okay -> engines_started_ok
fe3786a
Min RK AsyncResult.__getattr__ shouldn't raise TimeoutError
This causes problems for things that use hasattr, e.g. list(ar) checking
for `__length_hint__`.

tests added for getattr/getitem behavior
d1c1aaa

Showing 46 changed files with 664 additions and 154 deletions. Show diff stats Hide diff stats

  1. 81  IPython/parallel/apps/ipclusterapp.py
  2. 16  IPython/parallel/apps/ipcontrollerapp.py
  3. 2  IPython/parallel/apps/launcher.py
  4. 59  IPython/parallel/client/asyncresult.py
  5. 15  IPython/parallel/client/remotefunction.py
  6. 19  IPython/parallel/client/view.py
  7. 8  IPython/parallel/engine/engine.py
  8. 42  IPython/parallel/tests/test_asyncresult.py
  9. 38  IPython/parallel/tests/test_lbview.py
  10. 9  IPython/zmq/heartbeat.py
  11. 3  IPython/zmq/kernelmanager.py
  12. 61  docs/examples/parallel/customresults.py
  13. 83  docs/examples/parallel/iopubwatcher.py
  14. 65  docs/examples/parallel/itermapresult.py
  15. 36  docs/source/parallel/dag_dependencies.txt
  16. 0  docs/source/parallel/{ → figs}/asian_call.pdf
  17. 0  docs/source/parallel/{ → figs}/asian_call.png
  18. 0  docs/source/parallel/{ → figs}/asian_put.pdf
  19. 0  docs/source/parallel/{ → figs}/asian_put.png
  20. 0  docs/source/parallel/{ → figs}/dagdeps.pdf
  21. 0  docs/source/parallel/{ → figs}/dagdeps.png
  22. 0  docs/source/parallel/{ → figs}/hpc_job_manager.pdf
  23. 0  docs/source/parallel/{ → figs}/hpc_job_manager.png
  24. 0  docs/source/parallel/{ → figs}/ipcluster_create.pdf
  25. 0  docs/source/parallel/{ → figs}/ipcluster_create.png
  26. 0  docs/source/parallel/{ → figs}/ipcluster_start.pdf
  27. 0  docs/source/parallel/{ → figs}/ipcluster_start.png
  28. 0  docs/source/parallel/{ → figs}/ipython_shell.pdf
  29. 0  docs/source/parallel/{ → figs}/ipython_shell.png
  30. 0  docs/source/parallel/{ → figs}/mec_simple.pdf
  31. 0  docs/source/parallel/{ → figs}/mec_simple.png
  32. 0  docs/source/parallel/{ → figs}/parallel_pi.pdf
  33. 0  docs/source/parallel/{ → figs}/parallel_pi.png
  34. 0  docs/source/parallel/{ → figs}/simpledag.pdf
  35. 0  docs/source/parallel/{ → figs}/simpledag.png
  36. 0  docs/source/parallel/{ → figs}/single_digits.pdf
  37. 0  docs/source/parallel/{ → figs}/single_digits.png
  38. 0  docs/source/parallel/{ → figs}/two_digit_counts.pdf
  39. 0  docs/source/parallel/{ → figs}/two_digit_counts.png
  40. BIN  docs/source/parallel/figs/wideView.png
  41. 32  docs/source/parallel/parallel_demos.txt
  42. 36  docs/source/parallel/parallel_intro.txt
  43. 80  docs/source/parallel/parallel_multiengine.txt
  44. 51  docs/source/parallel/parallel_process.txt
  45. 66  docs/source/parallel/parallel_task.txt
  46. 16  docs/source/parallel/parallel_winhpc.txt
81  IPython/parallel/apps/ipclusterapp.py
@@ -38,7 +38,7 @@
 from IPython.utils.daemonize import daemonize
 from IPython.utils.importstring import import_item
 from IPython.utils.sysinfo import num_cpus
-from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
+from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List, Any,
                                         DottedObjectName)
 
 from IPython.parallel.apps.baseapp import (
@@ -233,6 +233,12 @@ def _classes_default(self):
         help="""The number of engines to start. The default is to use one for each
         CPU on your machine""")
 
+    engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
+    def _engine_launcher_changed(self, name, old, new):
+        if isinstance(new, basestring):
+            self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
+                    " use engine_launcher_class" % self.__class__.__name__)
+            self.engine_launcher_class = new
     engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
         config=True,
         help="""The class for launching a set of Engines. Change this value
@@ -249,11 +255,22 @@ def _classes_default(self):
             MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
             PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
             SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
+            LSFEngineSetLauncher : use LSF (bsub) to submit engines to a batch queue
             SSHEngineSetLauncher : use SSH to start the controller
                                 Note that SSH does *not* move the connection files
                                 around, so you will likely have to do this manually
                                 unless the machines are on a shared file system.
             WindowsHPCEngineSetLauncher : use Windows HPC
+
+        If you are using one of IPython's builtin launchers, you can specify just the
+        prefix, e.g:
+
+            c.IPClusterEngines.engine_launcher_class = 'SSH'
+
+        or:
+
+            ipcluster start --engines 'MPIExec'
+
         """
         )
     daemonize = Bool(False, config=True,
@@ -265,9 +282,11 @@ def _daemonize_changed(self, name, old, new):
         if new:
             self.log_to_file = True
 
+    early_shutdown = Int(30, config=True, help="The timeout (in seconds)")
+    _stopping = False
+    
     aliases = Dict(engine_aliases)
     flags = Dict(engine_flags)
-    _stopping = False
 
     def initialize(self, argv=None):
         super(IPClusterEngines, self).initialize(argv)
@@ -276,7 +295,6 @@ def initialize(self, argv=None):
 
     def init_launchers(self):
         self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
-        self.engine_launcher.on_stop(lambda r: self.loop.stop())
 
     def init_signal(self):
         # Setup signals
@@ -303,13 +321,42 @@ def build_launcher(self, clsname, kind=None):
         )
         return launcher
 
+    def engines_started_ok(self):
+        self.log.info("Engines appear to have started successfully")
+        self.early_shutdown = 0
+    
     def start_engines(self):
         self.log.info("Starting %i engines"%self.n)
         self.engine_launcher.start(self.n)
+        self.engine_launcher.on_stop(self.engines_stopped_early)
+        if self.early_shutdown:
+            ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
+
+    def engines_stopped_early(self, r):
+        if self.early_shutdown and not self._stopping:
+            self.log.error("""
+            Engines shutdown early, they probably failed to connect.
+            
+            Check the engine log files for output.
+            
+            If your controller and engines are not on the same machine, you probably
+            have to instruct the controller to listen on an interface other than localhost.
+            
+            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
+            
+            Be sure to read our security docs before instructing your controller to listen on
+            a public interface.
+            """)
+            self.stop_launchers()
+        
+        return self.engines_stopped(r)
+    
+    def engines_stopped(self, r):
+        return self.loop.stop()
 
     def stop_engines(self):
-        self.log.info("Stopping Engines...")
         if self.engine_launcher.running:
+            self.log.info("Stopping Engines...")
             d = self.engine_launcher.stop()
             return d
         else:
@@ -321,7 +368,7 @@ def stop_launchers(self, r=None):
             self.log.error("IPython cluster: stopping")
             self.stop_engines()
             # Wait a few seconds to let things shut down.
-            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
+            dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
             dc.start()
 
     def sigint_handler(self, signum, frame):
@@ -393,6 +440,13 @@ def _classes_default(self,):
     delay = CFloat(1., config=True,
         help="delay (in s) between starting the controller and the engines")
 
+    controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
+    def _controller_launcher_changed(self, name, old, new):
+        if isinstance(new, basestring):
+            # old 0.11-style config
+            self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
+                    " use controller_launcher_class" % self.__class__.__name__)
+            self.controller_launcher_class = new
     controller_launcher_class = DottedObjectName('LocalControllerLauncher',
         config=True,
         help="""The class for launching a Controller. Change this value if you want
@@ -407,8 +461,19 @@ def _classes_default(self,):
             MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
             PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
             SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
+            LSFControllerLauncher : use LSF (bsub) to submit engines to a batch queue
             SSHControllerLauncher : use SSH to start the controller
             WindowsHPCControllerLauncher : use Windows HPC
+
+        If you are using one of IPython's builtin launchers, you can specify just the
+        prefix, e.g:
+
+            c.IPClusterStart.controller_launcher_class = 'SSH'
+
+        or:
+
+            ipcluster start --controller 'MPIExec'
+
         """
         )
     reset = Bool(False, config=True,
@@ -422,7 +487,11 @@ def init_launchers(self):
         self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
         self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
         self.controller_launcher.on_stop(self.stop_launchers)
-
+    
+    def engines_stopped(self, r):
+        """prevent parent.engines_stopped from stopping everything on engine shutdown"""
+        pass
+    
     def start_controller(self):
         self.controller_launcher.start()
 
16  IPython/parallel/apps/ipcontrollerapp.py
@@ -54,7 +54,7 @@
 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
 from IPython.parallel.controller.sqlitedb import SQLiteDB
 
-from IPython.parallel.util import signal_children, split_url, asbytes
+from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
 
 # conditional import of MongoDB backend class
 
@@ -287,13 +287,15 @@ def init_schedulers(self):
         mq = import_item(str(self.mq_class))
         
         hub = self.factory
-        # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
+        # disambiguate url, in case of *
+        monitor_url = disambiguate_url(hub.monitor_url)
+        # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
         # IOPub relay (in a Process)
         q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
         q.bind_in(hub.client_info['iopub'])
         q.bind_out(hub.engine_info['iopub'])
         q.setsockopt_out(zmq.SUBSCRIBE, b'')
-        q.connect_mon(hub.monitor_url)
+        q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
 
@@ -302,7 +304,7 @@ def init_schedulers(self):
         q.bind_in(hub.client_info['mux'])
         q.setsockopt_in(zmq.IDENTITY, b'mux')
         q.bind_out(hub.engine_info['mux'])
-        q.connect_mon(hub.monitor_url)
+        q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
 
@@ -311,7 +313,7 @@ def init_schedulers(self):
         q.bind_in(hub.client_info['control'])
         q.setsockopt_in(zmq.IDENTITY, b'control')
         q.bind_out(hub.engine_info['control'])
-        q.connect_mon(hub.monitor_url)
+        q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
         try:
@@ -326,7 +328,7 @@ def init_schedulers(self):
             q.bind_in(hub.client_info['task'][1])
             q.setsockopt_in(zmq.IDENTITY, b'task')
             q.bind_out(hub.engine_info['task'])
-            q.connect_mon(hub.monitor_url)
+            q.connect_mon(monitor_url)
             q.daemon=True
             children.append(q)
         elif scheme == 'none':
@@ -335,7 +337,7 @@ def init_schedulers(self):
         else:
             self.log.info("task::using Python %s Task scheduler"%scheme)
             sargs = (hub.client_info['task'][1], hub.engine_info['task'],
-                                hub.monitor_url, hub.client_info['notification'])
+                                monitor_url, disambiguate_url(hub.client_info['notification']))
             kwargs = dict(logname='scheduler', loglevel=self.log_level,
                             log_url = self.log_url, config=dict(self.config))
             if 'Process' in self.mq_class:
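The real `disambiguate_url` lives in `IPython.parallel.util`; a simplified sketch of the idea, assuming only `tcp://host:port` URLs, might look like this (an illustration, not IPython's implementation):

```python
# A socket can *bind* to the wildcard '*' (0.0.0.0), but a client cannot
# *connect* to the wildcard, so it must be rewritten to a real interface
# (here, loopback by default) before being handed to connecting peers.
def disambiguate_url(url, location='127.0.0.1'):
    proto, addr = url.split('://', 1)
    host, port = addr.rsplit(':', 1)
    if host in ('*', '0.0.0.0'):
        host = location
    return '%s://%s:%s' % (proto, host, port)

print(disambiguate_url('tcp://*:10101'))
# -> tcp://127.0.0.1:10101
```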
2  IPython/parallel/apps/launcher.py
@@ -633,7 +633,7 @@ def start(self, n):
                 d = el.start(user=user, hostname=host)
                 if i==0:
                     self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
-                self.launchers[host+str(i)] = el
+                self.launchers[ "%s/%i" % (host,i) ] = el
                 dlist.append(d)
         self.notify_start(dlist)
         return dlist
59  IPython/parallel/client/asyncresult.py
@@ -62,6 +62,7 @@ def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None)
         self._tracker = tracker
         self._ready = False
         self._success = None
+        self._metadata = None
         if len(msg_ids) == 1:
             self._single_result = not isinstance(targets, (list, tuple))
         else:
@@ -231,13 +232,13 @@ def __getitem__(self, key):
         else:
             raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
 
-    @check_ready
     def __getattr__(self, key):
         """getattr maps to getitem for convenient attr access to metadata."""
-        if key not in self._metadata[0].keys():
+        try:
+            return self.__getitem__(key)
+        except (error.TimeoutError, KeyError):
             raise AttributeError("%r object has no attribute %r"%(
                     self.__class__.__name__, key))
-        return self.__getitem__(key)
 
     # asynchronous iterator:
     def __iter__(self):
@@ -261,12 +262,19 @@ class AsyncMapResult(AsyncResult):
     """Class for representing results of non-blocking gathers.
 
     This will properly reconstruct the gather.
+    
+    This class is iterable at any time, and will wait on results as they come.
+    
+    If ordered=False, then the first results to arrive will come first, otherwise
+    results will be yielded in the order they were submitted.
+    
     """
 
-    def __init__(self, client, msg_ids, mapObject, fname=''):
+    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
         AsyncResult.__init__(self, client, msg_ids, fname=fname)
         self._mapObject = mapObject
         self._single_result = False
+        self.ordered = ordered
 
     def _reconstruct_result(self, res):
         """Perform the gather on the actual results."""
@@ -274,6 +282,13 @@ def _reconstruct_result(self, res):
 
     # asynchronous iterator:
     def __iter__(self):
+        it = self._ordered_iter if self.ordered else self._unordered_iter
+        for r in it():
+            yield r
+
+    # asynchronous ordered iterator:
+    def _ordered_iter(self):
+        """iterator for results *as they arrive*, preserving submission order."""
         try:
             rlist = self.get(0)
         except error.TimeoutError:
@@ -294,6 +309,42 @@ def __iter__(self):
             for r in rlist:
                 yield r
 
+    # asynchronous unordered iterator:
+    def _unordered_iter(self):
+        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
+        try:
+            rlist = self.get(0)
+        except error.TimeoutError:
+            pending = set(self.msg_ids)
+            while pending:
+                try:
+                    self._client.wait(pending, 1e-3)
+                except error.TimeoutError:
+                    # ignore timeout error, because that only means
+                    # *some* jobs are outstanding
+                    pass
+                # update ready set with those no longer outstanding:
+                ready = pending.difference(self._client.outstanding)
+                # update pending to exclude those that are finished
+                pending = pending.difference(ready)
+                while ready:
+                    msg_id = ready.pop()
+                    ar = AsyncResult(self._client, msg_id, self._fname)
+                    rlist = ar.get()
+                    try:
+                        for r in rlist:
+                            yield r
+                    except TypeError:
+                        # flattened, not a list
+                        # this could get broken by flattened data that returns iterables
+                        # but most calls to map do not expose the `flatten` argument
+                        yield rlist
+        else:
+            # already done
+            for r in rlist:
+                yield r
+
 
 class AsyncHubResult(AsyncResult):
     """Class to wrap pending results that must be requested from the Hub.
15  IPython/parallel/client/remotefunction.py
@@ -46,7 +46,7 @@ def remote_function(f):
     return remote_function
 
 @skip_doctest
-def parallel(view, dist='b', block=None, **flags):
+def parallel(view, dist='b', block=None, ordered=True, **flags):
     """Turn a function into a parallel remote function.
 
     This method can be used for map:
@@ -57,7 +57,7 @@ def parallel(view, dist='b', block=None, **flags):
     """
 
     def parallel_function(f):
-        return ParallelFunction(view, f, dist=dist, block=block, **flags)
+        return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
     return parallel_function
 
 #--------------------------------------------------------------------------
@@ -122,15 +122,19 @@ class ParallelFunction(RemoteFunction):
         to use the current `block` attribute of `view`
     chunksize : int or None
         The size of chunk to use when breaking up sequences in a load-balanced manner
+    ordered : bool [default: True]
+        Whether 
     **flags : remaining kwargs are passed to View.temp_flags
     """
 
     chunksize=None
+    ordered=None
     mapObject=None
 
-    def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
+    def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
         super(ParallelFunction, self).__init__(view, f, block=block, **flags)
         self.chunksize = chunksize
+        self.ordered = ordered
 
         mapClass = Map.dists[dist]
        self.mapObject = mapClass()
@@ -186,7 +190,10 @@ def __call__(self, *sequences):
 
             msg_ids.append(ar.msg_ids[0])
 
-        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)
+        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, 
+                            fname=self.func.__name__,
+                            ordered=self.ordered
+                        )
 
         if self.block:
             try:
19  IPython/parallel/client/view.py
@@ -992,7 +992,7 @@ def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
     @spin_after
     @save_ids
     def map(self, f, *sequences, **kwargs):
-        """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
+        """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
 
         Parallel version of builtin `map`, load-balanced by this View.
 
@@ -1009,14 +1009,20 @@ def map(self, f, *sequences, **kwargs):
             function to be mapped
         *sequences: one or more sequences of matching length
             the sequences to be distributed and passed to `f`
-        block : bool
-            whether to wait for the result or not [default self.block]
+        block : bool [default self.block]
+            whether to wait for the result or not
         track : bool
             whether to create a MessageTracker to allow the user to
             safely edit after arrays and buffers during non-copying
             sends.
-        chunksize : int
-            how many elements should be in each task [default 1]
+        chunksize : int [default 1]
+            how many elements should be in each task.
+        ordered : bool [default True]
+            Whether the results should be gathered as they arrive, or enforce
+            the order of submission.
+
+            Only applies when iterating through AsyncMapResult as results arrive.
+            Has no effect when block=True.
 
         Returns
         -------
@@ -1034,6 +1040,7 @@ def map(self, f, *sequences, **kwargs):
         # default
         block = kwargs.get('block', self.block)
         chunksize = kwargs.get('chunksize', 1)
+        ordered = kwargs.get('ordered', True)
 
         keyset = set(kwargs.keys())
         extra_keys = keyset.difference_update(set(['block', 'chunksize']))
@@ -1042,7 +1049,7 @@ def map(self, f, *sequences, **kwargs):
 
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
-        pf = ParallelFunction(self, f, block=block,  chunksize=chunksize)
+        pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
         return pf.map(*sequences)
 
 __all__ = ['LoadBalancedView', 'DirectView']
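The ordered/unordered semantics that `map(..., ordered=...)` adds above can be illustrated without a running cluster. The sketch below is a toy model only (the names `iter_results` and `chunks` are invented for illustration, not IPython API): each submitted chunk has a completion time and a result list, and the iterator either respects submission order or yields chunks as they complete.

```python
import random

def iter_results(chunks, ordered=True):
    """Toy model of AsyncMapResult iteration (illustration only, not the
    IPython API): `chunks` maps submission index -> (finish_time, result_list).

    ordered=True yields chunk results in submission order;
    ordered=False yields them in completion order, like map(..., ordered=False).
    """
    if ordered:
        order = sorted(chunks)  # submission order
    else:
        order = sorted(chunks, key=lambda i: chunks[i][0])  # completion order
    for i in order:
        # each map task returns a list of length chunksize, flattened here
        for r in chunks[i][1]:
            yield r

# chunk i carries result [i*i] and finishes at a random time
chunks = {i: (random.random(), [i * i]) for i in range(5)}
print(list(iter_results(chunks, ordered=True)))      # [0, 1, 4, 9, 16]
print(sorted(iter_results(chunks, ordered=False)))   # [0, 1, 4, 9, 16]
```

Either way the same multiset of results comes out; only the arrival order differs, which is exactly what `test_map_unordered` below checks with `sorted()`.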
8  IPython/parallel/engine/engine.py
@@ -214,6 +214,14 @@ def complete_registration(self, msg, connect, maybe_tunnel):
 
     def abort(self):
         self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
+        if self.url.startswith('127.'):
+            self.log.fatal("""
+            If the controller and engines are not on the same machine,
+            you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
+                c.HubFactory.ip='*' # for all interfaces, internal and external
+                c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
+            or tunnel connections via ssh.
+            """)
         self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
         time.sleep(1)
         sys.exit(255)
42  IPython/parallel/tests/test_asyncresult.py
@@ -70,4 +70,46 @@ def test_get_dict(self):
         self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
         for eid,r in d.iteritems():
             self.assertEquals(r, 5)
+
+    def test_list_amr(self):
+        ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
+        rlist = list(ar)
+
+    def test_getattr(self):
+        ar = self.client[:].apply_async(wait, 0.5)
+        self.assertRaises(AttributeError, lambda : ar._foo)
+        self.assertRaises(AttributeError, lambda : ar.__length_hint__())
+        self.assertRaises(AttributeError, lambda : ar.foo)
+        self.assertRaises(AttributeError, lambda : ar.engine_id)
+        self.assertFalse(hasattr(ar, '__length_hint__'))
+        self.assertFalse(hasattr(ar, 'foo'))
+        self.assertFalse(hasattr(ar, 'engine_id'))
+        ar.get(5)
+        self.assertRaises(AttributeError, lambda : ar._foo)
+        self.assertRaises(AttributeError, lambda : ar.__length_hint__())
+        self.assertRaises(AttributeError, lambda : ar.foo)
+        self.assertTrue(isinstance(ar.engine_id, list))
+        self.assertEquals(ar.engine_id, ar['engine_id'])
+        self.assertFalse(hasattr(ar, '__length_hint__'))
+        self.assertFalse(hasattr(ar, 'foo'))
+        self.assertTrue(hasattr(ar, 'engine_id'))
+
+    def test_getitem(self):
+        ar = self.client[:].apply_async(wait, 0.5)
+        self.assertRaises(TimeoutError, lambda : ar['foo'])
+        self.assertRaises(TimeoutError, lambda : ar['engine_id'])
+        ar.get(5)
+        self.assertRaises(KeyError, lambda : ar['foo'])
+        self.assertTrue(isinstance(ar['engine_id'], list))
+        self.assertEquals(ar.engine_id, ar['engine_id'])
+
+    def test_single_result(self):
+        ar = self.client[-1].apply_async(wait, 0.5)
+        self.assertRaises(TimeoutError, lambda : ar['foo'])
+        self.assertRaises(TimeoutError, lambda : ar['engine_id'])
+        self.assertTrue(ar.get(5) == 0.5)
+        self.assertTrue(isinstance(ar['engine_id'], int))
+        self.assertTrue(isinstance(ar.engine_id, int))
+        self.assertEquals(ar.engine_id, ar['engine_id'])
+
 
38  IPython/parallel/tests/test_lbview.py
@@ -58,6 +58,44 @@ def f(x):
         r = self.view.map_sync(f, data)
         self.assertEquals(r, map(f, data))
 
+    def test_map_unordered(self):
+        def f(x):
+            return x**2
+        def slow_f(x):
+            import time
+            time.sleep(0.05*x)
+            return x**2
+        data = range(16,0,-1)
+        reference = map(f, data)
+
+        amr = self.view.map_async(slow_f, data, ordered=False)
+        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        # check individual elements, retrieved as they come
+        # list comprehension uses __iter__
+        astheycame = [ r for r in amr ]
+        # Ensure that at least one result came out of order:
+        self.assertNotEquals(astheycame, reference, "should not have preserved order")
+        self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
+
+    def test_map_ordered(self):
+        def f(x):
+            return x**2
+        def slow_f(x):
+            import time
+            time.sleep(0.05*x)
+            return x**2
+        data = range(16,0,-1)
+        reference = map(f, data)
+
+        amr = self.view.map_async(slow_f, data)
+        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        # check individual elements, retrieved as they come
+        # list(amr) uses __iter__
+        astheycame = list(amr)
+        # Ensure that results came in order
+        self.assertEquals(astheycame, reference)
+        self.assertEquals(amr.result, reference)
+
     def test_abort(self):
         view = self.view
         ar = self.client[:].apply_async(time.sleep, .5)
9  IPython/zmq/heartbeat.py
@@ -31,15 +31,14 @@ class Heartbeat(Thread):
     def __init__(self, context, addr=(LOCALHOST, 0)):
         Thread.__init__(self)
         self.context = context
-        self.addr = addr
-        self.ip = addr[0]
-        self.port = addr[1]
+        self.ip, self.port = addr
         if self.port == 0:
             s = socket.socket()
-            s.bind(self.addr)
+            # '*' means all interfaces to 0MQ, which is '' to socket.socket
+            s.bind(('' if self.ip == '*' else self.ip, 0))
             self.port = s.getsockname()[1]
             s.close()
-            self.addr = (self.ip, self.port)
+        self.addr = (self.ip, self.port)
         self.daemon = True
 
     def run(self):
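The `'*'` → `''` translation in the Heartbeat fix above can be checked in isolation with the stdlib. This is a sketch of the same bind trick (the helper name `random_port` is invented for illustration): 0MQ spells "all interfaces" as `'*'`, while `socket.socket` wants the empty string, and binding to port 0 lets the OS pick a free ephemeral port.

```python
import socket

def random_port(ip):
    """Pick an ephemeral port the way the fixed Heartbeat does:
    '*' means "all interfaces" to 0MQ, but the stdlib socket module
    wants '' for the same thing, so translate before binding."""
    s = socket.socket()
    s.bind(('' if ip == '*' else ip, 0))  # port 0: the OS assigns a free port
    port = s.getsockname()[1]
    s.close()
    return port

print(random_port('*'))          # binds all interfaces; bind(('*', 0)) would fail
print(random_port('127.0.0.1'))  # binds loopback only
```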
3  IPython/zmq/kernelmanager.py
@@ -710,6 +710,9 @@ def _context_default(self):
     # The addresses for the communication channels.
     connection_file = Unicode('')
     ip = Unicode(LOCALHOST)
+    def _ip_changed(self, name, old, new):
+        if new == '*':
+            self.ip = '0.0.0.0'
     shell_port = Int(0)
     iopub_port = Int(0)
     stdin_port = Int(0)
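The `_ip_changed` handler above is a traitlets change notification that rewrites `'*'` the moment it is assigned. Stripped of the traits machinery, the normalization itself is just this (a standalone sketch, not the actual `KernelManager` class):

```python
def normalize_ip(ip):
    """Mimic KernelManager._ip_changed above: '*' is meaningful to 0MQ
    but is not a connectable address, so rewrite it to the wildcard
    IP '0.0.0.0'; everything else passes through unchanged."""
    return '0.0.0.0' if ip == '*' else ip

for addr in ('*', '127.0.0.1', '192.168.1.101'):
    print(normalize_ip(addr))
```

Doing this in a change handler means every consumer of `ip`, including the connection-file writer, only ever sees a real interface address.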
61  docs/examples/parallel/customresults.py
@@ -0,0 +1,61 @@
+"""An example for handling results in a way that AsyncMapResult doesn't provide
+
+Specifically, out-of-order results with some special handling of metadata.
+
+This just submits a bunch of jobs, waits on the results, and prints the stdout
+and results of each as they finish.
+
+Authors
+-------
+* MinRK
+"""
+import time
+import random
+
+from IPython import parallel
+
+# create client & views
+rc = parallel.Client()
+dv = rc[:]
+v = rc.load_balanced_view()
+
+# scatter 'id', so id=0,1,2 on engines 0,1,2
+dv.scatter('id', rc.ids, flatten=True)
+print dv['id']
+
+def sleep_here(count, t):
+    """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
+    import time,sys
+    print "hi from engine %i" % id
+    sys.stdout.flush()
+    time.sleep(t)
+    return count,t
+
+amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)
+
+pending = set(amr.msg_ids)
+while pending:
+    try:
+        rc.wait(pending, 1e-3)
+    except parallel.TimeoutError:
+        # ignore timeout errors, since they only mean that at least one isn't done
+        pass
+    # finished is the set of msg_ids that are complete
+    finished = pending.difference(rc.outstanding)
+    # update pending to exclude those that just finished
+    pending = pending.difference(finished)
+    for msg_id in finished:
+        # we know these are done, so don't worry about blocking
+        ar = rc.get_result(msg_id)
+        print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
+        print "with stdout:"
+        print '    ' + ar.stdout.replace('\n', '\n    ').rstrip()
+        print "and results:"
+
+        # note that each job in a map always returns a list of length chunksize
+        # even if chunksize == 1
+        for (count,t) in ar.result:
+            print "  item %i: slept for %.2fs" % (count, t)
83  docs/examples/parallel/iopubwatcher.py
@@ -0,0 +1,83 @@
+"""A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
+
+This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
+
+Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
+and you will see the print statements as they arrive, notably not waiting for the results
+to finish.
+
+You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
+and easily filter by message type.
+
+Authors
+-------
+* MinRK
+"""
+
+import os
+import sys
+import json
+import zmq
+
+from IPython.zmq.session import Session
+from IPython.parallel.util import disambiguate_url
+from IPython.utils.py3compat import str_to_bytes
+from IPython.utils.path import get_security_file
+
+def main(connection_file):
+    """watch iopub channel, and print messages"""
+
+    ctx = zmq.Context.instance()
+
+    with open(connection_file) as f:
+        cfg = json.loads(f.read())
+
+    location = cfg['location']
+    reg_url = cfg['url']
+    session = Session(key=str_to_bytes(cfg['exec_key']))
+
+    query = ctx.socket(zmq.DEALER)
+    query.connect(disambiguate_url(cfg['url'], location))
+    session.send(query, "connection_request")
+    idents,msg = session.recv(query, mode=0)
+    c = msg['content']
+    iopub_url = disambiguate_url(c['iopub'], location)
+    sub = ctx.socket(zmq.SUB)
+    # This will subscribe to all messages:
+    sub.setsockopt(zmq.SUBSCRIBE, b'')
+    # replace b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
+    # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
+    # to everything from engine 1, but there is no way to subscribe to
+    # just stdout from everyone.
+    # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
+    # engine 1's stderr and engine 2's stdout:
+    # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
+    # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
+    sub.connect(iopub_url)
+    while True:
+        try:
+            idents,msg = session.recv(sub, mode=0)
+        except KeyboardInterrupt:
+            return
+        # ident always length 1 here
+        topic = idents[0]
+        if msg['msg_type'] == 'stream':
+            # stdout/stderr
+            # stream names are in msg['content']['name'], if you want to handle
+            # them differently
+            print "%s: %s" % (topic, msg['content']['data'])
+        elif msg['msg_type'] == 'pyerr':
+            # Python traceback
+            c = msg['content']
+            print topic + ':'
+            for line in c['traceback']:
+                # indent lines
+                print '    ' + line
+
+if __name__ == '__main__':
+    if len(sys.argv) > 1:
+        cf = sys.argv[1]
+    else:
+        # This gets the security file for the default profile:
+        cf = get_security_file('ipcontroller-client.json')
+    main(cf)
65  docs/examples/parallel/itermapresult.py
@@ -0,0 +1,65 @@
+"""Example of iteration through AsyncMapResults, without waiting for all results
+
+When you call view.map(func, sequence), you will receive a special AsyncMapResult
+object.  These objects are used to reconstruct the results of the split call.
+One feature AsyncResults provide is that they are iterable *immediately*, so
+you can iterate through the actual results as they complete.
+
+This is useful if you submit a large number of tasks that may take some time,
+but want to perform logic on elements in the result, or even abort subsequent
+tasks in cases where you are searching for the first affirmative result.
+
+By default, the results will match the ordering of the submitted sequence, but
+if you call `map(...ordered=False)`, then results will be provided to the iterator
+on a first-come-first-served basis.
+
+Authors
+-------
+* MinRK
+"""
+import time
+
+from IPython import parallel
+
+# create client & view
+rc = parallel.Client()
+dv = rc[:]
+v = rc.load_balanced_view()
+
+# scatter 'id', so id=0,1,2 on engines 0,1,2
+dv.scatter('id', rc.ids, flatten=True)
+print "Engine IDs: ", dv['id']
+
+# create a Reference to `id`. This will be a different value on each engine
+ref = parallel.Reference('id')
+print "sleeping for `id` seconds on each engine"
+tic = time.time()
+ar = dv.apply(time.sleep, ref)
+for i,r in enumerate(ar):
+    print "%i: %.3f"%(i, time.time()-tic)
+
+def sleep_here(t):
+    import time
+    time.sleep(t)
+    return id,t
+
+# one call per task
+print "running with one call per task"
+amr = v.map(sleep_here, [.01*t for t in range(100)])
+tic = time.time()
+for i,r in enumerate(amr):
+    print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
+
+print "running with four calls per task"
+# with chunksize, we can have four calls per task
+amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
+tic = time.time()
+for i,r in enumerate(amr):
+    print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
+
+print "running with two calls per task, with unordered results"
+# We can even iterate through faster results first, with ordered=False
+amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
+tic = time.time()
+for i,r in enumerate(amr):
+    print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
36  docs/source/parallel/dag_dependencies.txt
@@ -10,7 +10,7 @@ for working with Graphs is NetworkX_.  Here, we will walk through a demo mapping
 a nx DAG to task dependencies.
 
 The full script that runs this demo can be found in
-:file:`docs/examples/newparallel/dagdeps.py`.
+:file:`docs/examples/parallel/dagdeps.py`.
 
 Why are DAGs good for task dependencies?
 ----------------------------------------
@@ -30,7 +30,8 @@ A Sample DAG
 
 Here, we have a very simple 5-node DAG:
 
-.. figure:: simpledag.*
+.. figure:: figs/simpledag.*
+    :width: 600px
 
 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
@@ -80,7 +81,7 @@ The code to generate the simple DAG:
 For demonstration purposes, we have a function that generates a random DAG with a given
 number of nodes and edges.
 
-.. literalinclude:: ../../examples/newparallel/dagdeps.py
+.. literalinclude:: ../../examples/parallel/dagdeps.py
     :language: python
     :lines: 20-36
 
@@ -117,11 +118,13 @@ on which it depends:
     In [6]: results = {}
 
     In [7]: for node in G.topological_sort():
-       ...:     # get list of AsyncResult objects from nodes
-       ...:     # leading into this one as dependencies
-       ...:     deps = [ results[n] for n in G.predecessors(node) ]
-       ...:     # submit and store AsyncResult object
-       ...:     results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
+       ...:    # get list of AsyncResult objects from nodes
+       ...:    # leading into this one as dependencies
+       ...:    deps = [ results[n] for n in G.predecessors(node) ]
+       ...:    # submit and store AsyncResult object
+       ...:    with view.temp_flags(after=deps, block=False):
+       ...:         results[node] = view.apply_with_flags(jobs[node])
+
 
 Now that we have submitted all the jobs, we can wait for the results:
 
@@ -137,7 +140,7 @@ These objects store a variety of metadata about each task, including various tim
 We can validate that the dependencies were respected by checking that each task was
 started after all of its predecessors were completed:
 
-.. literalinclude:: ../../examples/newparallel/dagdeps.py
+.. literalinclude:: ../../examples/parallel/dagdeps.py
     :language: python
    :lines: 64-70
 
@@ -155,16 +158,17 @@ will be at the top, and quick, small tasks will be at the bottom.
     In [12]: pos = {}; colors = {}
 
     In [12]: for node in G:
-        ...:    md = results[node].metadata
-        ...:    start = date2num(md.started)
-        ...:    runtime = date2num(md.completed) - start
-        ...:    pos[node] = (start, runtime)
-        ...:    colors[node] = md.engine_id
+       ....:    md = results[node].metadata
+       ....:    start = date2num(md.started)
+       ....:    runtime = date2num(md.completed) - start
+       ....:    pos[node] = (start, runtime)
+       ....:    colors[node] = md.engine_id
 
     In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
-        ...:    cmap=gist_rainbow)
+       ....:    cmap=gist_rainbow)
 
-.. figure:: dagdeps.*
+.. figure:: figs/dagdeps.*
+    :width: 600px
 
     Time started on x, runtime on y, and color-coded by engine-id (in this case there
     were four engines). Edges denote dependencies.
docs/source/parallel/asian_call.pdf → docs/source/parallel/figs/asian_call.pdf (renamed without changes)
docs/source/parallel/asian_call.png → docs/source/parallel/figs/asian_call.png (renamed without changes)
docs/source/parallel/asian_put.pdf → docs/source/parallel/figs/asian_put.pdf (renamed without changes)
docs/source/parallel/asian_put.png → docs/source/parallel/figs/asian_put.png (renamed without changes)
docs/source/parallel/dagdeps.pdf → docs/source/parallel/figs/dagdeps.pdf (renamed without changes)
docs/source/parallel/dagdeps.png → docs/source/parallel/figs/dagdeps.png (renamed without changes)
docs/source/parallel/hpc_job_manager.pdf → docs/source/parallel/figs/hpc_job_manager.pdf (renamed without changes)
docs/source/parallel/hpc_job_manager.png → docs/source/parallel/figs/hpc_job_manager.png (renamed without changes)
docs/source/parallel/ipcluster_create.pdf → docs/source/parallel/figs/ipcluster_create.pdf (renamed without changes)
docs/source/parallel/ipcluster_create.png → docs/source/parallel/figs/ipcluster_create.png (renamed without changes)
docs/source/parallel/ipcluster_start.pdf → docs/source/parallel/figs/ipcluster_start.pdf (renamed without changes)
docs/source/parallel/ipcluster_start.png → docs/source/parallel/figs/ipcluster_start.png (renamed without changes)
docs/source/parallel/ipython_shell.pdf → docs/source/parallel/figs/ipython_shell.pdf (renamed without changes)
docs/source/parallel/ipython_shell.png → docs/source/parallel/figs/ipython_shell.png (renamed without changes)
docs/source/parallel/mec_simple.pdf → docs/source/parallel/figs/mec_simple.pdf (renamed without changes)
docs/source/parallel/mec_simple.png → docs/source/parallel/figs/mec_simple.png (renamed without changes)
docs/source/parallel/parallel_pi.pdf → docs/source/parallel/figs/parallel_pi.pdf (renamed without changes)
docs/source/parallel/parallel_pi.png → docs/source/parallel/figs/parallel_pi.png (renamed without changes)
0 