# Embarisingly Parallel for loops using Joblib
Example from documentation

In [1]:
from math import sqrt
[sqrt(i ** 2) for i in range(10)]

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

In [2]:
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

The progress meter: the higher the value of verbose, the more messages:
If verbose > 50 then message a for every task is returned.

In [3]:
from time import sleep
from joblib import Parallel, delayed
# n_jobs=1 turns off the parallel code for debuging.
r = Parallel(n_jobs=1, verbose=1)(delayed(sleep)(.1) for _ in range(100)) 

[Parallel(n_jobs=1)]: Done  49 tasks       | elapsed:    4.9s
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed:   10.0s finished


In [4]:
r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(100)) 

[Parallel(n_jobs=2)]: Done  24 tasks      | elapsed:    1.2s
[Parallel(n_jobs=2)]: Done 100 out of 100 | elapsed:    5.0s finished


In [5]:
r = Parallel(n_jobs=-1, verbose=10)(delayed(sleep)(.1) for _ in range(100)) 

[Parallel(n_jobs=-1)]: Batch computation too fast (0.1037s.) Setting batch_size=2.
[Parallel(n_jobs=-1)]: Done   5 tasks      | elapsed:    0.2s
[Parallel(n_jobs=-1)]: Done  12 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done  26 tasks      | elapsed:    0.8s
[Parallel(n_jobs=-1)]: Done  40 tasks      | elapsed:    1.0s
[Parallel(n_jobs=-1)]: Done  58 tasks      | elapsed:    1.6s
[Parallel(n_jobs=-1)]: Done  76 tasks      | elapsed:    2.0s
[Parallel(n_jobs=-1)]: Done 100 out of 100 | elapsed:    2.6s finished


Running the same code on the simple sleep function shows the effect of increasing the number of seperate jobs/processes.

## Reusing a pool of workers

In [6]:
def test_reuse():
    """Test Reusing a pool of workers """
    with Parallel(n_jobs=2) as parallel:
        accumulator = 0.
        n_iter = 0
        while accumulator < 1000:
            results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5))
            accumulator += sum(results)  # synchronization barrier
            n_iter += 1

In [7]:
def test_no_reuse():
    """Test Showing Parallel overhead by not Reusing a pool of workers"""
    accumulator = 0.
    n_iter = 0
    while accumulator < 1000:
        results = Parallel(n_jobs=2)(delayed(sqrt)(accumulator + i ** 2) for i in range(5))
        accumulator += sum(results)  # synchronization barrier
        n_iter += 1

In [8]:
%timeit test_reuse()

10 loops, best of 3: 120 ms per loop


In [9]:
%timeit test_no_reuse()

1 loop, best of 3: 1.68 s per loop


# Generators:
Similar to comprehension lists but is effeicent with memory. When you create a comprehension list you need to store it in memory. This can be a problem if you use very large arrays.

The generator only creates one value at a time and then when it has used that value it forgets about it. Thus saving memory. As a result they can be used for iteration but only once.
You create a generator by using normal brackets "()" instead of square brackets "[]".

In [12]:
List = [x ** 2 for x in range(10) if (x%3) is 0]
print(List)
for val in List:
    print(val)

[0, 9, 36, 81]
0
9
36
81


In [13]:
gen = (x ** 2 for x in range(10) if (x%3) is 0)
print(gen)
for val in gen:
    print(val)

<generator object <genexpr> at 0x7f9484065fa0>
0
9
36
81


In [14]:
print("Another-iteration") 
print(List)
for val in List:
    print(val)
    
# Re-iteration of generator does not return any more values
print(gen)    
for val in gen:
    print(val)


Another-iteration
[0, 9, 36, 81]
0
9
36
81
<generator object <genexpr> at 0x7f9484065fa0>


For large arrays it may be more effeicent to create a generator function more than once instead of have a large list saved in memory.

## File Processing thought example

Say you had many files that needed information out of or some processing/transformation.
Say you wanted to extract some information from each of the files e.g. the time, coordinates, some other header information
Or you wanted to normalize spectra (spectra.fits) and save the result to a new file (spectra_normalised.fits)

If all the files (input and output) are independant and your processing automatic then you would probably loop over the files. This should be able to be parallelized.

### Warning nested parallel processes are probably not a good idea. 

In [None]:
#1/2 Code 1/2 Psudocode for many file example:
filenames = ["file1.txt", "file2.txt", ...., "fileN.txt"]
#Serial example
for fname in filenames:
    # Open file and load in data
    with open(fname,"r") as f:
        # Read in data
        data = f.readlines()
    # Do task
    ans = calculations(data)
    
    #Exctract some information and/or # Save to a file
    with open(savefile, "w") as g:
        # Output to file
        g.write(ans)
        
    return ans


# Turn the code inside the loop into its own function
def file_processing(filename, *args):
     # Open file and load in data
    with open(fname,"r") as f:
        # Read in data
        data = f.readlines()
    # Do task
    ans = calculations(data)
    
    #Exctract some information and/or # Save to a file
    with open(savefile, "w") as g:
        # Output to file
        g.write(ans)
        
    return ans


# Serial example with function  
for fname in filenames:
    file_processing(filename, *args)
# or as comprehension list
[file_processing(fname, *args) for fname in filenames]


# Parallel with joblib.
Parallel(n_jobs=2)(delayed(file_processing)(fname, *args) for fname in filenames)

# If you need to then you can write code to extract the results from all the separate savefiles


# Convolution Example
Convolution without loss of information from interpolation because we don't interpolate to equidistant wavelenght points. 

Adapted from code from Pedro Figueira

First off define some functions needed

In [21]:
import matplotlib.pyplot as plt

def wav_selector(wav, flux, wav_min, wav_max):
    """
    function that returns wavelength and flux withn a giving range
    """    
    wav_sel = np.array([value for value in wav if(wav_min < value < wav_max)], dtype="float64")
    flux_sel = np.array([value[1] for value in zip(wav,flux) if(wav_min < value[0] < wav_max)], dtype="float64")
    
    return [wav_sel, flux_sel]


def unitary_Gauss(x, center, FWHM):
    """
    Gaussian_function of area=1

    p[0] = A;
    p[1] = mean;
    p[2] = FWHM;
    """
    
    sigma = np.abs(FWHM) /( 2 * np.sqrt(2 * np.log(2)) );
    Amp = 1.0 / (sigma*np.sqrt(2*np.pi))
    tau = -((x - center)**2) / (2*(sigma**2))
    result = Amp * np.exp( tau );
    
    return result

def chip_selector(wav, flux, chip):
    chip = str(chip)
    if(chip in ["ALL", "all", "","0"]):
        chipmin = float(hdr1["HIERARCH ESO INS WLEN STRT1"])  # Wavelength start on detector [nm]
        chipmax = float(hdr1["HIERARCH ESO INS WLEN END4"])   # Wavelength end on detector [nm]
        #return [wav, flux]
    elif(chip == "1"):
        chipmin = float(hdr1["HIERARCH ESO INS WLEN STRT1"])  # Wavelength start on detector [nm]
        chipmax = float(hdr1["HIERARCH ESO INS WLEN END1"])   # Wavelength end on detector [nm]
    elif(chip == "2"):
        chipmin = float(hdr1["HIERARCH ESO INS WLEN STRT2"])  # Wavelength start on detector [nm]
        chipmax = float(hdr1["HIERARCH ESO INS WLEN END2"])   # Wavelength end on detector [nm]
    elif(chip == "3"):   
        chipmin = float(hdr1["HIERARCH ESO INS WLEN STRT3"])  # Wavelength start on detector [nm]
        chipmax = float(hdr1["HIERARCH ESO INS WLEN END3"])   # Wavelength end on detector [nm]
    elif(chip == "4"):   
        chipmin = float(hdr1["HIERARCH ESO INS WLEN STRT4"])  # Wavelength start on detector [nm]
        chipmax = float(hdr1["HIERARCH ESO INS WLEN END4"])   # Wavelength end on detector [nm]
    elif(chip == "Joblib_small"):   
        chipmin = float(2118)  # Wavelength start on detector [nm]
        chipmax = float(2119)  # Wavelength end on detector [nm]
    elif(chip == "Joblib_large"):   
        chipmin = float(2149)  # Wavelength start on detector [nm]
        chipmax = float(2157)  # Wavelength end on detector [nm]
    else:
        print("Unrecognized chip tag.")
        exit()
    
    #select values form the chip  
    wav_chip, flux_chip = wav_selector(wav, flux, chipmin, chipmax)
    
    return [wav_chip, flux_chip]



### Serial version of convolution
The computationally heavy part is the for loop over each wavelenght value

In [22]:

def convolution_serial(wav, flux, chip, R, FWHM_lim=5.0, plot=True):
    """Convolution code adapted from pedros code"""
    
    wav_chip, flux_chip = chip_selector(wav, flux, chip)
    #we need to calculate the FWHM at this value in order to set the starting point for the convolution
 
    FWHM_min = wav_chip[0]/R    #FWHM at the extremes of vector
    FWHM_max = wav_chip[-1]/R       
    
    
    #wide wavelength bin for the resolution_convolution
    wav_extended, flux_extended = wav_selector(wav, flux, wav_chip[0]-FWHM_lim*FWHM_min, wav_chip[-1]+FWHM_lim*FWHM_max) 
    wav_extended = np.array(wav_extended, dtype="float64")
    flux_extended = np.array(flux_extended, dtype="float64")
    
    print("Starting the Resolution convolution...")
    
    flux_conv_res = []
    counter = 0    
    for wav in wav_chip:
        # select all values such that they are within the FWHM limits
        FWHM = wav/R
        #print("FWHM of {0} calculated for wavelength {1}".format(FWHM, wav))
        indexes = [ i for i in range(len(wav_extended)) if ((wav - FWHM_lim*FWHM) < wav_extended[i] < (wav + FWHM_lim*FWHM))]
        flux_2convolve = flux_extended[indexes[0]:indexes[-1]+1]
        IP = unitary_Gauss(wav_extended[indexes[0]:indexes[-1]+1], wav, FWHM)
        flux_conv_res.append(np.sum(IP*flux_2convolve))
        if(len(flux_conv_res)%(len(wav_chip)//100 ) == 0):
            counter = counter+1
            print("Resolution Convolution at {}%%...".format(counter))
    flux_conv_res = np.array(flux_conv_res, dtype="float64")
    print("Done.\n")
    
    if(plot):
        fig=plt.figure(1)
        plt.xlabel(r"wavelength [ $\mu$m ])")
        plt.ylabel(r"flux [counts] ")
        plt.plot(wav_chip, flux_chip/np.max(flux_chip), color ='k', linestyle="-", label="Original spectra")
        plt.plot(wav_chip, flux_conv_res/np.max(flux_conv_res), color ='b', linestyle="-", label="Spectrum observed at and R=%d ." % (R))
        plt.legend(loc='best')
        plt.show() 
    return wav_chip, flux_conv_res


### Parallel version of convolution

In [23]:
# Function around bottleneck
def convolve(wav, R, wav_extended, flux_extended, FWHM_lim):
        # select all values such that they are within the FWHM limits
        FWHM = wav/R
        indexes = [ i for i in range(len(wav_extended)) if ((wav - FWHM_lim*FWHM) < wav_extended[i] < (wav + FWHM_lim*FWHM))]
        flux_2convolve = flux_extended[indexes[0]:indexes[-1]+1]
        IP = unitary_Gauss(wav_extended[indexes[0]:indexes[-1]+1], wav, FWHM)
        val = np.sum(IP*flux_2convolve) 
        unitary_val = np.sum(IP*np.ones_like(flux_2convolve))  # Effect of convolution onUnitary. For changing number of points
        return val/unitary_val
    
def convolution_parallel(wav, flux, chip, R, FWHM_lim=5.0, n_jobs=-1, verbose=5):
    """Convolution code adapted from pedros code"""
    
    wav_chip, flux_chip = chip_selector(wav, flux, chip)
    #we need to calculate the FWHM at this value in order to set the starting point for the convolution
    
    #print(wav_chip)
    #print(flux_chip)
    FWHM_min = wav_chip[0]/R    #FWHM at the extremes of vector
    FWHM_max = wav_chip[-1]/R       
    
    #wide wavelength bin for the resolution_convolution
    wav_extended, flux_extended = wav_selector(wav, flux, wav_chip[0]-FWHM_lim*FWHM_min, wav_chip[-1]+FWHM_lim*FWHM_max) 
    wav_extended = np.array(wav_extended, dtype="float64")
    flux_extended = np.array(flux_extended, dtype="float64")
    
    print("Starting the Parallel Resolution convolution...")
    
    parallel_result = Parallel(n_jobs=n_jobs, verbose=verbose)(delayed(convolve)(wav,R,wav_extended, flux_extended,FWHM_lim) for wav in wav_chip)
    flux_conv_res = np.array(parallel_result, dtype="float64")
    print("Done.\n")
    

    return wav_chip, flux_conv_res 



In [27]:
# Load data
import numpy as np
#wl, flux = np.loadtxt("Joblib_tapas.txt")  # 2117-2120 nm
wl, flux = np.loadtxt("Joblib_tapas_large.txt")  # 2145-2160 nm

Time a serial convolution

In [28]:
import time
import datetime
start = time.time()

# "Joblib_small"  # "Joblib_large"
x, y = convolution_serial(wl, flux, "Joblib_large", 50000, FWHM_lim=5.0, plot=False)
  
done = time.time()
elapsed = done - start
print("Convolution time = ", elapsed)

Starting the Resolution convolution...
Resolution Convolution at 1%%...
Resolution Convolution at 2%%...
Resolution Convolution at 3%%...
Resolution Convolution at 4%%...
Resolution Convolution at 5%%...
Resolution Convolution at 6%%...
Resolution Convolution at 7%%...
Resolution Convolution at 8%%...
Resolution Convolution at 9%%...
Resolution Convolution at 10%%...
Resolution Convolution at 11%%...
Resolution Convolution at 12%%...
Resolution Convolution at 13%%...
Resolution Convolution at 14%%...
Resolution Convolution at 15%%...
Resolution Convolution at 16%%...
Resolution Convolution at 17%%...
Resolution Convolution at 18%%...
Resolution Convolution at 19%%...
Resolution Convolution at 20%%...
Resolution Convolution at 21%%...
Resolution Convolution at 22%%...
Resolution Convolution at 23%%...
Resolution Convolution at 24%%...
Resolution Convolution at 25%%...
Resolution Convolution at 26%%...
Resolution Convolution at 27%%...
Resolution Convolution at 28%%...
Resolution Convolu

Time a parallel convolution

In [29]:
start = time.time()
# "Joblib_small", "Joblib_large"

x_par, y_par = convolution_parallel(wl, flux, "Joblib_large", 50000, FWHM_lim=5.0)
  
done = time.time()
elapsed = done - start
print("Convolution time = ", elapsed)

Starting the Parallel Resolution convolution...


[Parallel(n_jobs=-1)]: Done  88 tasks      | elapsed:    0.3s
[Parallel(n_jobs=-1)]: Done 2248 tasks      | elapsed:    4.2s
[Parallel(n_jobs=-1)]: Done 5848 tasks      | elapsed:   10.6s
[Parallel(n_jobs=-1)]: Done 10888 tasks      | elapsed:   18.7s


Done.

('Convolution time = ', 25.072216033935547)


[Parallel(n_jobs=-1)]: Done 15287 out of 15287 | elapsed:   24.9s finished


# Logging
Traceback example, note how the line of the error is indicated as well as the values of the parameter passed to the function that triggered the exception, even though the traceback happens in the child process.


In [31]:
from heapq import nlargest
from joblib import Parallel, delayed
Parallel(n_jobs=3)(delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) 

JoblibTypeError: JoblibTypeError
___________________________________________________________________________
Multiprocessing exception:
...........................................................................
/home/jneal/anaconda2/lib/python2.7/runpy.py in _run_module_as_main(mod_name='ipykernel.__main__', alter_argv=1)
    157     pkg_name = mod_name.rpartition('.')[0]
    158     main_globals = sys.modules["__main__"].__dict__
    159     if alter_argv:
    160         sys.argv[0] = fname
    161     return _run_code(code, main_globals, None,
--> 162                      "__main__", fname, loader, pkg_name)
        fname = '/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py'
        loader = <pkgutil.ImpLoader instance>
        pkg_name = 'ipykernel'
    163 
    164 def run_module(mod_name, init_globals=None,
    165                run_name=None, alter_sys=False):
    166     """Execute a module's code without importing it

...........................................................................
/home/jneal/anaconda2/lib/python2.7/runpy.py in _run_code(code=<code object <module> at 0x7f94973d1eb0, file "/...2.7/site-packages/ipykernel/__main__.py", line 1>, run_globals={'__builtins__': <module '__builtin__' (built-in)>, '__doc__': None, '__file__': '/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py', '__loader__': <pkgutil.ImpLoader instance>, '__name__': '__main__', '__package__': 'ipykernel', 'app': <module 'ipykernel.kernelapp' from '/home/jneal/...python2.7/site-packages/ipykernel/kernelapp.pyc'>}, init_globals=None, mod_name='__main__', mod_fname='/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py', mod_loader=<pkgutil.ImpLoader instance>, pkg_name='ipykernel')
     67         run_globals.update(init_globals)
     68     run_globals.update(__name__ = mod_name,
     69                        __file__ = mod_fname,
     70                        __loader__ = mod_loader,
     71                        __package__ = pkg_name)
---> 72     exec code in run_globals
        code = <code object <module> at 0x7f94973d1eb0, file "/...2.7/site-packages/ipykernel/__main__.py", line 1>
        run_globals = {'__builtins__': <module '__builtin__' (built-in)>, '__doc__': None, '__file__': '/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py', '__loader__': <pkgutil.ImpLoader instance>, '__name__': '__main__', '__package__': 'ipykernel', 'app': <module 'ipykernel.kernelapp' from '/home/jneal/...python2.7/site-packages/ipykernel/kernelapp.pyc'>}
     73     return run_globals
     74 
     75 def _run_module_code(code, init_globals=None,
     76                     mod_name=None, mod_fname=None,

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/__main__.py in <module>()
      1 
      2 
----> 3 
      4 if __name__ == '__main__':
      5     from ipykernel import kernelapp as app
      6     app.launch_new_instance()
      7 
      8 
      9 
     10 

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/traitlets/config/application.py in launch_instance(cls=<class 'ipykernel.kernelapp.IPKernelApp'>, argv=None, **kwargs={})
    591         
    592         If a global instance already exists, this reinitializes and starts it
    593         """
    594         app = cls.instance(**kwargs)
    595         app.initialize(argv)
--> 596         app.start()
        app.start = <bound method IPKernelApp.start of <ipykernel.kernelapp.IPKernelApp object>>
    597 
    598 #-----------------------------------------------------------------------------
    599 # utility functions, for convenience
    600 #-----------------------------------------------------------------------------

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/kernelapp.py in start(self=<ipykernel.kernelapp.IPKernelApp object>)
    437         
    438         if self.poller is not None:
    439             self.poller.start()
    440         self.kernel.start()
    441         try:
--> 442             ioloop.IOLoop.instance().start()
    443         except KeyboardInterrupt:
    444             pass
    445 
    446 launch_new_instance = IPKernelApp.launch_instance

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/zmq/eventloop/ioloop.py in start(self=<zmq.eventloop.ioloop.ZMQIOLoop object>)
    157             PollIOLoop.configure(ZMQIOLoop)
    158         return PollIOLoop.current(*args, **kwargs)
    159     
    160     def start(self):
    161         try:
--> 162             super(ZMQIOLoop, self).start()
        self.start = <bound method ZMQIOLoop.start of <zmq.eventloop.ioloop.ZMQIOLoop object>>
    163         except ZMQError as e:
    164             if e.errno == ETERM:
    165                 # quietly return on ETERM
    166                 pass

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/tornado/ioloop.py in start(self=<zmq.eventloop.ioloop.ZMQIOLoop object>)
    878                 self._events.update(event_pairs)
    879                 while self._events:
    880                     fd, events = self._events.popitem()
    881                     try:
    882                         fd_obj, handler_func = self._handlers[fd]
--> 883                         handler_func(fd_obj, events)
        handler_func = <function null_wrapper>
        fd_obj = <zmq.sugar.socket.Socket object>
        events = 1
    884                     except (OSError, IOError) as e:
    885                         if errno_from_exception(e) == errno.EPIPE:
    886                             # Happens when the client closes the connection
    887                             pass

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/tornado/stack_context.py in null_wrapper(*args=(<zmq.sugar.socket.Socket object>, 1), **kwargs={})
    270         # Fast path when there are no active contexts.
    271         def null_wrapper(*args, **kwargs):
    272             try:
    273                 current_state = _state.contexts
    274                 _state.contexts = cap_contexts[0]
--> 275                 return fn(*args, **kwargs)
        args = (<zmq.sugar.socket.Socket object>, 1)
        kwargs = {}
    276             finally:
    277                 _state.contexts = current_state
    278         null_wrapper._wrapped = True
    279         return null_wrapper

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/zmq/eventloop/zmqstream.py in _handle_events(self=<zmq.eventloop.zmqstream.ZMQStream object>, fd=<zmq.sugar.socket.Socket object>, events=1)
    435             # dispatch events:
    436             if events & IOLoop.ERROR:
    437                 gen_log.error("got POLLERR event on ZMQStream, which doesn't make sense")
    438                 return
    439             if events & IOLoop.READ:
--> 440                 self._handle_recv()
        self._handle_recv = <bound method ZMQStream._handle_recv of <zmq.eventloop.zmqstream.ZMQStream object>>
    441                 if not self.socket:
    442                     return
    443             if events & IOLoop.WRITE:
    444                 self._handle_send()

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/zmq/eventloop/zmqstream.py in _handle_recv(self=<zmq.eventloop.zmqstream.ZMQStream object>)
    467                 gen_log.error("RECV Error: %s"%zmq.strerror(e.errno))
    468         else:
    469             if self._recv_callback:
    470                 callback = self._recv_callback
    471                 # self._recv_callback = None
--> 472                 self._run_callback(callback, msg)
        self._run_callback = <bound method ZMQStream._run_callback of <zmq.eventloop.zmqstream.ZMQStream object>>
        callback = <function null_wrapper>
        msg = [<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>]
    473                 
    474         # self.update_state()
    475         
    476 

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/zmq/eventloop/zmqstream.py in _run_callback(self=<zmq.eventloop.zmqstream.ZMQStream object>, callback=<function null_wrapper>, *args=([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],), **kwargs={})
    409         close our socket."""
    410         try:
    411             # Use a NullContext to ensure that all StackContexts are run
    412             # inside our blanket exception handler rather than outside.
    413             with stack_context.NullContext():
--> 414                 callback(*args, **kwargs)
        callback = <function null_wrapper>
        args = ([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],)
        kwargs = {}
    415         except:
    416             gen_log.error("Uncaught exception, closing connection.",
    417                           exc_info=True)
    418             # Close the socket on an uncaught exception from a user callback

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/tornado/stack_context.py in null_wrapper(*args=([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],), **kwargs={})
    270         # Fast path when there are no active contexts.
    271         def null_wrapper(*args, **kwargs):
    272             try:
    273                 current_state = _state.contexts
    274                 _state.contexts = cap_contexts[0]
--> 275                 return fn(*args, **kwargs)
        args = ([<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>],)
        kwargs = {}
    276             finally:
    277                 _state.contexts = current_state
    278         null_wrapper._wrapped = True
    279         return null_wrapper

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/kernelbase.py in dispatcher(msg=[<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>])
    271         if self.control_stream:
    272             self.control_stream.on_recv(self.dispatch_control, copy=False)
    273 
    274         def make_dispatcher(stream):
    275             def dispatcher(msg):
--> 276                 return self.dispatch_shell(stream, msg)
        msg = [<zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>, <zmq.sugar.frame.Frame object>]
    277             return dispatcher
    278 
    279         for s in self.shell_streams:
    280             s.on_recv(make_dispatcher(s), copy=False)

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/kernelbase.py in dispatch_shell(self=<ipykernel.ipkernel.IPythonKernel object>, stream=<zmq.eventloop.zmqstream.ZMQStream object>, msg={'buffers': [], 'content': {u'allow_stdin': True, u'code': u"from heapq import nlargest\nfrom joblib import...largest)(2, n) for n in (range(4), 'abcde', 3)) ", u'silent': False, u'stop_on_error': True, u'store_history': True, u'user_expressions': {}}, 'header': {'date': '2016-05-27T14:10:02.361629', u'msg_id': u'CF7C816B154B4D37B3912A33FB1A01B4', u'msg_type': u'execute_request', u'session': u'A599ABCE7EC34540BD77D366D6ED8603', u'username': u'username', u'version': u'5.0'}, 'metadata': {}, 'msg_id': u'CF7C816B154B4D37B3912A33FB1A01B4', 'msg_type': u'execute_request', 'parent_header': {}})
    223             self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
    224         else:
    225             self.log.debug("%s: %s", msg_type, msg)
    226             self.pre_handler_hook()
    227             try:
--> 228                 handler(stream, idents, msg)
        handler = <bound method IPythonKernel.execute_request of <ipykernel.ipkernel.IPythonKernel object>>
        stream = <zmq.eventloop.zmqstream.ZMQStream object>
        idents = ['A599ABCE7EC34540BD77D366D6ED8603']
        msg = {'buffers': [], 'content': {u'allow_stdin': True, u'code': u"from heapq import nlargest\nfrom joblib import...largest)(2, n) for n in (range(4), 'abcde', 3)) ", u'silent': False, u'stop_on_error': True, u'store_history': True, u'user_expressions': {}}, 'header': {'date': '2016-05-27T14:10:02.361629', u'msg_id': u'CF7C816B154B4D37B3912A33FB1A01B4', u'msg_type': u'execute_request', u'session': u'A599ABCE7EC34540BD77D366D6ED8603', u'username': u'username', u'version': u'5.0'}, 'metadata': {}, 'msg_id': u'CF7C816B154B4D37B3912A33FB1A01B4', 'msg_type': u'execute_request', 'parent_header': {}}
    229             except Exception:
    230                 self.log.error("Exception in message handler:", exc_info=True)
    231             finally:
    232                 self.post_handler_hook()

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/kernelbase.py in execute_request(self=<ipykernel.ipkernel.IPythonKernel object>, stream=<zmq.eventloop.zmqstream.ZMQStream object>, ident=['A599ABCE7EC34540BD77D366D6ED8603'], parent={'buffers': [], 'content': {u'allow_stdin': True, u'code': u"from heapq import nlargest\nfrom joblib import...largest)(2, n) for n in (range(4), 'abcde', 3)) ", u'silent': False, u'stop_on_error': True, u'store_history': True, u'user_expressions': {}}, 'header': {'date': '2016-05-27T14:10:02.361629', u'msg_id': u'CF7C816B154B4D37B3912A33FB1A01B4', u'msg_type': u'execute_request', u'session': u'A599ABCE7EC34540BD77D366D6ED8603', u'username': u'username', u'version': u'5.0'}, 'metadata': {}, 'msg_id': u'CF7C816B154B4D37B3912A33FB1A01B4', 'msg_type': u'execute_request', 'parent_header': {}})
    386         if not silent:
    387             self.execution_count += 1
    388             self._publish_execute_input(code, parent, self.execution_count)
    389 
    390         reply_content = self.do_execute(code, silent, store_history,
--> 391                                         user_expressions, allow_stdin)
        user_expressions = {}
        allow_stdin = True
    392 
    393         # Flush output before sending the reply.
    394         sys.stdout.flush()
    395         sys.stderr.flush()

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/ipykernel/ipkernel.py in do_execute(self=<ipykernel.ipkernel.IPythonKernel object>, code=u"from heapq import nlargest\nfrom joblib import...largest)(2, n) for n in (range(4), 'abcde', 3)) ", silent=False, store_history=True, user_expressions={}, allow_stdin=True)
    194 
    195         reply_content = {}
    196         # FIXME: the shell calls the exception handler itself.
    197         shell._reply_content = None
    198         try:
--> 199             shell.run_cell(code, store_history=store_history, silent=silent)
        shell.run_cell = <bound method ZMQInteractiveShell.run_cell of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        code = u"from heapq import nlargest\nfrom joblib import...largest)(2, n) for n in (range(4), 'abcde', 3)) "
        store_history = True
        silent = False
    200         except:
    201             status = u'error'
    202             # FIXME: this code right now isn't being used yet by default,
    203             # because the run_cell() call above directly fires off exception

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.py in run_cell(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, raw_cell=u"from heapq import nlargest\nfrom joblib import...largest)(2, n) for n in (range(4), 'abcde', 3)) ", store_history=True, silent=False, shell_futures=True)
   2718                 self.displayhook.exec_result = result
   2719 
   2720                 # Execute the user code
   2721                 interactivity = "none" if silent else self.ast_node_interactivity
   2722                 self.run_ast_nodes(code_ast.body, cell_name,
-> 2723                    interactivity=interactivity, compiler=compiler, result=result)
        interactivity = 'last_expr'
        compiler = <IPython.core.compilerop.CachingCompiler instance>
   2724 
   2725                 # Reset this so later displayed values do not modify the
   2726                 # ExecutionResult
   2727                 self.displayhook.exec_result = None

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.py in run_ast_nodes(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, nodelist=[<_ast.ImportFrom object>, <_ast.ImportFrom object>, <_ast.Expr object>], cell_name='<ipython-input-31-03aeede4f6e0>', interactivity='last', compiler=<IPython.core.compilerop.CachingCompiler instance>, result=<IPython.core.interactiveshell.ExecutionResult object>)
   2826                     return True
   2827 
   2828             for i, node in enumerate(to_run_interactive):
   2829                 mod = ast.Interactive([node])
   2830                 code = compiler(mod, cell_name, "single")
-> 2831                 if self.run_code(code, result):
        self.run_code = <bound method ZMQInteractiveShell.run_code of <ipykernel.zmqshell.ZMQInteractiveShell object>>
        code = <code object <module> at 0x7f9451174730, file "<ipython-input-31-03aeede4f6e0>", line 3>
        result = <IPython.core.interactiveshell.ExecutionResult object>
   2832                     return True
   2833 
   2834             # Flush softspace
   2835             if softspace(sys.stdout, 0):

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.py in run_code(self=<ipykernel.zmqshell.ZMQInteractiveShell object>, code_obj=<code object <module> at 0x7f9451174730, file "<ipython-input-31-03aeede4f6e0>", line 3>, result=<IPython.core.interactiveshell.ExecutionResult object>)
   2880         outflag = 1  # happens in more places, so it's easier as default
   2881         try:
   2882             try:
   2883                 self.hooks.pre_run_code_hook()
   2884                 #rprint('Running code', repr(code_obj)) # dbg
-> 2885                 exec(code_obj, self.user_global_ns, self.user_ns)
        code_obj = <code object <module> at 0x7f9451174730, file "<ipython-input-31-03aeede4f6e0>", line 3>
        self.user_global_ns = {'In': ['', u'from math import sqrt\n[sqrt(i ** 2) for i in range(10)]', u'from joblib import Parallel, delayed\nParallel...obs=2)(delayed(sqrt)(i ** 2) for i in range(10))', u'from time import sleep\nfrom joblib import Par...bose=1)(delayed(sleep)(.1) for _ in range(100)) ', u'r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(100)) ', u'r = Parallel(n_jobs=-1, verbose=10)(delayed(sleep)(.1) for _ in range(100)) ', u'def test_reuse():\n    """Test Reusing a pool ...synchronization barrier\n            n_iter += 1', u'def test_no_reuse():\n    """Test Showing Para...  # synchronization barrier\n        n_iter += 1', u"get_ipython().magic(u'timeit test_reuse()')", u"get_ipython().magic(u'timeit test_no_reuse()')", u'List = [x ** 2 for x in range(10) if (x%3) is 0]\nprint(List)\nfor val in List:\n    print(val)', u'gen = (x ** 2 for x in range(10) if (x%3) is 0)\nprint(gen)\nfor val in gen:\n    print(val)', u'List = [x ** 2 for x in range(10) if (x%3) is 0]\nprint(List)\nfor val in List:\n    print(val)', u'gen = (x ** 2 for x in range(10) if (x%3) is 0)\nprint(gen)\nfor val in gen:\n    print(val)', u'print("Another-iteration") \nprint(List)\nfor ...nprint(gen)    \nfor val in gen:\n    print(val)', u'import matplotlib.pyplot as plt\n\ndef wav_sel...chipmax)\n    \n    return [wav_chip, flux_chip]', u'\ndef convolution_serial(wav, flux, chip, R, F... plt.show() \n    return wav_chip, flux_conv_res', u'\ndef convolve(wav, R, wav_extended, flux_exte...n")\n    \n\n    return wav_chip, flux_conv_res ', u'# Load data\nimport numpy as np\nwl, flux = np...= np.loadtxt("Joblib_tapas.txt")  # 2145-2160 nm', u'# Function around bottleneck\ndef convolve(wav...n")\n    \n\n    return wav_chip, flux_conv_res ', ...], 'List': [0, 9, 36, 81], 'Out': {1: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 2: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]}, 'Parallel': <class 'joblib.parallel.Parallel'>, '_': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '_1': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '_2': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '__': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '___': '', '__builtin__': <module '__builtin__' (built-in)>, ...}
        self.user_ns = {'In': ['', u'from math import sqrt\n[sqrt(i ** 2) for i in range(10)]', u'from joblib import Parallel, delayed\nParallel...obs=2)(delayed(sqrt)(i ** 2) for i in range(10))', u'from time import sleep\nfrom joblib import Par...bose=1)(delayed(sleep)(.1) for _ in range(100)) ', u'r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(100)) ', u'r = Parallel(n_jobs=-1, verbose=10)(delayed(sleep)(.1) for _ in range(100)) ', u'def test_reuse():\n    """Test Reusing a pool ...synchronization barrier\n            n_iter += 1', u'def test_no_reuse():\n    """Test Showing Para...  # synchronization barrier\n        n_iter += 1', u"get_ipython().magic(u'timeit test_reuse()')", u"get_ipython().magic(u'timeit test_no_reuse()')", u'List = [x ** 2 for x in range(10) if (x%3) is 0]\nprint(List)\nfor val in List:\n    print(val)', u'gen = (x ** 2 for x in range(10) if (x%3) is 0)\nprint(gen)\nfor val in gen:\n    print(val)', u'List = [x ** 2 for x in range(10) if (x%3) is 0]\nprint(List)\nfor val in List:\n    print(val)', u'gen = (x ** 2 for x in range(10) if (x%3) is 0)\nprint(gen)\nfor val in gen:\n    print(val)', u'print("Another-iteration") \nprint(List)\nfor ...nprint(gen)    \nfor val in gen:\n    print(val)', u'import matplotlib.pyplot as plt\n\ndef wav_sel...chipmax)\n    \n    return [wav_chip, flux_chip]', u'\ndef convolution_serial(wav, flux, chip, R, F... plt.show() \n    return wav_chip, flux_conv_res', u'\ndef convolve(wav, R, wav_extended, flux_exte...n")\n    \n\n    return wav_chip, flux_conv_res ', u'# Load data\nimport numpy as np\nwl, flux = np...= np.loadtxt("Joblib_tapas.txt")  # 2145-2160 nm', u'# Function around bottleneck\ndef convolve(wav...n")\n    \n\n    return wav_chip, flux_conv_res ', ...], 'List': [0, 9, 36, 81], 'Out': {1: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 2: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]}, 'Parallel': <class 'joblib.parallel.Parallel'>, '_': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '_1': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '_2': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '__': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], '___': '', '__builtin__': <module '__builtin__' (built-in)>, ...}
   2886             finally:
   2887                 # Reset our crash handler in place
   2888                 sys.excepthook = old_excepthook
   2889         except SystemExit as e:

...........................................................................
/home/jneal/Phd/Codes/PC_Joblib_parrallelization/<ipython-input-31-03aeede4f6e0> in <module>()
      1 
      2 
----> 3 
      4 from heapq import nlargest
      5 from joblib import Parallel, delayed
      6 Parallel(n_jobs=3)(delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) 
      7 
      8 
      9 
     10 

...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/joblib/parallel.py in __call__(self=Parallel(n_jobs=3), iterable=<generator object <genexpr>>)
    805             if pre_dispatch == "all" or n_jobs == 1:
    806                 # The iterable was consumed all at once by the above for loop.
    807                 # No need to wait for async callbacks to trigger to
    808                 # consumption.
    809                 self._iterating = False
--> 810             self.retrieve()
        self.retrieve = <bound method Parallel.retrieve of Parallel(n_jobs=3)>
    811             # Make sure that we get a last message telling us we are done
    812             elapsed_time = time.time() - self._start_time
    813             self._print('Done %3i out of %3i | elapsed: %s finished',
    814                         (len(self._output), len(self._output),

---------------------------------------------------------------------------
Sub-process traceback:
---------------------------------------------------------------------------
TypeError                                          Fri May 27 14:10:02 2016
PID: 3523                   Python 2.7.11: /home/jneal/anaconda2/bin/python
...........................................................................
/home/jneal/anaconda2/lib/python2.7/site-packages/joblib/parallel.py in __call__(self=<joblib.parallel.BatchedCalls object>)
     67     def __init__(self, iterator_slice):
     68         self.items = list(iterator_slice)
     69         self._size = len(self.items)
     70 
     71     def __call__(self):
---> 72         return [func(*args, **kwargs) for func, args, kwargs in self.items]
        func = <function nlargest>
        args = (2, 3)
        kwargs = {}
        self.items = [(<function nlargest>, (2, 3), {})]
     73 
     74     def __len__(self):
     75         return self._size
     76 

...........................................................................
/home/jneal/anaconda2/lib/python2.7/heapq.py in nlargest(n=2, iterable=3, key=None)
    458         if n >= size:
    459             return sorted(iterable, key=key, reverse=True)[:n]
    460 
    461     # When key is none, use simpler decoration
    462     if key is None:
--> 463         it = izip(iterable, count(0,-1))                    # decorate
        it = undefined
        iterable = 3
    464         result = _nlargest(n, it)
    465         return map(itemgetter(0), result)                   # undecorate
    466 
    467     # General case, slowest method

TypeError: izip argument #1 must support iteration
___________________________________________________________________________

# Joblibs Other tools

## Memory
Example from Joblib documentation showing the caching of input and outputs of the function sqaure().

When it is called with the same parameters again it jsut returns the result without recomputation.

In [32]:
from joblib import Memory
mem = Memory(cachedir='/tmp/joblib')
import numpy as np
a = np.vander(np.arange(101)).astype(np.float)
b = np.vander(np.arange(5)).astype(np.float)
square = mem.cache(np.square)


In [33]:
c = square(a) 

________________________________________________________________________________
[Memory] Calling square...
square(array([[ 0., ...,  1.],
       ..., 
       [ 0., ...,  1.]]))
___________________________________________________________square - 0.1s, 0.0min


In [34]:
d = square(b)

________________________________________________________________________________
[Memory] Calling square...
square(array([[   0.,    0.,    0.,    0.,    1.],
       [   1.,    1.,    1.,    1.,    1.],
       [  16.,    8.,    4.,    2.,    1.],
       [  81.,   27.,    9.,    3.,    1.],
       [ 256.,   64.,   16.,    4.,    1.]]))
___________________________________________________________square - 0.0s, 0.0min


In [35]:
e = square(a) # Does not recomute square(a)

In [36]:
f = square(b) # Does not recomute square(b)

Timing these calls to square shows that the second call of the function with the same inputs give a much faster result.

## Persistance
joblib.dump() and joblib.load() provide a replacement for pickle to work efficiently on Python objects containing large data, in particular large numpy arrays.

Filename is important here, .pkl will make a pickle like persistance
where as .mmap with make a memory map location for parallel process shared access.

In [41]:
from tempfile import mkdtemp
savedir = mkdtemp()
import os
#filename = os.path.join(savedir, 'test.pkl')      # Pickle version  
filename = os.path.join(savedir, 'test.mmap')      # Memmap version

In [42]:
#Then we create an object to be persisted:
import numpy as np
#to_persist = [('a', [1, 2, 3]), ('b', np.arange(10))]
to_persist = np.ones(int(1e6))

In [43]:
#which we save into savedir:
import joblib
joblib.dump(to_persist, filename)  

['/tmp/tmpUFOw5a/test.mmap', '/tmp/tmpUFOw5a/test.mmap_01.npy']

In [44]:
# We can then load the object from the file:
#joblib.load(filename)
pointer = joblib.load(filename, mmap_mode='r+')