Adding parallel tutorial notebooks.

commit 860113bbdd766ca9a55f7c2376374a7a741267f7 1 parent 296eb30
@ellisonbg authored
Showing with 7,213 additions and 0 deletions.
  1. BIN  figures/allconnections.png
  2. BIN  figures/darts.png
  3. BIN  figures/latency.png
  4. BIN  figures/latency2.png
  5. BIN  figures/map.png
  6. BIN  figures/throughput1.png
  7. BIN  figures/throughput2.png
  8. BIN  figures/wideView.png
  9. BIN  figures/wideView400.png
  10. +446 −0 parallel/Part 1 - Overview and Architecture.ipynb
  11. +1,607 −0 parallel/Part 2 - Direct Interface.ipynb
  12. +1,571 −0 parallel/Part 3 - Parallel Magics.ipynb
  13. +1,017 −0 parallel/Part 4 - Load Balancing.ipynb
  14. +224 −0 parallel/Part 5 - All Together Now.ipynb
  15. +517 −0 parallel/Part 6 - Example - Remote Iteration.ipynb
  16. +589 −0 parallel/Part 7 - Working with MPI.ipynb
  17. +752 −0 parallel/Part 8 - Performance.ipynb
  18. +56 −0 parallel/Part 9 - Summary.ipynb
  19. +8 −0 parallel/myscript.py
  20. +5 −0 parallel/soln/italicstr.py
  21. +7 −0 parallel/soln/lnum.py
  22. +21 −0 parallel/soln/nestedloop.py
  23. +16 −0 parallel/soln/remote_iter.py
  24. +17 −0 parallel/soln/remote_iter_hint.py
  25. +16 −0 parallel/soln/remote_iter_slightly_better.py
  26. +3 −0  parallel/soln/taylor.py
  27. +341 −0 parallel/text_analysis.py
446 parallel/Part 1 - Overview and Architecture.ipynb
@@ -0,0 +1,446 @@
+{
+ "metadata": {
+ "name": "Part 1 - Overview and Architecture"
+ },
+ "nbformat": 3,
+ "nbformat_minor": 0,
+ "worksheets": [
+ {
+ "cells": [
+ {
+ "cell_type": "heading",
+ "level": 1,
+ "metadata": {},
+ "source": [
+ "Overview and getting started"
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Introduction"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "IPython is a tool for interactive and exploratory computing.\n",
+ "We have seen that IPython's kernel provides a mechanism for interactive\n",
+ "*remote* computation, and we have extended this same mechanism for\n",
+ "interactive remote *parallel* computation, simply by having multiple\n",
+ "kernels."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "![wideview](files/figures/wideView400.png)"
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "IPython.parallel Glossary"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "* Controller - collection of processes coordinating an IPython cluster\n",
+ "* Hub - central controller process that monitors and manages the state of the cluster\n",
+ "* Schedulers - controller processes that relay messages between clients and engines\n",
+ "\n",
+ "* Kernel - an IPython instance where you run your code\n",
+ "* Engine - special case of a Kernel used in IPython.parallel, which connects instead of listens\n",
+ "\n",
+ "* Client - user-level object that manages communication with a cluster\n",
+ "* View - user-level object that presents the API for execution\n",
+ "* AsyncResult - object that wraps pending results\n",
+ "* apply - View method for calling a function on engine(s)\n",
+ "* execute - View method for running code-as-text on engine(s)\n",
+ "* push - View method for sending data\n",
+ "* pull - View method for retrieving data\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Architecture overview"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "At a high level, there are three basic components to parallel IPython:\n",
+ "\n",
+ "* Engine(s) - the remote or distributed processes where your code runs.\n",
+ "* Client - your interface to running code on Engines.\n",
+ "* Controller - the collection of processes that coordinate Engines and Clients.\n",
+ "\n",
+ "These components live in the `IPython.parallel` package and are installed with IPython."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### IPython engine\n",
+ "\n",
+ "The Engine is simply a remote Python namespace where your code is run,\n",
+ "and is identical to the kernel used elsewhere in IPython.\n",
+ "\n",
+ "It can do all the magics, pylab integration, and everything else you can do in a regular IPython session."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### IPython controller\n",
+ "\n",
+ "The Controller is a collection of processes:\n",
+ "\n",
+ "* Schedulers relay messages between Engines and Clients.\n",
+ "* The Hub monitors the cluster state.\n",
+ "\n",
+ "Together, these processes provide a single connection point for your clients and engines.\n",
+ "Each Scheduler is a small GIL-less function in C provided by pyzmq (the Python load-balancing scheduler being an exception).\n",
+ "\n",
+ "The Hub can be viewed as an \u00fcber-logger,\n",
+ "which monitors all communication between clients and engines,\n",
+ "and can log to a database (e.g. SQLite or MongoDB) for later retrieval or resubmission.\n",
+ "The Hub is not involved in execution in any way,\n",
+ "and a slow Hub cannot slow down submission of tasks."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "#### Schedulers\n",
+ "\n",
+ "All actions that can be performed on the engine go through a Scheduler.\n",
+ "While the engines themselves block when user code is run,\n",
+ "the schedulers hide that from the user to provide a fully asynchronous interface to a set of engines. "
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "\u00d8MQ and PyZMQ"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "All of this is implemented with the lovely \u00d8MQ messaging library,\n",
+ "and pyzmq, its lightweight Python bindings, which allow very fast\n",
+ "zero-copy communication of objects like numpy arrays."
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "IPython client and views"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "There is one primary object, the `Client`, for connecting to a cluster.\n",
+ "For each execution model there is a corresponding `View`,\n",
+ "and you determine how your work should be executed on the cluster by creating different views\n",
+ "or manipulating attributes of views.\n",
+ "\n",
+ "The two basic views:\n",
+ "\n",
+ "- The `DirectView` class for explicitly running code on particular engine(s).\n",
+ "- The `LoadBalancedView` class for destination-agnostic scheduling.\n",
+ "\n",
+ "You can use as many views of each kind as you like, all at the same time."
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 1,
+ "metadata": {},
+ "source": [
+ "Getting Started"
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Starting the IPython controller and engines"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "To follow along with this tutorial, you will need to start the IPython\n",
+ "controller and four IPython engines. The simplest way of doing this is\n",
+ "to use the `ipcluster` command:\n",
+ "\n",
+ " $ ipcluster start -n 4\n",
+ "\n",
+ "There isn't time to go into it here, but ipcluster can be used to start engines\n",
+ "and the controller with various batch systems including:\n",
+ "\n",
+ "* SGE\n",
+ "* PBS\n",
+ "* LSF\n",
+ "* MPI\n",
+ "* SSH\n",
+ "* WinHPC\n",
+ "\n",
+ "More information on starting and configuring the IPython cluster is available in\n",
+ "[the IPython.parallel docs](http://ipython.org/ipython-doc/dev/parallel/parallel_process.html).\n",
+ "\n",
+ "Once you have started the IPython controller and one or more engines,\n",
+ "you are ready to use the engines to do something useful. \n",
+ "\n",
+ "To make sure everything is working correctly, let's do a very simple demo:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "from IPython import parallel\n",
+ "rc = parallel.Client()\n",
+ "rc.block = True"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 10
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "rc.ids"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 11,
+ "text": [
+ "[0, 1, 2, 3]"
+ ]
+ }
+ ],
+ "prompt_number": 11
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Let's define a simple function:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def mul(a,b):\n",
+ " return a*b"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 12
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "mul(5,6)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 13,
+ "text": [
+ "30"
+ ]
+ }
+ ],
+ "prompt_number": 13
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "What does it look like to call this function remotely?\n",
+ "\n",
+ "Just turn `f(*args, **kwargs)` into `view.apply(f, *args, **kwargs)`!"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "rc[0].apply(mul, 5, 6)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 14,
+ "text": [
+ "30"
+ ]
+ }
+ ],
+ "prompt_number": 14
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "And the same thing in parallel?"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "rc[:].apply(mul, 5, 6)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 15,
+ "text": [
+ "[30, 30, 30, 30]"
+ ]
+ }
+ ],
+ "prompt_number": 15
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Python has a builtin `map` for calling a function on a sequence of arguments:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "map(mul, range(1,10), range(2,11))"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 16,
+ "text": [
+ "[2, 6, 12, 20, 30, 42, 56, 72, 90]"
+ ]
+ }
+ ],
+ "prompt_number": 16
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "So how do we do this in parallel?"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "view = rc.load_balanced_view()\n",
+ "view.map(mul, range(1,10), range(2,11))"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 17,
+ "text": [
+ "[2, 6, 12, 20, 30, 42, 56, 72, 90]"
+ ]
+ }
+ ],
+ "prompt_number": 17
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "I'll go into further detail about different views and asynchronous communication later.\n",
+ "First, let's peek at the performance of the IPython.parallel infrastructure, \n",
+ "so we can see what level of granularity we can consider using."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Map\n",
+ "\n",
+ "A quick test of how big your tasks need to be is to run a map of\n",
+ "minimal tasks with a known duration (e.g. `time.sleep()`).\n",
+ "\n",
+ "These tests were done on [AWS](http://aws.amazon.com/) extra-large\n",
+ "instances with the help of\n",
+ "[StarCluster](http://web.mit.edu/stardev/cluster/), so the IO and CPU\n",
+ "performance are not impressive compared to a physical cluster.\n",
+ "\n",
+ "![map](files/figures/map.png)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "This shows runs for jobs ranging from 1 to 128 ms, on 4, 31, and 63\n",
+ "engines. On this system, millisecond jobs are clearly too small, but by\n",
+ "the time individual tasks are \\> 100 ms, IPython overhead is negligible.\n",
+ "\n",
+ "For more details and to see how to generate these metrics on your own systems, \n",
+ "you can view and run the Performance notebook.\n",
+ "\n",
+ "Now let's see how we use it for remote execution."
+ ]
+ }
+ ],
+ "metadata": {}
+ }
+ ]
+}
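The `view.apply` pattern demonstrated in Part 1 can be illustrated without a running cluster by a toy stand-in: a DirectView essentially multiplexes one function call across its engines and collects the results. `MockDirectView` below is a hypothetical sketch for intuition only, not part of IPython.parallel (real engines run `f` in separate processes and communicate over ØMQ).

```python
# Toy stand-in for IPython.parallel's DirectView, for intuition only.
# view.apply(f, *args, **kwargs) runs f once per engine and gathers results;
# a single-engine view returns a scalar, a multi-engine view returns a list.
class MockDirectView:
    def __init__(self, targets):
        self.targets = targets  # engine ids this view multiplexes over

    def apply(self, f, *args, **kwargs):
        # real engines execute f in remote processes; here we just loop
        results = [f(*args, **kwargs) for _ in self.targets]
        return results[0] if len(self.targets) == 1 else results

def mul(a, b):
    return a * b

e0 = MockDirectView([0])             # plays the role of rc[0]
eall = MockDirectView([0, 1, 2, 3])  # plays the role of rc[:]
print(e0.apply(mul, 5, 6))    # 30
print(eall.apply(mul, 5, 6))  # [30, 30, 30, 30]
```

The real DirectView additionally handles serialization, asynchrony, and error propagation, but the multiplexing semantics are the same.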
1,607 parallel/Part 2 - Direct Interface.ipynb
@@ -0,0 +1,1607 @@
+{
+ "metadata": {
+ "name": "Part 2 - Direct Interface"
+ },
+ "nbformat": 3,
+ "nbformat_minor": 0,
+ "worksheets": [
+ {
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# The direct interface for remote computation with IPython"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "A Client is the low-level object which manages your connection to the various Schedulers and the Hub.\n",
+ "Everything you do passes through one of these objects, either indirectly or directly."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "import os,sys,time\n",
+ "import numpy\n",
+ "import numpy as np\n",
+ "\n",
+ "from IPython.core.display import display, Math\n",
+ "\n",
+ "from IPython import parallel\n",
+ "rc = parallel.Client()\n",
+ "rc.block = True # let's start synchronous"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 1
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "It has an `ids` property, which is always an up-to-date list of the integer engine IDs currently available."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "rc.ids"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 2,
+ "text": [
+ "[0, 1, 2, 3]"
+ ]
+ }
+ ],
+ "prompt_number": 2
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The most basic function of the Client is to create the View objects,\n",
+ "which are the interfaces for actual communication with the engines.\n",
+ "\n",
+ "There are two basic models for working with engines: Direct and LoadBalanced.\n",
+ "Let's start with the DirectView.\n",
+ "\n",
+ "The simplest case for remote execution, a DirectView of one engine, and another of all engines:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "# index-access of a client gives us a DirectView\n",
+ "e0 = rc[0]\n",
+ "eall = rc[:]\n",
+ "even = rc[::2]\n",
+ "odd = rc[1::2]\n",
+ "\n",
+ "# this is the one we are going to use the most:\n",
+ "dview = eall\n",
+ "map(display, [e0, eall, even, odd]);"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "display_data",
+ "text": [
+ "<DirectView 0>"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "text": [
+ "<DirectView [0, 1, 2, 3]>"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "text": [
+ "<DirectView [0, 2]>"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "text": [
+ "<DirectView [1, 3]>"
+ ]
+ }
+ ],
+ "prompt_number": 3
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Now, the only difference from single-engine remote execution is that the code we run happens on all of the engines of a given view:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "for view in (e0, eall, even, odd):\n",
+ " print view, view.apply_sync(os.getpid)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "<DirectView 0> 74967\n",
+ "<DirectView [0, 1, 2, 3]> "
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "[74967, 74968, 74974, 74973]\n",
+ "<DirectView [0, 2]> [74967, 74974]\n",
+ "<DirectView [1, 3]> "
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "[74968, 74973]\n"
+ ]
+ }
+ ],
+ "prompt_number": 4
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "So how do we use these Views to run code?\n",
+ "\n",
+ "It's all about:\n",
+ "\n",
+ " view.apply(f, *args, **kwargs)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We want the interface for remote and parallel execution to be as natural as possible.\n",
+ "And what's the most natural unit of execution? Code! Simply define a function,\n",
+ "just as you would use locally, and instead of calling it, pass it to `view.apply()`,\n",
+ "with the remaining arguments just as you would have passed them to the function."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def get_norms(A, levels=[2]):\n",
+ " \"\"\"get all the requested norms for an array\"\"\"\n",
+ " norms = []\n",
+ " for level in levels:\n",
+ " norms.append(numpy.linalg.norm(A, level))\n",
+ " return norms\n",
+ "\n",
+ "A = numpy.linspace(0, numpy.pi, 1024)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 5
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "get_norms( A, levels=[1,2,3,numpy.inf])"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 6,
+ "text": [
+ "[1608.4954386379741,\n",
+ " 58.055762082012201,\n",
+ " 19.954367610618046,\n",
+ " 3.1415926535897931]"
+ ]
+ }
+ ],
+ "prompt_number": 6
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "To call this remotely, simply replace '`get_norms(`' with '`view.apply(get_norms,`':"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "basic = In[6]\n",
+ "remote = basic.replace('get_norms(', 'e0.apply(get_norms,')\n",
+ "print ' ' * (len(remote) - len(basic)) + basic\n",
+ "print remote\n",
+ "ip = get_ipython()\n",
+ "ip.set_next_input(remote)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ " get_norms( A, levels=[1,2,3,numpy.inf])\n",
+ "e0.apply(get_norms, A, levels=[1,2,3,numpy.inf])\n"
+ ]
+ }
+ ],
+ "prompt_number": 8
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.apply(get_norms, A, levels=[1,2,3,numpy.inf])"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "ename": "RemoteError",
+ "evalue": "NameError(global name 'numpy' is not defined)",
+ "output_type": "pyerr",
+ "traceback": [
+ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
+ "\u001b[1;31mNameError\u001b[0m Traceback (most recent call last)\u001b[1;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m",
+ "\u001b[1;32m<ipython-input-5-c22516ec5624>\u001b[0m in \u001b[0;36mget_norms\u001b[1;34m(A, levels)\u001b[0m",
+ "\u001b[1;31mNameError\u001b[0m: global name 'numpy' is not defined"
+ ]
+ }
+ ],
+ "prompt_number": 9
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "This replacement is generally true for turning (almost) any local function call into a remote one.\n",
+ "\n",
+ "Note that this will probably raise a NameError on numpy. The simplest way to import numpy is to do:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.execute(\"import numpy\")"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 10,
+ "text": [
+ "<AsyncResult: finished>"
+ ]
+ }
+ ],
+ "prompt_number": 10
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Or you can use the `%px` magic to import numpy on all engines:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "%px import numpy"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 11
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Parallel magics will be discussed in more detail a little later."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Functions don\u2019t have to be interactively defined; you can call module functions remotely as well:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.apply(numpy.linalg.norm, A, 2)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 12,
+ "text": [
+ "58.055762082012201"
+ ]
+ }
+ ],
+ "prompt_number": 12
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.apply(os.getpid)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 13,
+ "text": [
+ "74967"
+ ]
+ }
+ ],
+ "prompt_number": 13
+ },
+ {
+ "cell_type": "heading",
+ "level": 4,
+ "metadata": {},
+ "source": [
+ "In Parallel"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We can do the same thing with the DirectView of all engines.\n",
+ "\n",
+ "The DirectView can be readily understood as an Engine Multiplexer -\n",
+ "it does the same thing on all of its engines.\n",
+ "\n",
+ "The only difference between running code on a single remote engine\n",
+ "and running code in parallel is how many engines the DirectView is\n",
+ "instructed to use."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.apply(numpy.linalg.norm, A, 2)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 14,
+ "text": [
+ "[58.055762082012201,\n",
+ " 58.055762082012201,\n",
+ " 58.055762082012201,\n",
+ " 58.055762082012201]"
+ ]
+ }
+ ],
+ "prompt_number": 14
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.apply(os.getpid)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 15,
+ "text": [
+ "[74967, 74968, 74974, 74973]"
+ ]
+ }
+ ],
+ "prompt_number": 15
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## execute and run\n",
+ "\n",
+ "You can also run files or strings with `run` and `execute`\n",
+ "respectively.\n",
+ "\n",
+ "For instance, I have a script `myscript.py` that defines a function\n",
+ "`mysquare`:\n",
+ "\n",
+ " def mysquare(x):\n",
+ " return x*x\n",
+ "\n",
+ "I can run that remotely, just like I can locally with `%run`, and then I\n",
+ "will have `mysquare()`, and any imports and globals from the script in the\n",
+ "engine's namespace:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "%pycat myscript.py"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 17
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.run(\"myscript.py\")"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 18,
+ "text": [
+ "<AsyncResult: finished>"
+ ]
+ }
+ ],
+ "prompt_number": 18
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.execute(\"b=mysquare(a)\")"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 19,
+ "text": [
+ "<AsyncResult: finished>"
+ ]
+ }
+ ],
+ "prompt_number": 19
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0['a']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 20,
+ "text": [
+ "5"
+ ]
+ }
+ ],
+ "prompt_number": 20
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0['b']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 21,
+ "text": [
+ "25"
+ ]
+ }
+ ],
+ "prompt_number": 21
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The `%px` and `%%px` magics correspond directly to a View.execute call."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "%px import os\n",
+ "%px pid = os.getpid()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 22
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview['pid']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 23,
+ "text": [
+ "[74967, 74968, 74974, 74973]"
+ ]
+ }
+ ],
+ "prompt_number": 23
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Detail: Working with the engine namespace"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The namespace on the engine is accessible to your functions as `globals`.\n",
+ "So if you want to work with values that persist in the engine namespace,\n",
+ "you just use global variables."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def inc_a(increment):\n",
+ " global a\n",
+ " a += increment\n",
+ "\n",
+ "print \" %2i\" % e0['a']\n",
+ "for i in range(3):\n",
+ " e0.apply(inc_a, 5)\n",
+ " print \" + 5\"\n",
+ " print \" = %2i\" % e0['a']\n",
+ " print\n"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "And just like the rest of Python, you don\u2019t have to specify global variables if you aren\u2019t assigning to them:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def mul_by_a(b):\n",
+ " return a*b\n",
+ "\n",
+ "e0.apply(mul_by_a, 10)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0['a'] = 12\n",
+ "e0.apply(mul_by_a, 10)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "If you want to do multiple actions on data, you obviously don\u2019t want to send it every time.\n",
+ "For this, we have a `Reference` class.\n",
+ "A Reference is just a wrapper for an identifier that gets unserialized by pulling the corresponding object out of the engine namespace."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def is_it_a(b):\n",
+ " return a is b\n",
+ "\n",
+ "e0.apply(is_it_a, 5)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.apply(is_it_a, parallel.Reference('a'))"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Which essentially ran the following code on the engine:\n",
+ "\n",
+ " _ref_uuid_a = a\n",
+ " is_it_a(_ref_uuid_a)"
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Moving data around"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In addition to calling functions and executing code on engines,\n",
+ "you can transfer Python objects to and from your IPython session and the engines.\n",
+ "In IPython, these operations are called `push` (sending an object to the engines)\n",
+ "and `pull` (getting an object from the engines).\n",
+ "\n",
+ "push takes a dictionary, used to update the remote namespace:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.push(dict(a=1.03234,b=3453))"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 24
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "pull takes one or more keys:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.pull('a')"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 25,
+ "text": [
+ "1.03234"
+ ]
+ }
+ ],
+ "prompt_number": 25
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.pull(('b','a'))"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 26,
+ "text": [
+ "[3453, 1.03234]"
+ ]
+ }
+ ],
+ "prompt_number": 26
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Pushing to multiple engines sends the same data to all of them."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.push(dict(a=1.03234,b=3453,c=123));"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 27
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "And pull returns a list of the same length as the number of engines:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.pull('pid')"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 28,
+ "text": [
+ "[74967, 74968, 74974, 74973]"
+ ]
+ }
+ ],
+ "prompt_number": 28
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.pull(('c','pid'))"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 29,
+ "text": [
+ "[[123, 74967], [123, 74968], [123, 74974], [123, 74973]]"
+ ]
+ }
+ ],
+ "prompt_number": 29
+ },
+ {
+ "cell_type": "heading",
+ "level": 3,
+ "metadata": {},
+ "source": [
+ "Dictionary interface"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Treating a DirectView like a dictionary results in push/pull operations:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0['a'] = range(5)\n",
+ "e0.execute('b = a[::-1]')\n",
+ "e0['b']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 30,
+ "text": [
+ "[4, 3, 2, 1, 0]"
+ ]
+ }
+ ],
+ "prompt_number": 30
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "%px pid = os.getpid()\n",
+ "dview['pid']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 31,
+ "text": [
+ "[74967, 74968, 74974, 74973]"
+ ]
+ }
+ ],
+ "prompt_number": 31
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`get()` and `update()` work as well."
+ ]
+ },
+ {
+ "cell_type": "heading",
+ "level": 3,
+ "metadata": {},
+ "source": [
+ "scatter/gather"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "push/pull move whole objects around,\n",
+ "but sometimes you want to partition objects across the engines.\n",
+ "\n",
+ "DirectViews have `scatter()` and `gather()` methods, to help with this.\n",
+ "Pass any container or numpy array, and IPython will partition the object onto the engines with `scatter()`,\n",
+ "or reconstruct the full object in the Client with `gather()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.scatter('a', range(16))\n",
+ "dview['a']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 32,
+ "text": [
+ "[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]"
+ ]
+ }
+ ],
+ "prompt_number": 32
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.gather('a')"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 33,
+ "text": [
+ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]"
+ ]
+ }
+ ],
+ "prompt_number": 33
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.execute(\"asum = sum(a)\")\n",
+ "dview.gather('asum')"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 34,
+ "text": [
+ "[6, 22, 38, 54]"
+ ]
+ }
+ ],
+ "prompt_number": 34
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "We can pass the `flatten` keyword,\n",
+    "so that engines receiving only one item of the list\n",
+    "get the actual item itself, rather than a one-element sublist."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.scatter('rank', rc.ids)\n",
+ "dview['rank']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 35,
+ "text": [
+ "[[0], [1], [2], [3]]"
+ ]
+ }
+ ],
+ "prompt_number": 35
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.scatter('rank', rc.ids, flatten=True)\n",
+ "dview['rank']"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 36,
+ "text": [
+ "[0, 1, 2, 3]"
+ ]
+ }
+ ],
+ "prompt_number": 36
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "This particular call is very common when you want MPI-style rank switching, where the behavior of code depends on the engine."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "A simple scatter/gather of numpy arrays:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "A = np.random.randint(1,10,(16,4))\n",
+ "B = np.random.randint(1,10,(4,16))\n",
+ "display(Math('A ='))\n",
+ "display(A)\n",
+ "display(Math('B ='))\n",
+ "display(B)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "latex": [
+ "$$A =$$"
+ ],
+ "output_type": "display_data",
+ "text": [
+ "<IPython.core.display.Math at 0x10ba8c650>"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "text": [
+ "array([[9, 4, 2, 3],\n",
+ " [4, 3, 7, 1],\n",
+ " [5, 7, 5, 2],\n",
+ " [8, 4, 5, 7],\n",
+ " [5, 4, 2, 5],\n",
+ " [4, 4, 7, 7],\n",
+ " [2, 1, 3, 6],\n",
+ " [9, 4, 4, 5],\n",
+ " [2, 2, 4, 7],\n",
+ " [3, 1, 2, 7],\n",
+ " [7, 8, 9, 8],\n",
+ " [1, 9, 5, 2],\n",
+ " [8, 9, 9, 1],\n",
+ " [2, 8, 5, 8],\n",
+ " [3, 8, 4, 9],\n",
+ " [2, 7, 5, 2]])"
+ ]
+ },
+ {
+ "latex": [
+ "$$B =$$"
+ ],
+ "output_type": "display_data",
+ "text": [
+ "<IPython.core.display.Math at 0x10ba8c650>"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "text": [
+ "array([[4, 5, 8, 8, 7, 4, 7, 6, 3, 5, 2, 7, 5, 9, 3, 7],\n",
+ " [6, 1, 6, 2, 2, 1, 4, 5, 4, 2, 9, 4, 6, 3, 6, 2],\n",
+ " [1, 5, 1, 9, 6, 8, 9, 3, 7, 9, 1, 7, 6, 3, 4, 4],\n",
+ " [3, 6, 5, 1, 8, 2, 7, 4, 9, 1, 7, 3, 6, 9, 1, 1]])"
+ ]
+ }
+ ],
+ "prompt_number": 37
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.scatter('A', A)\n",
+ "dview.scatter('B', B)\n",
+ "display(Math('A_0 ='))\n",
+ "display(e0['A'])\n",
+ "display(Math('B_0 ='))\n",
+ "display(e0['B'])"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "latex": [
+ "$$A_0 =$$"
+ ],
+ "output_type": "display_data",
+ "text": [
+ "<IPython.core.display.Math at 0x10ba82590>"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "text": [
+ "array([[9, 4, 2, 3],\n",
+ " [4, 3, 7, 1],\n",
+ " [5, 7, 5, 2],\n",
+ " [8, 4, 5, 7]])"
+ ]
+ },
+ {
+ "latex": [
+ "$$B_0 =$$"
+ ],
+ "output_type": "display_data",
+ "text": [
+ "<IPython.core.display.Math at 0x10ba82590>"
+ ]
+ },
+ {
+ "output_type": "display_data",
+ "text": [
+ "array([[4, 5, 8, 8, 7, 4, 7, 6, 3, 5, 2, 7, 5, 9, 3, 7]])"
+ ]
+ }
+ ],
+ "prompt_number": 38
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Asynchronous execution"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We have covered the basic methods for running code remotely, but we have been using `block=True`. We can also do non-blocking execution (this wouldn't be especially useful if we couldn't!)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.block = dview.block = False"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 39
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In non-blocking mode, `apply` submits the command to be executed and\n",
+    "then returns an `AsyncResult` object immediately. The `AsyncResult`\n",
+ "object gives you a way of getting a result at a later time through its\n",
+ "`get()` method.\n",
+ "\n",
+ "The AsyncResult object provides a superset of the interface in [`multiprocessing.pool.AsyncResult`](http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult).\n",
+ "See the official Python documentation for more."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def wait(t):\n",
+ " import time\n",
+ " tic = time.time()\n",
+ " time.sleep(t)\n",
+ " return time.time()-tic"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 40
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "ar = e0.apply(wait, 10)\n",
+ "ar"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 41,
+ "text": [
+ "<AsyncResult: wait>"
+ ]
+ }
+ ],
+ "prompt_number": 41
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`ar.ready()` tells us if the result is ready"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "ar.ready()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 42,
+ "text": [
+ "False"
+ ]
+ }
+ ],
+ "prompt_number": 42
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`ar.get()` blocks until the result is ready, or a timeout is reached, if one is specified"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "%time ar.get(1)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "ename": "TimeoutError",
+ "evalue": "Result not ready.",
+ "output_type": "pyerr",
+ "traceback": [
+ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mTimeoutError\u001b[0m Traceback (most recent call last)",
+ "\u001b[1;32m<ipython-input-43-17b98a143e5c>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[1;32m----> 1\u001b[1;33m \u001b[0mget_ipython\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mmagic\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34mu'time ar.get(1)'\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
+ "\u001b[1;32m/Users/minrk/Library/Python/2.7/lib/python/site-packages/IPython/core/interactiveshell.pyc\u001b[0m in \u001b[0;36mmagic\u001b[1;34m(self, arg_s)\u001b[0m\n\u001b[0;32m 2156\u001b[0m \u001b[0mmagic_name\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0m_\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mmagic_arg_s\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0marg_s\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mpartition\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m' '\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 2157\u001b[0m \u001b[0mmagic_name\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mmagic_name\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mlstrip\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mprefilter\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mESC_MAGIC\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 2158\u001b[1;33m \u001b[1;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrun_line_magic\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mmagic_name\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mmagic_arg_s\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 2159\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 2160\u001b[0m \u001b[1;31m#-------------------------------------------------------------------------\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
+ "\u001b[1;32m/Users/minrk/Library/Python/2.7/lib/python/site-packages/IPython/core/interactiveshell.pyc\u001b[0m in \u001b[0;36mrun_line_magic\u001b[1;34m(self, magic_name, line)\u001b[0m\n\u001b[0;32m 2082\u001b[0m \u001b[0margs\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mappend\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0msys\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_getframe\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mstack_depth\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mf_locals\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 2083\u001b[0m \u001b[1;32mwith\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mbuiltin_trap\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m-> 2084\u001b[1;33m \u001b[0mresult\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mfn\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0margs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 2085\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 2086\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
+ "\u001b[1;32m/Users/minrk/Library/Python/2.7/lib/python/site-packages/IPython/core/magics/execution.pyc\u001b[0m in \u001b[0;36mtime\u001b[1;34m(self, parameter_s, user_locals)\u001b[0m\n",
+ "\u001b[1;32m/Users/minrk/Library/Python/2.7/lib/python/site-packages/IPython/core/magic.pyc\u001b[0m in \u001b[0;36m<lambda>\u001b[1;34m(f, *a, **k)\u001b[0m\n\u001b[0;32m 189\u001b[0m \u001b[1;31m# but it's overkill for just that one bit of state.\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 190\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mmagic_deco\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0marg\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 191\u001b[1;33m \u001b[0mcall\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;32mlambda\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mk\u001b[0m\u001b[1;33m:\u001b[0m \u001b[0mf\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m*\u001b[0m\u001b[0ma\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;33m**\u001b[0m\u001b[0mk\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 192\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 193\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mcallable\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0marg\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
+ "\u001b[1;32m/Users/minrk/Library/Python/2.7/lib/python/site-packages/IPython/core/magics/execution.pyc\u001b[0m in \u001b[0;36mtime\u001b[1;34m(self, parameter_s, user_locals)\u001b[0m\n\u001b[0;32m 893\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mmode\u001b[0m\u001b[1;33m==\u001b[0m\u001b[1;34m'eval'\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 894\u001b[0m \u001b[0mst\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mclock2\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 895\u001b[1;33m \u001b[0mout\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0meval\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mcode\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mglob\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0muser_locals\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 896\u001b[0m \u001b[0mend\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mclock2\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 897\u001b[0m \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
+ "\u001b[1;32m<timed eval>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n",
+ "\u001b[1;32m/Users/minrk/Library/Python/2.7/lib/python/site-packages/IPython/parallel/client/asyncresult.pyc\u001b[0m in \u001b[0;36mget\u001b[1;34m(self, timeout)\u001b[0m\n\u001b[0;32m 125\u001b[0m \u001b[1;32mraise\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_exception\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 126\u001b[0m \u001b[1;32melse\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 127\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0merror\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mTimeoutError\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"Result not ready.\"\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 128\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 129\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0mready\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
+ "\u001b[1;31mTimeoutError\u001b[0m: Result not ready."
+ ]
+ }
+ ],
+ "prompt_number": 43
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "%time ar.get()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "CPU times: user 0.16 s, sys: 0.05 s, total: 0.21 s\n",
+ "Wall time: 1.63 s\n"
+ ]
+ },
+ {
+ "output_type": "pyout",
+ "prompt_number": 44,
+ "text": [
+ "10.000641107559204"
+ ]
+ }
+ ],
+ "prompt_number": 44
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "For convenience, you can set `block` for a single call with the extra `apply_sync`/`apply_async` methods:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.apply_sync(os.getpid)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 45,
+ "text": [
+ "74967"
+ ]
+ }
+ ],
+ "prompt_number": 45
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.apply_async(os.getpid)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 46,
+ "text": [
+ "<AsyncResult: getpid>"
+ ]
+ }
+ ],
+ "prompt_number": 46
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "_.get()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 47,
+ "text": [
+ "74967"
+ ]
+ }
+ ],
+ "prompt_number": 47
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "The same difference between single-engine and multi-engine views applies here.\n",
+    "\n",
+    "(In fact, blocking execution is just non-blocking execution with an implied `AsyncResult.get()` at the end!)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "ar = dview.apply_async(os.getpid)\n",
+ "ar"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 48,
+ "text": [
+ "<AsyncResult: getpid>"
+ ]
+ }
+ ],
+ "prompt_number": 48
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "ar.get()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 49,
+ "text": [
+ "[74967, 74968, 74974, 74973]"
+ ]
+ }
+ ],
+ "prompt_number": 49
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Map"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "DirectViews have a `map()` method, which behaves just like the builtin `map`,\n",
+    "but is computed in parallel."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "dview.block = True\n",
+ "\n",
+ "serial_result = map(lambda x:x**10, range(32))\n",
+ "parallel_result = dview.map(lambda x:x**10, range(32))\n",
+ "\n",
+ "serial_result == parallel_result"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 50,
+ "text": [
+ "True"
+ ]
+ }
+ ],
+ "prompt_number": 50
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "`DirectView.map` partitions the sequence across the engines,\n",
+    "and then calls `map` remotely on each partition. The result is always a single\n",
+    "IPython task per engine."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def wait_here(t):\n",
+ " import time\n",
+ " print t\n",
+ " time.sleep(t)\n",
+ " return t"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 51
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The AsyncResult object is the principal API for working with parallel results, pending or complete.\n",
+    "It has a few helper methods and properties for monitoring your tasks:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "times = np.linspace(0,1,17)\n",
+ "amr = dview.map_async(wait_here, times)\n",
+ "# wait for the tasks to finish\n",
+ "amr.wait_interactive()\n",
+ "# print some timings\n",
+ "print amr.serial_time\n",
+ "print amr.wall_time"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ " 4/4 tasks finished after 3 s"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "\n",
+ "done\n",
+ "8.571256\n",
+ "3.675041\n"
+ ]
+ }
+ ],
+ "prompt_number": 52
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "# display the output:\n",
+ "amr.display_outputs()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "[stdout:0] \n",
+ "0.0\n",
+ "0.0625\n",
+ "0.125\n",
+ "0.1875\n",
+ "0.25\n",
+ "[stdout:1] \n",
+ "0.3125\n",
+ "0.375\n",
+ "0.4375\n",
+ "0.5\n",
+ "[stdout:2] \n",
+ "0.5625\n",
+ "0.625\n",
+ "0.6875\n",
+ "0.75\n",
+ "[stdout:3] \n",
+ "0.8125\n",
+ "0.875\n",
+ "0.9375\n",
+ "1.0\n"
+ ]
+ }
+ ],
+ "prompt_number": 53
+ },
+ {
+ "cell_type": "heading",
+ "level": 3,
+ "metadata": {},
+ "source": [
+ "Summary"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "* `view.apply` calls a function remotely\n",
+    "* `view.execute` runs code strings remotely\n",
+    "* applied functions access the engine namespace as globals\n",
+    "* push/pull and scatter/gather move data around\n",
+    "* single-engine views return single objects; multi-engine views return lists\n",
+    "* AsyncResults wrap results for waiting and metadata.\n"
+ ]
+ }
+ ],
+ "metadata": {}
+ }
+ ]
+}
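The scatter/gather partitioning shown above can be sketched in plain Python. The helper names `partition` and `gather` here are hypothetical stand-ins: the real splitting lives inside `IPython.parallel` and also handles numpy arrays, but the contiguous-chunk idea is the same.

```python
def partition(seq, n):
    """Split seq into n contiguous chunks, the way DirectView.scatter
    distributes a list across n engines (a simplified sketch)."""
    q, r = divmod(len(seq), n)
    chunks, idx = [], 0
    for i in range(n):
        size = q + (1 if i < r else 0)  # first r chunks take one extra item
        chunks.append(seq[idx:idx + size])
        idx += size
    return chunks


def gather(chunks):
    """Reassemble scattered chunks into one flat list,
    mimicking what DirectView.gather returns to the Client."""
    return [item for chunk in chunks for item in chunk]


# mirrors dview.scatter('a', range(16)) followed by dview['a'] above
print(partition(list(range(16)), 4))
# -> [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]

# mirrors dview.gather('a')
print(gather(partition(list(range(16)), 4)))
```

With 4 engines this reproduces the `dview['a']` output shown in the notebook, and `gather` round-trips it back to the original list.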
1,571 parallel/Part 3 - Parallel Magics.ipynb
1,571 additions, 0 deletions not shown
1,017 parallel/Part 4 - Load Balancing.ipynb
@@ -0,0 +1,1017 @@
+{
+ "metadata": {
+ "name": "Part 4 - Load Balancing"
+ },
+ "nbformat": 3,
+ "nbformat_minor": 0,
+ "worksheets": [
+ {
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Load-balancing with IPython.parallel"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Basic imports"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "import os,sys,time\n",
+ "import numpy as np\n",
+ "\n",
+ "from IPython.display import display\n",
+ "from IPython import parallel\n",
+ "rc = parallel.Client()\n",
+ "dview = rc[:]"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 1
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "This time, we create a LoadBalancedView"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "lview = rc.load_balanced_view()\n",
+ "lview"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 2,
+ "text": [
+ "<LoadBalancedView None>"
+ ]
+ }
+ ],
+ "prompt_number": 2
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "LoadBalancedViews behave very much like a DirectView on a single engine:\n",
+ "\n",
+ "Each call to `apply()` results in a single remote computation,\n",
+ "and the result (or AsyncResult) of that call is returned directly,\n",
+ "rather than in a list, as in the multi-engine DirectView."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": true,
+ "input": [
+ "e0 = rc[0]"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 3
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "from numpy.linalg import norm\n",
+ "A = np.linspace(0, 2*np.pi, 1024)\n",
+ "\n",
+ "e0.apply_sync(norm, A, 2)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 6,
+ "text": [
+ "116.1115241640244"
+ ]
+ }
+ ],
+ "prompt_number": 6
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "lview.apply_sync(norm, A, 2)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 7,
+ "text": [
+ "116.1115241640244"
+ ]
+ }
+ ],
+ "prompt_number": 7
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "e0.apply_sync(os.getpid)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 8,
+ "text": [
+ "74967"
+ ]
+ }
+ ],
+ "prompt_number": 8
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "for i in range(2*len(rc.ids)):\n",
+ " print lview.apply_sync(os.getpid)"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "74968\n",
+ "74973\n",
+ "74967\n",
+ "74974\n",
+ "74968\n",
+ "74973"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "\n",
+ "74967\n",
+ "74974\n"
+ ]
+ }
+ ],
+ "prompt_number": 9
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Map\n",
+ "\n",
+ "The LoadBalancedView also has a load-balanced version of the builtin `map()`"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "lview.block = True\n",
+ "\n",
+ "serial_result = map(lambda x:x**10, range(32))\n",
+ "parallel_result = lview.map(lambda x:x**10, range(32))\n",
+ "\n",
+ "serial_result == parallel_result"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 10,
+ "text": [
+ "True"
+ ]
+ }
+ ],
+ "prompt_number": 10
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Just like `apply()`, you can use non-blocking map with `block=False` or `map_async`"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "amr = lview.map_async(lambda x:x**10, range(32))\n",
+ "amr"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "pyout",
+ "prompt_number": 11,
+ "text": [
+ "<AsyncMapResult: <lambda>>"
+ ]
+ }
+ ],
+ "prompt_number": 11
+ },
+ {
+ "cell_type": "heading",
+ "level": 2,
+ "metadata": {},
+ "source": [
+ "Map results are iterable!"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "for n in amr:\n",
+ " print n"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "0\n",
+ "1\n",
+ "1024\n",
+ "59049\n",
+ "1048576\n",
+ "9765625\n",
+ "60466176\n",
+ "282475249\n",
+ "1073741824\n",
+ "3486784401\n",
+ "10000000000\n",
+ "25937424601\n",
+ "61917364224\n",
+ "137858491849\n",
+ "289254654976\n",
+ "576650390625\n",
+ "1099511627776\n",
+ "2015993900449\n",
+ "3570467226624\n",
+ "6131066257801\n",
+ "10240000000000\n",
+ "16679880978201\n",
+ "26559922791424\n",
+ "41426511213649\n",
+ "63403380965376\n",
+ "95367431640625\n",
+ "141167095653376\n",
+ "205891132094649\n",
+ "296196766695424\n",
+ "420707233300201\n",
+ "590490000000000\n",
+ "819628286980801\n"
+ ]
+ }
+ ],
+ "prompt_number": 12
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "AsyncResults with multiple results are actually iterable before their\n",
+ "results arrive.\n",
+ "\n",
+ "This means that you can perform map/reduce operations on elements as\n",
+ "they come in:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "lview.block = False"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [],
+ "prompt_number": 13
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+    "# scatter 'id', so id=0,1,2,3 on engines 0,1,2,3\n",
+ "dv = rc[:]\n",
+ "dv.scatter('id', rc.ids, flatten=True)\n",
+ "print dv['id']\n",
+ "\n",
+ "# create a Reference to `id`. This will be a different value on each engine\n",
+ "ref = parallel.Reference('id')\n",
+ "\n",
+ "tic = time.time()\n",
+ "ar = dv.apply(time.sleep, ref)\n",
+ "for i,r in enumerate(ar):\n",
+ " print \"%i: %.3f\"%(i, time.time()-tic)\n",
+ " sys.stdout.flush()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "[0, 1, 2, 3]\n",
+ "0: 0.030\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "1: 1.017\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "2: 2.028\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "3: 3.026\n"
+ ]
+ }
+ ],
+ "prompt_number": 14
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Now we submit a bunch of tasks of increasing magnitude, and\n",
+ "watch where they happen, iterating through the results as they come."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "collapsed": false,
+ "input": [
+ "def sleep_here(t):\n",
+ " \"\"\"sleep here for a time, return where it happened\"\"\"\n",
+ " import time\n",
+ " time.sleep(t)\n",
+ " return id\n",
+ "\n",
+ "amr = lview.map(sleep_here, [.01*t for t in range(32)])\n",
+ "tic = time.time()\n",
+ "for i,r in enumerate(amr):\n",
+ " print \"task %i on engine %i: %.3f\" % (i, r, time.time()-tic)\n",
+ " sys.stdout.flush()"
+ ],
+ "language": "python",
+ "metadata": {},
+ "outputs": [
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 0 on engine 2: 0.001\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 1 on engine 3: 0.001\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 2 on engine 1: 0.001\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 3 on engine 0: 0.002\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 4 on engine 2: 0.043\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 5 on engine 3: 0.043\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 6 on engine 1: 0.096\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 7 on engine 0: 0.108\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 8 on engine 2: 0.110\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 9 on engine 3: 0.179\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 10 on engine 1: 0.205\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 11 on engine 0: 0.207\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 12 on engine 2: 0.245\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 13 on engine 3: 0.307\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 14 on engine 1: 0.361\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 15 on engine 0: 0.370\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 16 on engine 2: 0.459\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 17 on engine 3: 0.488\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 18 on engine 1: 0.544\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 19 on engine 0: 0.582\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 20 on engine 2: 0.670\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 21 on engine 3: 0.719\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 22 on engine 1: 0.777\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 23 on engine 0: 0.829\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 24 on engine 2: 0.923\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 25 on engine 3: 0.990\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 26 on engine 1: 1.051\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 27 on engine 0: 1.107\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 28 on engine 2: 1.205\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 29 on engine 3: 1.298\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 30 on engine 1: 1.355\n"
+ ]
+ },
+ {
+ "output_type": "stream",
+ "stream": "stdout",
+ "text": [
+ "task 31 on engine 0: 1.441\n"
+ ]
+ }
+ ],
+ "prompt_number": 15
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+    "Unlike `DirectView.map()`, which always results in one task per engine,\n",
+    "load-balanced map defaults to one task per *item* in the sequence. This\n",
+    "can be changed by specifying the `chunksize` keyword argument."
+ ]
+ },
+ {
+ "cell_type": "code",