
updates to docs and examples

1 parent 4d0058f · commit 45e272d0c02dd7d177a1cf3f3721ec46f0b5f42b · @minrk committed Apr 4, 2011
5 IPython/config/default/ipclusterz_config.py
@@ -24,7 +24,7 @@
# - SGEControllerLauncher
# - WindowsHPCControllerLauncher
# c.Global.controller_launcher = 'IPython.parallel.launcher.LocalControllerLauncher'
-c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'
+# c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'
# Options are:
# - LocalEngineSetLauncher
@@ -157,6 +157,9 @@
# If for some reason the Controller and Engines have different options above, they
# can be set as c.PBSControllerLauncher.<option> etc.
+# PBS and SGE have default templates, but you can specify your own, either as strings
+# or from files, as described here:
+
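For example, a sketch of both styles in ipclusterz_config.py (the file names are illustrative; ``batch_template_file`` is the option described in the parallel_process doc changes further down in this commit):

    # give the launcher a template string, e.g. read from a file you manage yourself:
    # with open("pbs.engine.template") as f:
    #     c.PBSEngineSetLauncher.batch_template = f.read()
    # or simply point the launcher at a template file:
    # c.PBSControllerLauncher.batch_template_file = "pbs.controller.template"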
# The batch submission script used to start the controller. This is where
# environment variables would be setup, etc. This string is interpreted using
# the Itpl module in IPython.external. Basically, you can use ${n} for the
73 docs/examples/newparallel/davinci/pwordfreq.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+"""Parallel word frequency counter.
+
+This only works for a local cluster, because the filenames are local paths.
+"""
+
+
+import os
+import urllib
+
+from itertools import repeat
+
+from wordfreq import print_wordfreq, wordfreq
+
+from IPython.parallel import Client, Reference
+
+davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"
+
+def pwordfreq(view, fnames):
+ """Parallel word frequency counter.
+
+ view - An IPython DirectView
+ fnames - The filenames containing the split data.
+ """
+ assert len(fnames) == len(view.targets)
+ view.scatter('fname', fnames, flatten=True)
+ ar = view.apply(wordfreq, Reference('fname'))
+ freqs_list = ar.get()
+ word_set = set()
+ for f in freqs_list:
+ word_set.update(f.keys())
+ freqs = dict(zip(word_set, repeat(0)))
+ for f in freqs_list:
+ for word, count in f.iteritems():
+ freqs[word] += count
+ return freqs
+
+if __name__ == '__main__':
+ # Create a Client and View
+ rc = Client()
+
+ view = rc[:]
+
+ if not os.path.exists('davinci.txt'):
+ # download from project gutenberg
+ print "Downloading Da Vinci's notebooks from Project Gutenberg"
+ urllib.urlretrieve(davinci_url, 'davinci.txt')
+
+ # Run the serial version
+ print "Serial word frequency count:"
+ text = open('davinci.txt').read()
+ freqs = wordfreq(text)
+ print_wordfreq(freqs, 10)
+
+
+ # The parallel version
+ print "\nParallel word frequency count:"
+ # split the davinci.txt into one file per engine:
+ lines = text.splitlines()
+ nlines = len(lines)
+ n = len(rc)
+ block = nlines/n
+ for i in range(n):
+ chunk = lines[i*block:(i+1)*block]
+ with open('davinci%i.txt'%i, 'w') as f:
+ f.write('\n'.join(chunk))
+
+ cwd = os.path.abspath(os.getcwd())
+ fnames = [ os.path.join(cwd, 'davinci%i.txt'%i) for i in range(n)]
+ pfreqs = pwordfreq(view,fnames)
+ print_wordfreq(pfreqs, 10)
+ # cleanup split files
+ map(os.remove, fnames)
68 docs/examples/newparallel/davinci/wordfreq.py
@@ -0,0 +1,68 @@
+"""Count the frequencies of words in a string"""
+
+from __future__ import division
+
+import cmath as math
+
+
+def wordfreq(text, is_filename=False):
+ """Return a dictionary of words and word counts in a string."""
+ if is_filename:
+ with open(text) as f:
+ text = f.read()
+ freqs = {}
+ for word in text.split():
+ lword = word.lower()
+ freqs[lword] = freqs.get(lword, 0) + 1
+ return freqs
+
+
+def print_wordfreq(freqs, n=10):
+ """Print the n most common words and counts in the freqs dict."""
+
+ words, counts = freqs.keys(), freqs.values()
+ items = zip(counts, words)
+ items.sort(reverse=True)
+ for (count, word) in items[:n]:
+ print word, count
+
+
+def wordfreq_to_weightsize(worddict, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
+ mincount = min(worddict.itervalues())
+ maxcount = max(worddict.itervalues())
+ weights = {}
+ for k, v in worddict.iteritems():
+ w = (v-mincount)/(maxcount-mincount)
+ alpha = minalpha + (maxalpha-minalpha)*w
+ size = minsize + (maxsize-minsize)*w
+ weights[k] = (alpha, size)
+ return weights
+
+
+def tagcloud(worddict, n=10, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
+ from matplotlib import pyplot as plt
+ import random
+
+ worddict = wordfreq_to_weightsize(worddict, minsize, maxsize, minalpha, maxalpha)
+
+ fig = plt.figure()
+ ax = fig.add_subplot(111)
+ ax.set_position([0.0,0.0,1.0,1.0])
+ plt.xticks([])
+ plt.yticks([])
+
+ words = worddict.keys()
+ alphas = [v[0] for v in worddict.values()]
+ sizes = [v[1] for v in worddict.values()]
+ items = zip(alphas, sizes, words)
+ items.sort(reverse=True)
+ for alpha, size, word in items[:n]:
+ # xpos = random.normalvariate(0.5, 0.3)
+ # ypos = random.normalvariate(0.5, 0.3)
+ xpos = random.uniform(0.0,1.0)
+ ypos = random.uniform(0.0,1.0)
+ ax.text(xpos, ypos, word.lower(), alpha=alpha, fontsize=size)
+ ax.autoscale_view()
+ return ax
+
+
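A possible local usage sketch tying these functions together (assumes matplotlib is installed and a text file such as the davinci.txt downloaded by pwordfreq.py):

    from matplotlib import pyplot as plt
    from wordfreq import wordfreq, print_wordfreq, tagcloud

    freqs = wordfreq(open('davinci.txt').read())
    print_wordfreq(freqs, 10)     # ten most common words and their counts
    ax = tagcloud(freqs, n=50)    # scatter the 50 most common words as a tag cloud
    plt.show()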
17 docs/examples/newparallel/multienginemap.py
@@ -0,0 +1,17 @@
+from IPython.parallel import Client
+
+rc = Client()
+view = rc[:]
+result = view.map_sync(lambda x: 2*x, range(10))
+print "Simple, default map: ", result
+
+ar = view.map_async(lambda x: 2*x, range(10))
+print "Submitted map, got AsyncResult: ", ar
+result = ar.r
+print "Using map_async: ", result
+
+@view.parallel(block=True)
+def f(x): return 2*x
+
+result = f(range(10))
+print "Using a parallel function: ", result
123 docs/examples/newparallel/nwmerge.py
@@ -0,0 +1,123 @@
+"""Example showing how to merge multiple remote data streams.
+"""
+# Slightly modified version of:
+# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
+
+import heapq
+from IPython.parallel.error import RemoteError
+
+def mergesort(list_of_lists, key=None):
+ """ Perform an N-way merge operation on sorted lists.
+
+ @param list_of_lists: (really iterable of iterable) of sorted elements
+ (either by naturally or by C{key})
+ @param key: specify sort key function (like C{sort()}, C{sorted()})
+
+ Yields tuples of the form C{(item, iterator)}, where the iterator is the
+ built-in list iterator or something you pass in, if you pre-generate the
+ iterators.
+
+ This is a stable merge; complexity O(N lg N)
+
+ Examples::
+
+ >>> print list(mergesort([[1,2,3,4],
+ ... [2,3.25,3.75,4.5,6,7],
+ ... [2.625,3.625,6.625,9]]))
+ [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
+
+ # note stability
+ >>> print list(mergesort([[1,2,3,4],
+ ... [2,3.25,3.75,4.5,6,7],
+ ... [2.625,3.625,6.625,9]],
+ ... key=int))
+ [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
+
+
+ >>> print list(mergesort([[4, 3, 2, 1],
+ ... [7, 6, 4.5, 3.75, 3.25, 2],
+ ... [9, 6.625, 3.625, 2.625]],
+ ... key=lambda x: -x))
+ [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
+ """
+
+ heap = []
+ for i, itr in enumerate(iter(pl) for pl in list_of_lists):
+ try:
+ item = itr.next()
+ if key:
+ toadd = (key(item), i, item, itr)
+ else:
+ toadd = (item, i, itr)
+ heap.append(toadd)
+ except StopIteration:
+ pass
+ heapq.heapify(heap)
+
+ if key:
+ while heap:
+ _, idx, item, itr = heap[0]
+ yield item
+ try:
+ item = itr.next()
+ heapq.heapreplace(heap, (key(item), idx, item, itr) )
+ except StopIteration:
+ heapq.heappop(heap)
+
+ else:
+ while heap:
+ item, idx, itr = heap[0]
+ yield item
+ try:
+ heapq.heapreplace(heap, (itr.next(), idx, itr))
+ except StopIteration:
+ heapq.heappop(heap)
+
+
+def remote_iterator(view,name):
+ """Return an iterator on an object living on a remote engine.
+ """
+ view.execute('it%s=iter(%s)'%(name,name), block=True)
+ while True:
+ try:
+ result = view.apply_sync(lambda x: x.next(), Reference('it'+name))
+ # This causes the StopIteration exception to be raised.
+ except RemoteError, e:
+ if e.ename == 'StopIteration':
+ raise StopIteration
+ else:
+ raise e
+ else:
+ yield result
+
+# Main, interactive testing
+if __name__ == '__main__':
+
+ from IPython.parallel import Client, Reference
+ rc = Client()
+ view = rc[:]
+ print 'Engine IDs:', rc.ids
+
+ # Make a set of 'sorted datasets'
+ a0 = range(5,20)
+ a1 = range(10)
+ a2 = range(15,25)
+
+ # Now, imagine these had been created in the remote engines by some long
+ # computation. In this simple example, we just send them over into the
+ # remote engines. They will all be called 'a' in each engine.
+ rc[0]['a'] = a0
+ rc[1]['a'] = a1
+ rc[2]['a'] = a2
+
+ # And we now make a local object which represents the remote iterator
+ aa0 = remote_iterator(rc[0],'a')
+ aa1 = remote_iterator(rc[1],'a')
+ aa2 = remote_iterator(rc[2],'a')
+
+ # Let's merge them, both locally and remotely:
+ print 'Merge the local datasets:'
+ print list(mergesort([a0,a1,a2]))
+
+ print 'Locally merge the remote sets:'
+ print list(mergesort([aa0,aa1,aa2]))
40 docs/examples/newparallel/phistogram.py
@@ -0,0 +1,40 @@
+"""Parallel histogram function"""
+import numpy
+from IPython.utils.pickleutil import Reference
+
+def phistogram(view, a, bins=10, rng=None, normed=False):
+ """Compute the histogram of a remote array a.
+
+ Parameters
+ ----------
+ view
+ IPython DirectView instance
+ a : str
+ String name of the remote array
+ bins : int
+ Number of histogram bins
+ rng : (float, float)
+ Tuple of min, max of the range to histogram
+ normed : boolean
+ Should the histogram counts be normalized to 1
+ """
+ nengines = len(view.targets)
+
+ # view.push(dict(bins=bins, rng=rng))
+ with view.sync_imports():
+ import numpy
+ rets = view.apply_sync(lambda a, b, rng: numpy.histogram(a,b,rng), Reference(a), bins, rng)
+ hists = [ r[0] for r in rets ]
+ lower_edges = [ r[1] for r in rets ]
+ # view.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a)
+ lower_edges = lower_edges[0] # all engines produce the same edges when rng is given
+ hist_array = numpy.array(hists).reshape(nengines, -1)
+ # hist_array.shape = (nengines,-1)
+ total_hist = numpy.sum(hist_array, 0)
+ if normed:
+ total_hist = total_hist/numpy.sum(total_hist,dtype=float)
+ return total_hist, lower_edges
+
+
+
+
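A hypothetical usage sketch for phistogram (assumes a running local cluster; the array name 'data' and the values are illustrative):

    import numpy
    from IPython.parallel import Client
    from phistogram import phistogram

    rc = Client()
    view = rc[:]
    # give each engine a piece of a random sample under the name 'data'
    view.scatter('data', numpy.random.random(10000))
    hist, edges = phistogram(view, 'data', bins=20, rng=(0.0, 1.0))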
57 docs/examples/newparallel/plotting/plotting_backend.py
@@ -0,0 +1,57 @@
+"""An example of how to use IPython for plotting remote parallel data
+
+The two files plotting_frontend.py and plotting_backend.py go together.
+
+This file (plotting_backend.py) performs the actual computation. For this
+example, the computation just generates a set of random numbers that
+look like a distribution of particles with 2D position (x,y) and
+momentum (px,py). In a real situation, this file would do some time
+consuming and complicated calculation, and could possibly make calls
+to MPI.
+
+One important feature is that this script can also be run standalone without
+IPython. This is nice as it allows it to be run in more traditional
+settings where IPython isn't being used.
+
+When used with IPython.parallel, this code is run on the engines. Because this
+code doesn't make any plots, the engines don't have to have any plotting
+packages installed.
+"""
+
+# Imports
+import numpy as N
+import time
+import random
+
+# Functions
+def compute_particles(number):
+ x = N.random.standard_normal(number)
+ y = N.random.standard_normal(number)
+ px = N.random.standard_normal(number)
+ py = N.random.standard_normal(number)
+ return x, y, px, py
+
+def downsample(array, k):
+ """Choose k random elements of array."""
+ length = array.shape[0]
+ indices = random.sample(xrange(length), k)
+ return array[indices]
+
+# Parameters of the run
+number = 100000
+d_number = 1000
+
+# The actual run
+
+time.sleep(0) # Pretend it took a while
+x, y, px, py = compute_particles(number)
+# Now downsample the data
+downx = downsample(x, d_number)
+downy = downsample(y, d_number)
+downpx = downsample(px, d_number)
+downpy = downsample(py, d_number)
+
+print "downx: ", downx[:10]
+print "downy: ", downy[:10]
+print "downpx: ", downpx[:10]
+print "downpy: ", downpy[:10]
60 docs/examples/newparallel/plotting/plotting_frontend.py
@@ -0,0 +1,60 @@
+"""An example of how to use IPython1 for plotting remote parallel data
+
+The two files plotting_frontend.py and plotting_backend.py go together.
+
+To run this example, first start the IPython controller and 4
+engines::
+
+ ipclusterz start -n 4
+
+Then start ipython in pylab mode::
+
+ ipython -pylab
+
+Then a simple "run plotting_frontend.py" in IPython will run the
+example. When this is done, all the variables (such as number, downx, etc.)
+are available in IPython, so for example you can make additional plots.
+"""
+
+import numpy as N
+from pylab import *
+from IPython.parallel import Client
+
+# Connect to the cluster
+rc = Client()
+view = rc[:]
+
+# Run the simulation on all the engines
+view.run('plotting_backend.py')
+
+# Bring back the data. These are all AsyncResult objects
+number = view.pull('number')
+d_number = view.pull('d_number')
+downx = view.gather('downx')
+downy = view.gather('downy')
+downpx = view.gather('downpx')
+downpy = view.gather('downpy')
+
+# but we can still iterate through AsyncResults before they are done
+print "number: ", sum(number)
+print "downsampled number: ", sum(d_number)
+
+
+# Make a scatter plot of the gathered data
+# These calls to matplotlib could be replaced by calls to pygist or
+# another plotting package.
+figure(1)
+# wait for downx/y
+downx = downx.get()
+downy = downy.get()
+scatter(downx, downy)
+xlabel('x')
+ylabel('y')
+figure(2)
+# wait for downpx/y
+downpx = downpx.get()
+downpy = downpy.get()
+scatter(downpx, downpy)
+xlabel('px')
+ylabel('py')
+show()
56 docs/examples/newparallel/rmt/rmt.ipy
@@ -0,0 +1,56 @@
+#-------------------------------------------------------------------------------
+# Driver code that the client runs.
+#-------------------------------------------------------------------------------
+# To run this code start a controller and engines using:
+# ipcluster -n 2
+# Then run the scripts by doing irunner rmt.ipy or by starting ipython and
+# doing run rmt.ipy.
+
+from rmtkernel import *
+import numpy
+from IPython.parallel import Client
+
+
+def wignerDistribution(s):
+ """Returns (s, rho(s)) for the Wigner GOE distribution."""
+ return (numpy.pi*s/2.0) * numpy.exp(-numpy.pi*s**2/4.)
+
+
+def generateWignerData():
+ s = numpy.linspace(0.0,4.0,400)
+ rhos = wignerDistribution(s)
+ return s, rhos
+
+
+def serialDiffs(num, N):
+ diffs = ensembleDiffs(num, N)
+ normalizedDiffs = normalizeDiffs(diffs)
+ return normalizedDiffs
+
+
+def parallelDiffs(rc, num, N):
+ nengines = len(rc.targets)
+ num_per_engine = num/nengines
+ print "Running with", num_per_engine, "per engine."
+ ar = rc.apply_async(ensembleDiffs, num_per_engine, N)
+ return numpy.array(ar.get()).flatten()
+
+
+# Main code
+if __name__ == '__main__':
+ rc = Client()
+ view = rc[:]
+ print "Distributing code to engines..."
+ view.run('rmtkernel.py')
+ view.block = False
+
+ # Simulation parameters
+ nmats = 100
+ matsize = 30
+ # tic = time.time()
+ %timeit -r1 -n1 serialDiffs(nmats,matsize)
+ %timeit -r1 -n1 parallelDiffs(view, nmats, matsize)
+
+ # Uncomment these to plot the histogram
+ # import pylab
+ # pylab.hist(parallelDiffs(view, nmats, matsize))
44 docs/examples/newparallel/rmt/rmtkernel.py
@@ -0,0 +1,44 @@
+#-------------------------------------------------------------------------------
+# Core routines for computing properties of symmetric random matrices.
+#-------------------------------------------------------------------------------
+
+import numpy
+ra = numpy.random
+la = numpy.linalg
+
+def GOE(N):
+ """Creates an NxN element of the Gaussian Orthogonal Ensemble"""
+ m = ra.standard_normal((N,N))
+ m += m.T
+ return m
+
+
+def centerEigenvalueDiff(mat):
+ """Compute the eigvals of mat and then find the center eigval difference."""
+ N = len(mat)
+ evals = numpy.sort(la.eigvals(mat))
+ diff = evals[N/2] - evals[N/2-1]
+ return diff.real
+
+
+def ensembleDiffs(num, N):
+ """Return an array of num eigenvalue differences for the NxN GOE
+ ensemble."""
+ diffs = numpy.empty(num)
+ for i in xrange(num):
+ mat = GOE(N)
+ diffs[i] = centerEigenvalueDiff(mat)
+ return diffs
+
+
+def normalizeDiffs(diffs):
+ """Normalize an array of eigenvalue diffs."""
+ return diffs/diffs.mean()
+
+
+def normalizedEnsembleDiffs(num, N):
+ """Return an array of num *normalized eigenvalue differences for the NxN
+ GOE ensemble."""
+ diffs = ensembleDiffs(num, N)
+ return normalizeDiffs(diffs)
+
70 docs/examples/newparallel/task_profiler.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+"""Test the performance of the task farming system.
+
+This script submits a set of tasks via a LoadBalancedView. The tasks
+are basically just a time.sleep(t), where t is a random number between
+two limits that can be configured at the command line. To run
+the script there must first be an IPython controller and engines running::
+
+ ipclusterz start -n 16
+
+A good test to run with 16 engines is::
+
+ python task_profiler.py -n 128 -t 0.01 -T 1.0
+
+This should show a speedup of 13-14x. The limitation here is that the
+overhead of a single task is about 0.001-0.01 seconds.
+"""
+import random, sys
+from optparse import OptionParser
+
+from IPython.utils.timing import time
+from IPython.parallel import Client
+
+def main():
+ parser = OptionParser()
+ parser.set_defaults(n=100)
+ parser.set_defaults(tmin=1)
+ parser.set_defaults(tmax=60)
+ parser.set_defaults(profile='default')
+
+ parser.add_option("-n", type='int', dest='n',
+ help='the number of tasks to run')
+ parser.add_option("-t", type='float', dest='tmin',
+ help='the minimum task length in seconds')
+ parser.add_option("-T", type='float', dest='tmax',
+ help='the maximum task length in seconds')
+ parser.add_option("-p", '--profile', type='str', dest='profile',
+ help="the cluster profile [default: 'default']")
+
+ (opts, args) = parser.parse_args()
+ assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"
+
+ rc = Client()
+ view = rc.load_balanced_view()
+ print view
+ rc.block=True
+ nengines = len(rc.ids)
+ rc[:].execute('from IPython.utils.timing import time')
+
+ # the jobs should take a random time within a range
+ times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
+ stime = sum(times)
+
+ print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
+ time.sleep(1)
+ start = time.time()
+ amr = view.map(time.sleep, times)
+ amr.get()
+ stop = time.time()
+
+ ptime = stop-start
+ scale = stime/ptime
+
+ print "executed %.1f secs in %.1f secs"%(stime, ptime)
+ print "%.3fx parallel performance on %i engines"%(scale, nengines)
+ print "%.1f%% of theoretical max"%(100*scale/nengines)
+
+
+if __name__ == '__main__':
+ main()
2 docs/examples/newparallel/wave2D/wavesolver.py
@@ -12,8 +12,6 @@
"""
import time
-from mpi4py import MPI
-mpi = MPI.COMM_WORLD
from numpy import exp, zeros, newaxis, sqrt, arange
def iseq(start=0, stop=None, inc=1):
17 docs/source/parallelz/parallel_intro.txt
@@ -217,10 +217,16 @@ everything is working correctly, try the following commands:
Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
-When a client is created with no arguments, the client tries to find the corresponding
-JSON file in the local `~/.ipython/clusterz_default/security` directory. If it finds it,
-you are set. If you have put the JSON file in a different location or it has a different
-name, create the client like this:
+When a client is created with no arguments, the client tries to find the corresponding JSON file
+in the local `~/.ipython/clusterz_default/security` directory. If you created the cluster with a
+profile, you can pass that profile name to the Client instead. This should cover most cases:
+
+.. sourcecode:: ipython
+
+ In [2]: c = Client(profile='myprofile')
+
+If you have put the JSON file in a different location or it has a different name, create the
+client like this:
.. sourcecode:: ipython
@@ -237,6 +243,9 @@ then you would connect to it with:
Where 'myhub.example.com' is the url or IP address of the machine on
which the Hub process is running (or another machine that has direct access to the Hub's ports).
+The SSH server may already be specified in ipcontroller-client.json, if it was passed to the
+controller at launch time.
+
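For illustration (the path is hypothetical): if the SSH server is already recorded in the connection file, no ``sshserver`` argument is needed:

.. sourcecode:: ipython

    In [3]: c = Client('/path/to/ipcontroller-client.json')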
You are now ready to learn more about the :ref:`Direct
<parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
controller.
49 docs/source/parallelz/parallel_multiengine.txt
@@ -193,7 +193,7 @@ Creating a view is simple: index-access on a client creates a :class:`.DirectVie
Out[4]: <DirectView [1, 2]>
In [5]: view.apply<tab>
- view.apply view.apply_async view.apply_sync view.apply_with_flags
+ view.apply view.apply_async view.apply_sync
For convenience, you can set block temporarily for a single call with the extra sync/async methods.
@@ -233,7 +233,7 @@ method:
In [7]: rc[1::2].execute('c=a-b')
- In [8]: rc[:]['c'] # shorthand for rc[:].pull('c', block=True)
+ In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
Out[8]: [15, -5, 15, -5]
@@ -478,7 +478,7 @@ Here are some examples of how you use :meth:`push` and :meth:`pull`:
In [39]: dview.pull('a')
Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
- In [40]: rc[0].pull('b')
+ In [40]: dview.pull('b', targets=0)
Out[40]: 3453
In [41]: dview.pull(('a','b'))
@@ -551,13 +551,54 @@ basic effect using :meth:`scatter` and :meth:`gather`:
In [67]: %px y = [i**10 for i in x]
Parallel execution on engines: [0, 1, 2, 3]
- Out[67]:
+ Out[67]:
In [68]: y = dview.gather('y')
In [69]: print y
[0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
+Remote imports
+--------------
+
+Sometimes you will want to import packages both in your interactive session
+and on your remote engines. This can be done with the :class:`ContextManager`
+created by a DirectView's :meth:`sync_imports` method:
+
+.. sourcecode:: ipython
+
+ In [69]: with dview.sync_imports():
+ ...: import numpy
+ importing numpy on engine(s)
+
+Any imports made inside the block will also be performed on the view's engines.
+sync_imports also takes a `local` boolean flag that defaults to True, which specifies
+whether the local imports should also be performed. However, support for `local=False`
+has not been implemented, so only packages that can be imported locally will work
+this way.
+
+You can also specify imports via the ``@require`` decorator. This is a decorator
+designed for use in Dependencies, but can be used to handle remote imports as well.
+Modules or module names passed to ``@require`` will be imported before the decorated
+function is called. If they cannot be imported, the decorated function will never
+execute, and will instead fail with an UnmetDependencyError.
+
+.. sourcecode:: ipython
+
+ In [69]: from IPython.parallel import require
+
+ In [70]: @require('re')
+ ...: def findall(pat, x):
+ ...: # re is guaranteed to be available
+ ...: return re.findall(pat, x)
+
+ # you can also pass modules themselves, that you already have locally:
+ In [71]: @require(time)
+ ...: def wait(t):
+ ...: time.sleep(t)
+ ...: return t
+
+
Parallel exceptions
-------------------
29 docs/source/parallelz/parallel_process.txt
@@ -199,16 +199,16 @@ and engines:
c.Global.controller_launcher = 'IPython.parallel.launcher.PBSControllerLauncher'
c.Global.engine_launcher = 'IPython.parallel.launcher.PBSEngineSetLauncher'
-To use this mode, you first need to create a PBS script template that will be
-used to start the engines. Here is a sample PBS script template:
+IPython does provide simple default batch templates for PBS and SGE, but you may need
+to specify your own. Here is a sample PBS script template:
.. sourcecode:: bash
#PBS -N ipython
#PBS -j oe
#PBS -l walltime=00:10:00
#PBS -l nodes=${n/4}:ppn=4
- #PBS -q parallel
+ #PBS -q $queue
cd $$PBS_O_WORKDIR
export PATH=$$HOME/usr/local/bin
@@ -225,11 +225,12 @@ There are a few important points about this template:
expressions like ``${n/4}`` in the template to indicate the number of
nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template.
These allow the batch system to know how many engines, and where the configuration
- files reside.
+ files reside. The same is true for the batch queue, with the template variable ``$queue``.
3. Because ``$`` is a special character used by the template engine, you must
escape any ``$`` by using ``$$``. This is important when referring to
- environment variables in the template.
+ environment variables in the template, or in SGE, where the configuration lines start
+ with ``#$``, which will have to be written as ``#$$`` (see the sketch after this list).
4. Any options to :command:`ipenginez` can be given in the batch script
template, or in :file:`ipenginez_config.py`.
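A sketch of the doubled ``$`` in an inline SGE engine template set in :file:`ipclusterz_config` (illustrative only: the launcher name follows the pattern of the PBS launchers above, and the directives shown are not the shipped defaults):

.. sourcecode:: python

    # ``#$$`` renders as the literal ``#$`` SGE prefix, while $queue and ${n}
    # are filled in by the template engine
    c.SGEEngineSetLauncher.batch_template = """#$$ -V
    #$$ -q $queue
    # ... the ipenginez command for starting ${n} engines goes here ...
    """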
@@ -245,7 +246,7 @@ The controller template should be similar, but simpler:
#PBS -j oe
#PBS -l walltime=00:10:00
#PBS -l nodes=1:ppn=4
- #PBS -q parallel
+ #PBS -q $queue
cd $$PBS_O_WORKDIR
export PATH=$$HOME/usr/local/bin
@@ -258,15 +259,23 @@ Once you have created these scripts, save them with names like
.. sourcecode:: python
- with open("pbs.engine.template") as f:
- c.PBSEngineSetLauncher.batch_template = f.read()
+ c.PBSEngineSetLauncher.batch_template_file = "pbs.engine.template"
- with open("pbs.controller.template") as f:
- c.PBSControllerLauncher.batch_template = f.read()
+ c.PBSControllerLauncher.batch_template_file = "pbs.controller.template"
Alternately, you can just define the templates as strings inside :file:`ipclusterz_config`.
+Whether you are using your own templates or our defaults, the extra configurables available
+are the number of engines to launch (``$n``) and the batch system queue to which the jobs
+are submitted (``$queue``). These can be specified in
+:file:`ipclusterz_config`:
+
+.. sourcecode:: python
+
+ c.PBSLauncher.queue = 'veryshort.q'
+ c.PBSEngineSetLauncher.n = 64
+
Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior
of listening only on localhost is likely too restrictive. In this case, also assuming the
nodes are safely behind a firewall, you can simply instruct the Controller to listen for
3 docs/source/parallelz/parallel_task.txt
@@ -158,11 +158,10 @@ you specify are importable:
In [10]: @require('numpy', 'zmq')
...: def myfunc():
- ...: import numpy,zmq
...: return dostuff()
Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
-numpy and pyzmq available.
+numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
@depend
*******
231 docs/source/parallelz/parallel_transition.txt
@@ -0,0 +1,231 @@
+.. _parallel_transition:
+
+============================================================
+Transitioning from IPython.kernel to IPython.zmq.newparallel
+============================================================
+
+
+We have rewritten our parallel computing tools to use 0MQ_ and Tornado_. The redesign
+has resulted in dramatically improved performance, as well as (we think) an improved
+interface for executing code remotely. This doc is meant to help users of IPython.kernel
+transition their existing code to the new API.
+
+.. _0MQ: http://zeromq.org
+.. _Tornado: https://github.com/facebook/tornado
+
+
+Processes
+=========
+
+The process model for the new parallel code is very similar to that of IPython.kernel. There are
+still a Controller, Engines, and Clients. However, the Controller is now split into multiple
+processes, and can even be split across multiple machines. A single ipcontroller script still
+starts all of the controller processes.
+
+
+.. note::
+
+ TODO: fill this out after config system is updated
+
+
+.. seealso::
+
+ Detailed :ref:`Parallel Process <parallel_process>` doc for configuring and launching
+ IPython processes.
+
+Creating a Client
+=================
+
+Creating a client with default settings has not changed much, though the extended options have.
+One significant change is that there are no longer multiple Client classes to represent the
+various execution models. There is just one low-level Client object for connecting to the
+cluster, and View objects are created from that Client that provide the different interfaces
+for execution.
+
+
+To create a new client, and set up the default direct and load-balanced objects:
+
+.. sourcecode:: ipython
+
+ # old
+ In [1]: from IPython.kernel import client as kclient
+
+ In [2]: mec = kclient.MultiEngineClient()
+
+ In [3]: tc = kclient.TaskClient()
+
+ # new
+ In [1]: from IPython.parallel import Client
+
+ In [2]: rc = Client()
+
+ In [3]: dview = rc[:]
+
+ In [4]: lbview = rc.load_balanced_view()
+
+Apply
+=====
+
+The main change to the API is the addition of the :meth:`apply` method to the View objects.
+Calling `view.apply(f, *args, **kwargs)` invokes `f(*args, **kwargs)` remotely on one or more
+engines, returning the result. This means that the natural unit of remote execution
+is no longer a string of Python code, but rather a Python function.
+
+Two other notable additions are non-copying sends (via the `track` flag) and remote
+References (described below).
+
+The flags for execution have also changed. Previously, there was only `block` denoting whether
+to wait for results. This remains, but due to the addition of fully non-copying sends of
+arrays and buffers, there is also a `track` flag, which instructs PyZMQ to produce a :class:`MessageTracker` that will let you know when it is safe again to edit arrays in-place.
+
+The result of a non-blocking call to `apply` is now an AsyncResult_ object, described below.
+
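A minimal sketch of the new calling style (engine count and values are illustrative):

.. sourcecode:: ipython

    In [5]: def mul(a, b):
       ...:     return a*b

    In [6]: dview.apply_sync(mul, 5, 6)        # blocking; runs on every engine in the view
    Out[6]: [30, 30, 30, 30]

    In [7]: ar = dview.apply_async(mul, 5, 6)  # non-blocking; returns an AsyncResult

    In [8]: ar.get()
    Out[8]: [30, 30, 30, 30]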
+MultiEngine
+===========
+
+The multiplexing interface previously provided by the MultiEngineClient is now provided by the
+DirectView. Once you have a Client connected, you can create a DirectView with index-access
+to the client (``view = client[1:5]``). The core methods for
+communicating with engines remain: `execute`, `run`, `push`, `pull`, `scatter`, `gather`. These
+methods all behave in much the same way as they did on a MultiEngineClient.
+
+
+.. sourcecode:: ipython
+
+ # old
+ In [2]: mec.execute('a=5', targets=[0,1,2])
+
+ # new
+ In [2]: view.execute('a=5', targets=[0,1,2])
+ # or
+ In [2]: rc[0,1,2].execute('a=5')
+
+
+This extends to any method that communicates with the engines.
+
+Requests of the Hub (queue status, etc.) are no longer asynchronous, and do not take a `block`
+argument.
+
+
+* :meth:`get_ids` is now the property :attr:`ids`, which is passively updated by the Hub (no
+ need for network requests for an up-to-date list).
+* :meth:`barrier` has been renamed to :meth:`wait`, and now takes an optional timeout. :meth:`flush` is removed, as it is redundant with :meth:`wait`.
+* :meth:`zip_pull` has been removed
+* :meth:`keys` has been removed, but is easily implemented as::
+
+ dview.apply(lambda : globals().keys())
+
+* :meth:`push_function` and :meth:`push_serialized` are removed, as :meth:`push` handles
+ functions without issue.
+
+.. seealso::
+
+ :ref:`Our Direct Interface doc <parallel_multiengine>` for a simple tutorial with the
+ DirectView.
+
+
+
+
+The other major difference is the use of :meth:`apply`. Since remote work is now expressed as
+functions, the natural return value is the actual Python objects those functions return. It is
+no longer recommended to use stdout for your results, due to stream decoupling and the
+asynchronous way stdout streams are handled in the new system.
+
+Task
+====
+
+Load-Balancing has changed more than Multiplexing. This is because there is no longer a notion
+of a StringTask or a MapTask; there are simply Python functions to call. Tasks are now
+simpler: instead of composites of push/execute/pull/clear calls, a task is a single function
+that takes arguments and returns objects.
+
+The load-balanced interface is provided by the :class:`LoadBalancedView` class, created by the client:
+
+.. sourcecode:: ipython
+
+ In [10]: lbview = rc.load_balanced_view()
+
+ # load-balancing can also be restricted to a subset of engines:
+ In [10]: lbview = rc.load_balanced_view([1,2,3])
+
+A simple task used to consist of pushing some data, calling a function on that data (plus some
+data already resident on the engine), and then pulling back the results. This can now all be
+done with a single function.
+
+
+Let's say you want to compute the dot product of two matrices, one of which resides on the
+engine, and another resides on the client. You might construct a task that looks like this:
+
+.. sourcecode:: ipython
+
+ In [10]: st = kclient.StringTask("""
+ import numpy
+ C=numpy.dot(A,B)
+ """,
+ push=dict(B=B),
+ pull='C'
+ )
+
+ In [11]: tid = tc.run(st)
+
+ In [12]: tr = tc.get_task_result(tid)
+
+ In [13]: C = tc['C']
+
+In the new code, this is simpler:
+
+.. sourcecode:: ipython
+
+ In [10]: import numpy
+
+ In [11]: from IPython.parallel import Reference
+
+ In [12]: ar = lbview.apply(numpy.dot, Reference('A'), B)
+
+ In [13]: C = ar.get()
+
+Note the use of ``Reference``. This is a convenient representation of an object that exists
+in the engine's namespace, so you can pass remote objects as arguments to your task functions.
+
+Also note that in the kernel model, after the task is run, 'A', 'B', and 'C' are all defined on
+the engine. In order to deal with this, there is also a `clear_after` flag for Tasks to prevent
+pollution of the namespace, and bloating of engine memory. This is not necessary with the new
+code, because only those objects explicitly pushed (or set via `globals()`) will be resident on
+the engine beyond the duration of the task.
+
+.. seealso::
+
+ Dependencies also work very differently than in IPython.kernel. See our :ref:`doc on Dependencies<parallel_dependencies>` for details.
+
+.. seealso::
+
+ :ref:`Our Task Interface doc <parallel_task>` for a simple tutorial with the
+ LoadBalancedView.
+
+
+.. _AsyncResult:
+
+PendingResults
+==============
+
+Since we no longer use Twisted, we also lose Deferred objects. The results of non-blocking
+calls, previously represented as PendingDeferred or PendingResult objects, are now AsyncResult
+objects. The AsyncResult object is based on the object of the same name in the built-in
+:py:mod:`multiprocessing.pool` module. Our version provides a superset of that interface.
+
+Some things that behave the same:
+
+.. sourcecode:: ipython
+
+ # old
+ In [5]: pr = mec.pull('a', targets=[0,1], block=False)
+ In [6]: pr.r
+ Out[6]: [5, 5]
+
+ # new
+ In [5]: ar = rc[0,1].pull('a', block=False)
+ In [6]: ar.r
+ Out[6]: [5, 5]
+
+
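And a sketch of the part of the interface shared with :py:mod:`multiprocessing` (return values are illustrative, continuing from the example above):

.. sourcecode:: ipython

    In [7]: ar.ready()          # non-blocking check for completion
    Out[7]: True

    In [8]: ar.get(timeout=5)   # block (optionally with a timeout) for the result
    Out[8]: [5, 5]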
