# Trying out different tools to parallelize operations

## Usual imports

In [1]:
# Disable warnings
import warnings
warnings.filterwarnings('ignore')

# Time profiling
import cProfile
from   timeit import default_timer
import time 

# Combinatorics tool
import itertools

# Usual library
import numpy  as np
import pandas as pd

In [2]:
def CPUcosting_function(a,base=0):
    """Burn CPU time by summing i**2 for i in range(int(a[0] + a[1])).

    Parameters
    ----------
    a : sequence of two numbers
        Only ``a[0] + a[1]`` is used; converted to int, it gives the number
        of loop iterations.
    base : float, optional
        Value subtracted from both returned timestamps. With the default 0
        the timestamps are the raw ``time.time()`` values.

    Returns
    -------
    tuple
        ``(start, stop, res)``: the timestamps taken before and after the
        loop (each minus ``base``) and the computed sum of squares.
    """
    # Imported inside the function so that the function carries its own
    # dependency -- presumably needed when it is executed on remote workers.
    import time
    start = time.time() - base

    # Deliberately a plain Python loop: the point is to cost CPU time.
    res = 0
    Nloop = int(a[0] + a[1])
    for i in range(Nloop):
        res += i**2

    stop = time.time() - base
    return start, stop, res

# Workload: the first Nmax [x, y] pairs (x < y) drawn from range(2000).
# islice stops the generator after Nmax pairs, so the rest of the
# 1,999,000 possible combinations is never built.
Nmax=100000
big_array = [ [x,y] for (x,y) in itertools.islice(itertools.combinations(range(2000),2), Nmax) ]
print( len(big_array) )

# Example of the function result on the first 10 elements
resEx = [CPUcosting_function(x) for x in big_array[0:10]]
print([r[2] for r in resEx])

100000
[0, 1, 5, 14, 30, 55, 91, 140, 204, 285]


## Interlude: an example with map(func,array)

In [3]:
# Example: use map to apply a function to each element of a list
items = [ [1,3], [2,3], [4,5] ]

def sqr(x):
    """Return the sum of i**2 for i in range(int(x[0] + x[1]))."""
    upper = int(x[0] + x[1])
    return sum(i**2 for i in range(upper))

list(map(sqr, items))

[14, 30, 204]

## Testing ipyparallel module

### Setting up workers

In [4]:
# Parallelization with ipyparallel
import ipyparallel

# Connect a client using the 'default' profile.
# NOTE(review): this presumably requires the engines to be started beforehand
# (outside this notebook) -- confirm and document the setup step.
cluster = ipyparallel.Client(profile='default')
print( 'profile:', cluster.profile)
print( 'IDs:'    , cluster.ids    ) # IDs of the engines visible to the client

('profile:', u'default')
('IDs:', [0, 1, 2, 3])


In [5]:
# Slice the client with [:] to get a view over all the engines
# (printed below as a DirectView listing every engine id).
dview = cluster[:]
print(dview)

<DirectView [0, 1, 2, 3]>


### Time comparison

In [6]:
# Baseline: serial execution with the built-in map
%timeit -n 1 list(map(CPUcosting_function, big_array))

1 loop, best of 3: 8.39 s per loop


In [7]:
%timeit -n 1 dview.map(CPUcosting_function, big_array)

1 loop, best of 3: 104 ms per loop


In [8]:
# Serial execution through numpy: the function is applied along axis 1,
# i.e. once per [x, y] row of big_array.
%timeit -n 1 np.apply_along_axis(CPUcosting_function, 1, big_array)

1 loop, best of 3: 9.69 s per loop


In [9]:
# NOTE(review): this timing is not comparable with the ones above. The result
# comparison further down shows that this same call returns an AsyncResult and
# fails remotely with a TypeError, so presumably only the submission of a
# failing call is being timed -- confirm, then fix or drop this cell.
%timeit -n 1 dview.apply(CPUcosting_function, big_array)

1 loop, best of 3: 411 ms per loop


### Result comparison

In [10]:
# Reference result from the serial built-in map; show the computed sums
# (third field of each returned tuple) for elements 12 to 17.
res = list(map(CPUcosting_function, big_array))
print([timing[2] for timing in res[12:18]])

[650, 819, 1015, 1240, 1496, 1785]


In [11]:
# Same computation distributed over the engines. Iterating over the returned
# object yields one (start, stop, res) tuple per input element; the printed
# slice matches the serial run.
res = dview.map(CPUcosting_function, big_array)
print([r[2] for r in res][12:18])

[650, 819, 1015, 1240, 1496, 1785]


In [12]:
# numpy variant: one call per row of big_array. The sums are converted
# back to int before printing.
res = np.apply_along_axis(CPUcosting_function, axis=1, arr=big_array)
print([int(row[2]) for row in res[12:18]])

[650, 819, 1015, 1240, 1496, 1785]


In [13]:
# NOTE(review): this cell raises (see the CompositeError recorded below).
# dview.apply hands the whole big_array to CPUcosting_function instead of one
# [x, y] pair at a time, so a[0]+a[1] concatenates two lists and int() fails
# on each engine. apply is not an element-wise map; the element-wise call is
# dview.map, as used earlier in this section.
res = dview.apply(CPUcosting_function, big_array)
print(res)
print([r[2] for r in res][12:15])

<AsyncResult: CPUcosting_function>


CompositeError: one or more exceptions from call to method: CPUcosting_function
[0:apply]: TypeError: int() argument must be a string or a number, not 'list'

## Testing concurrent module

This notebook is heavily inspired by https://github.com/bfortuner/ml-study/blob/master/multitasking_python.ipynb

In [14]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def multithreading(func, args, workers):
    """Run ``func`` over ``args`` on a pool of ``workers`` threads.

    ``func`` is called as ``func(arg, begin_time)`` for every ``arg`` in
    ``args``, where ``begin_time`` is ``time.time()`` taken once, just before
    the pool is created.

    Parameters
    ----------
    func : callable
        Function taking two positional arguments ``(arg, begin_time)``.
    args : sequence
        Inputs to process; must support ``len()``.
    workers : int
        Maximum number of threads in the pool.

    Returns
    -------
    list
        The results of ``func``, in the same order as ``args``.
    """
    begin_time = time.time()
    # The context manager shuts the pool down on exit, so the worker threads
    # are released even if func raises (the pool used to be left open).
    with ThreadPoolExecutor(max_workers=workers) as executor:
        exec_res = executor.map( func, args, [begin_time] * len(args) )
        return list(exec_res)

def multiprocessing(func, args, workers):
    """Run ``func`` over ``args`` on a pool of ``workers`` processes.

    Each call is ``func(arg, launch_time)``, where ``launch_time`` is a single
    ``time.time()`` reading taken when this function is entered. The results
    are returned as a list, in the order of ``args``.
    """
    launch_time = time.time()
    launch_times = [launch_time] * len(args)
    with ProcessPoolExecutor(max_workers=workers) as pool:
        pending = pool.map(func, args, launch_times)
    # Leaving the with-block waits for all submitted tasks, so the results
    # are complete by the time they are collected here.
    return list(pending)

In [15]:
# Thread pool restricted to a single worker thread
%timeit -n 1 multithreading(CPUcosting_function, big_array, 1)

1 loop, best of 3: 23.8 s per loop


In [16]:
# Thread pool with 4 worker threads.
# NOTE(review): the recorded run is slower than with a single thread --
# presumably the pure-Python loop cannot run in parallel across threads
# (GIL); confirm and state it in the text.
%timeit -n 1 multithreading(CPUcosting_function, big_array, 4)

1 loop, best of 3: 31.5 s per loop


In [17]:
# Process pool restricted to a single worker process
%timeit -n 1 multiprocessing(CPUcosting_function, big_array, 1)

1 loop, best of 3: 25.6 s per loop


In [18]:
# Process pool with 4 worker processes
%timeit -n 1 multiprocessing(CPUcosting_function, big_array, 4)

1 loop, best of 3: 16.3 s per loop
