This notebook works through nearly every example in the Introduction section of [the official multiprocessing docs page](https://docs.python.org/2/library/multiprocessing.html).

In [36]:
import numpy as np
import os
import time

In [37]:
import multiprocessing as mp

# 1. [Introduction](https://docs.python.org/2/library/multiprocessing.html#introduction)

In [38]:
def f( x ):
  return x*x

In [39]:
# Sets up a pool
p = mp.Pool( 5 )

In [40]:
# Maps to the pool
p.map( f, [1, 2, 3] )

[1, 4, 9]

## 1.1 [The `Process` Class](https://docs.python.org/2/library/multiprocessing.html#the-process-class)

### Simple example

In [41]:
def f( name ):
  print( 'Hello {}'.format( name ) )
  time.sleep(3)
  
  return 5

In [42]:
p = mp.Process( target=f, args=('Doug',) )

In [43]:
# Start up the process
p.start()

In [44]:
# Wait for the process to finish. The return value of f is discarded; only its printed output is shown.
p.join()

Hello Doug


### Expanded example

In [45]:
def info( title ):
  print( title )
  print( 'module name: {}'.format( __name__ ) )
  print( 'parent process id: {}'.format( os.getppid() ) )
  print( 'process id: {}'.format( os.getpid() ) )

In [46]:
def f( name ):
  info( 'function f' )
  print( 'Hello {}'.format( name ) )

In [47]:
info('main line')
print( '' )
p = mp.Process( target=f, args=('Ja-Net',), )
p.start()
p.join()

main line
module name: __main__
parent process id: 3872
process id: 3909

function f
module name: __main__
parent process id: 3909
process id: 10118
Hello Ja-Net


## 1.2 [Exchanging objects between processes](https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes)

### Queues
A queue sends things one way: here, from the child process to the parent.

In [48]:
def f(q):
  q.put( [35, np.nan, 'good bye' ] )

In [49]:
q = mp.Queue()

In [50]:
p = mp.Process( target=f, args=(q,) )

In [51]:
p.start()

In [52]:
q.get()

[35, nan, 'good bye']

In [53]:
p.join()

This is neat, but I think `Pool.map()` usually serves the same purpose and is easier to use.
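One case where a queue feels more natural than `Pool.map()` is when several hand-started processes each report a result as they finish. The block below is a minimal sketch of that pattern, not something from the docs page. It is written for Python 3 (this notebook targets Python 2) and assumes a POSIX system where the `fork` start method is available; `worker` and `collect_squares` are hypothetical names.

```python
import multiprocessing as mp

# Assumption: POSIX system, so the "fork" start method exists.
ctx = mp.get_context("fork")


def worker(q, i):
    # Each child puts a (task id, result) pair on the shared queue.
    q.put((i, i * i))


def collect_squares(n):
    q = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(q, i)) for i in range(n)]
    for p in procs:
        p.start()
    # Drain the queue before joining; results arrive in completion order.
    results = [q.get() for _ in procs]
    for p in procs:
        p.join()
    # Sort by task id so the output does not depend on scheduling.
    return sorted(results)


print(collect_squares(4))  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```

Tagging each result with its task id is what makes it possible to restore the order afterwards, which `Pool.map()` does automatically.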

### Pipes
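These can send things both ways. `mp.Pipe()` returns a pair of connection objects, and by default the pipe is duplex, so each end can both send and receive.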

In [54]:
def f( conn ):
  conn.send( [35, np.nan, 'good bye' ] )
  print( conn.recv() )
  conn.close()

In [55]:
parent_conn, child_conn = mp.Pipe()

In [56]:
p = mp.Process( target=f, args=(child_conn,) )

In [57]:
p.start()

In [58]:
# Get what was sent from the spawned process
parent_conn.recv()

[35, nan, 'good bye']

In [59]:
# Send something to the spawned process
parent_conn.send( [ 'other', 'stuff', np.random.rand(2,2) ] )

In [60]:
p.join()

['other', 'stuff', array([[ 0.09047607,  0.37169938],
       [ 0.46153602,  0.82224875]])]


## 1.3 [Synchronization between processes](https://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes)

### Locks

In [61]:
def f( l, i ):
  l.acquire()
  print( 'Hello world, no. {}'.format( i ) )
  time.sleep(5)
  l.release()

In [62]:
lock = mp.Lock()

In [63]:
p = mp.Process( target=f, args=(lock, 0) )

In [64]:
p1 = mp.Process( target=f, args=(lock, 1) )

In [65]:
p.start()

In [66]:
p1.start()

Hello world, no. 0


In [67]:
%%time
p1.join()

CPU times: user 10 ms, sys: 3 ms, total: 13 ms
Wall time: 9.84 s
Hello world, no. 1


In [68]:
p.join()

As you can see, the wall-clock time for the timed `join()` is $\sim$ twice the actual sleep time: the second process could not acquire the lock, and so could not move on, until the first process released it.
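The same effect can be measured in a single script. This is a rough sketch under a few assumptions: Python 3 rather than this notebook's Python 2, a POSIX system with the `fork` start method, and a shorter sleep to keep it quick. `hold_lock` and `timed_run` are hypothetical helper names.

```python
import multiprocessing as mp
import time

# Assumption: POSIX system, so the "fork" start method exists.
ctx = mp.get_context("fork")


def hold_lock(lock, seconds):
    # `with lock:` acquires on entry and releases on exit, even on error.
    with lock:
        time.sleep(seconds)


def timed_run(n_procs, seconds):
    lock = ctx.Lock()
    procs = [
        ctx.Process(target=hold_lock, args=(lock, seconds))
        for _ in range(n_procs)
    ]
    start = time.time()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.time() - start


elapsed = timed_run(n_procs=2, seconds=0.5)
# The sleeps are serialized by the lock, so this is about 2 * 0.5 s.
print("elapsed: {:.2f} s".format(elapsed))
```

Without the lock, the two sleeps would overlap and the total would be close to a single sleep.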

## 1.4 [Sharing state between processes](https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes)

In [69]:
def f( n, a ):
  n.value = 1.666666667
  for i in range(len(a)):
    a[i] = -a[i]

In [70]:
num = mp.Value( 'd', 0.0 )
arr = mp.Array( 'f', np.random.rand(5) )

In [71]:
p = mp.Process( target=f, args=(num, arr) )
p.start()
p.join()

In [72]:
num.value

1.666666667

In [73]:
arr[:]

[-0.7272351980209351,
 -0.22020989656448364,
 -0.956666111946106,
 -0.49456554651260376,
 -0.1517244130373001]

### Custom example

In [74]:
def f( shared_arr, i ):
  shared_arr[i] = float( i + 1 )**2.

In [75]:
n_procs = 2
arr = mp.Array( 'f', np.zeros(n_procs) )

In [76]:
p = mp.Process( target=f, args=(arr, 0 ))
p.start()
p.join()

In [77]:
p = mp.Process( target=f, args=(arr, 1 ))
p.start()
p.join()

In [78]:
arr[:]

[1.0, 4.0]

If you play around with which process to run above, you can see it successfully modifies only part of the array.

A more general example is below.

In [79]:
n_procs = 5
arr = mp.Array( 'f', np.zeros(n_procs) )

In [80]:
for i in range(n_procs):
  p = mp.Process( target=f, args=(arr, i ))
  p.start()
  p.join()

In [81]:
arr[:]

[1.0, 4.0, 9.0, 16.0, 25.0]

### Shared Value with Pool

In [82]:
def f( args ):
  shared_arr, i = args
  
  shared_arr[i] = float( i + 1 )**2.

In [83]:
pool = mp.Pool( 4 )

In [84]:
arr_size=5
arr = mp.Array( 'f', np.zeros(arr_size) )

In [85]:
args = [ (arr, i) for i in range(arr_size) ]

In [86]:
pool.map( f, args )

RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

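Looks like one can't pass `SynchronizedArray` objects as task arguments to `mp.Pool`: arguments to `pool.map()` are pickled to reach the workers, and these objects refuse to be pickled outside of process creation.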
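A common workaround is to hand the array to each worker once, at pool start-up, through the `initializer` and `initargs` arguments of `Pool`, and then map over plain indices. The sketch below assumes Python 3 (not this notebook's Python 2) and a POSIX system with the `fork` start method; `init_worker`, `fill`, and `shared_arr` are hypothetical names.

```python
import multiprocessing as mp

# Assumption: POSIX system, so the "fork" start method exists.
ctx = mp.get_context("fork")


def init_worker(arr):
    # Runs once in each worker: stash the shared array in a module global.
    global shared_arr
    shared_arr = arr


def fill(i):
    # Only the index travels through the task queue; the array does not.
    shared_arr[i] = float(i + 1) ** 2
    return i


arr = ctx.Array("f", 5)  # five zero-initialized C floats
pool = ctx.Pool(4, initializer=init_worker, initargs=(arr,))
pool.map(fill, range(5))
pool.close()
pool.join()

print(arr[:])  # [1.0, 4.0, 9.0, 16.0, 25.0]
```

Because `initargs` are passed while the worker processes are being created, the array counts as shared "through inheritance" and no `RuntimeError` is raised.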

### Server process
i.e. using `multiprocessing.Manager`.

In [87]:
def f( d,  l ):
  d[1] = 'dog'
  d['2'] = None
  d[0.25] = 15
  
  l.reverse()

In [88]:
manager = mp.Manager()

In [89]:
d = manager.dict( { 'b' : 1. } )

In [90]:
l = manager.list( range(10) )

In [91]:
p = mp.Process( target=f, args=( d, l ) )
p.start()
p.join()

In [92]:
print( d )

{'2': None, 1: 'dog', 'b': 1.0, 0.25: 15}


In [93]:
print( l )

[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


### Dictionary of Shared Arrays
Or my attempt to do so...

In [94]:
def f( d, i ):
  
  d['a'][i] = 2.*float( i + 1 )
  d['b'][i] = float( i + 1 )**2.

In [95]:
size_arrs = 6
d = { 'a' : mp.Array( 'f', np.zeros(size_arrs) ), 'b' : mp.Array( 'f', np.zeros(size_arrs) ) }

In [96]:
ps = [ mp.Process( target=f, args=( d, i ) ) for i in range(size_arrs) ]

In [97]:
[ p.start() for p in ps ]

[None, None, None, None, None, None]

In [98]:
[ p.join() for p in ps ]

[None, None, None, None, None, None]

In [99]:
for key in d.keys():
  print( d[key][:] )

[2.0, 4.0, 6.0, 8.0, 10.0, 12.0]
[1.0, 4.0, 9.0, 16.0, 25.0, 36.0]


It worked!

## 1.5 [Using a pool of workers](https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers)

In [100]:
def f( x ):
  return x*x

In [101]:
pool = mp.Pool( processes=4 )

In [102]:
pool.map( f, range(10) )

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [103]:
for i in pool.imap_unordered( f, range(10) ):
  print( i )

0
1
4
9
16
25
36
49
64
81


# `parmap` Examples

In [104]:
import galaxy_diver.utils.mp_utils as mp_utils

## Basic Example

In [105]:
def f( args ):
  
  i, j = args
  
  return i, j

In [106]:
all_args = [ ( i, i**2 ) for i in range( 10 ) ]

In [107]:
mp_utils.parmap( f, all_args, nprocs=2 )

[(0, 0),
 (1, 1),
 (2, 4),
 (3, 9),
 (4, 16),
 (5, 25),
 (6, 36),
 (7, 49),
 (8, 64),
 (9, 81)]

## Shared Array
Here I try to use a shared array.

In [108]:
def f( args ):
  
  arr, i = args
  
  num = float( i + 1 )
  
  arr[i] = num**2.
  
  return i

In [109]:
size_arr = 10
arr = mp.Array( 'f', np.zeros(size_arr ) )

In [110]:
all_args = [ ( arr, i ) for i in range( size_arr ) ]

In [111]:
mp_utils.parmap( f, all_args, nprocs=2 )

RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

The call seems to hang...
This is because it throws the following error, which is just hidden inside `q_in`:

```RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance```
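The error can be reproduced without any worker processes, since anything put on a multiprocessing queue has to be pickled first. The sketch below (Python 3, standard library only) pickles a shared array directly and captures the message. That a queue-based `parmap` would lose this error in the background is my assumption about why the call appears to hang, not something I have checked in `mp_utils`.

```python
import multiprocessing as mp
import pickle

arr = mp.Array("f", 3)

try:
    # Pickling outside of process creation is refused by the shared array.
    pickle.dumps(arr)
    message = None
except RuntimeError as err:
    message = str(err)

print(message)
```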

# Experimentation

## Split Arguments Up Among Processors

In [112]:
import galaxy_diver.utils.utilities as utilities

In [113]:
def f( args_chunk ):
  
  for args in args_chunk:
    d, i = args

    num = float( i + 1 )

    d['a'][i] = num**2.
    d['b'][i] = num*2.

In [114]:
size_arr = 10
d = {
  'a' : mp.Array( 'f', np.zeros(size_arr ) ),
  'b' : mp.Array( 'f', np.zeros(size_arr ) ),
}

In [115]:
n_procs = 3
all_args = [ ( d, i ) for i in range( size_arr ) ]

In [116]:
chunked_args = utilities.chunk_list( all_args, n_procs )

In [117]:
ps = [ mp.Process( target=f, args=(args_chunk,) ) for args_chunk in chunked_args ]

In [118]:
[ p.start() for p in ps ]

[None, None, None]

In [119]:
[ p.join() for p in ps ]

[None, None, None]

In [120]:
for key in d.keys():
  print( d[key][:] )

[1.0, 4.0, 9.0, 16.0, 25.0, 36.0, 49.0, 64.0, 81.0, 100.0]
[2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 18.0, 20.0]


That worked quite well! It seems that, if I want to pass a long list of arguments, the easiest option is to break the list into chunks, one per process.
I've implemented this in `galaxy_diver.utils.mp_utils.apply_among_processors()`.
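For reference, the chunking step can be sketched as below. This is a hypothetical stand-in, not the actual `utilities.chunk_list` implementation, which may split the list differently; `chunk_list_sketch` is a made-up name.

```python
def chunk_list_sketch(items, n_chunks):
    """Split `items` into `n_chunks` contiguous, nearly equal-sized chunks."""
    base, extra = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for k in range(n_chunks):
        # The first `extra` chunks each take one additional item.
        stop = start + base + (1 if k < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks


print(chunk_list_sketch(list(range(10)), 3))
# [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Each chunk then becomes the single argument of one `mp.Process`, as in the cells above.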

## Mocking and multiprocessing

In [127]:
from mock import patch, call

import galaxy_diver.utils.mp_utils as mp_utils

In [122]:
def f( x ):
  return x*x + 10

### Unmocked

In [123]:
def test_parmap_and_mock():
   
  results = mp_utils.parmap( f, range(4), 2 )
  
  return results

In [124]:
test_parmap_and_mock()

[10, 11, 14, 19]

### Mocked

In [145]:
@patch( '__main__.f' )
def test_parmap_and_mock( n_results, mock_f ):
  
  mock_f.side_effect = lambda x:  x
  
  results = mp_utils.parmap( f, range( n_results ), 2 )
  
  # Uncommenting this will break the code.
#   calls = [ call(i) for i in range( n_results ) ]
#   mock_f.assert_has_calls( calls, )
  
  return results


In [133]:
test_parmap_and_mock( 10 )

AssertionError: Calls not found.
Expected: [call(0), call(1), call(2), call(3), call(4), call(5), call(6), call(7), call(8), call(9)]
Actual: []

Note that because we can't guarantee the order in which the mock is called across processes, the side effect needs to be a function of the input.
If we used `mock_f.side_effect = range( n_results )`, return values would be handed out in call order rather than by argument, and we would get erroneous results.
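The difference between the two kinds of side effect shows up even without multiprocessing. The sketch below uses the standard-library `unittest.mock` from Python 3 (this notebook imports the separate `mock` package), and `call_order` is a made-up stand-in for whatever order the workers happen to run in.

```python
from unittest import mock

call_order = [2, 0, 1]  # stand-in for a nondeterministic scheduling order

# An iterable side effect hands out values in call order, ignoring the argument.
mock_iter = mock.Mock(side_effect=[0, 1, 2])
from_iterable = [mock_iter(x) for x in call_order]

# A function side effect computes the return value from the argument itself.
mock_func = mock.Mock(side_effect=lambda x: x)
from_function = [mock_func(x) for x in call_order]

print(from_iterable)  # [0, 1, 2]
print(from_function)  # [2, 0, 1]
```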

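Also, one can't straightforwardly use `mock` call assertions: the function is called in the worker processes, so the mock in the parent process records no calls (hence `Actual: []` above).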
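A small sketch of why call assertions are awkward here: a mock called in a child process updates the child's copy only. It assumes Python 3 with the standard-library `unittest.mock` (this notebook uses the separate `mock` package on Python 2) and a POSIX system with the `fork` start method; `call_in_child` is a hypothetical helper.

```python
import multiprocessing as mp
from unittest import mock

# Assumption: POSIX system, so the "fork" start method exists.
ctx = mp.get_context("fork")

mock_f = mock.Mock(side_effect=lambda x: x)


def call_in_child(q, x):
    # The child calls its own forked copy of the mock and reports the result.
    q.put(mock_f(x))


q = ctx.Queue()
p = ctx.Process(target=call_in_child, args=(q, 3))
p.start()
result = q.get()
p.join()

print(result)             # 3: the side effect ran in the child
print(mock_f.call_count)  # 0: the parent's mock never saw the call
```

So the return values make it back to the parent, but the call records stay behind in the child.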

In [190]:
isinstance( q, list )

False

In [193]:
[ set( [ 1, 2, 3,] ) ] + [ set( [ 1, 4, 5] ) ]

[{1, 2, 3}, {1, 4, 5}]

In [194]:
range( 4)

[0, 1, 2, 3]