https://docs.python.org/3.4/library/multiprocessing.html

In [1]:
import sys
print(sys.version)
import pandas
print('pandas',pandas.__version__)
import numpy
print('numpy',numpy.__version__)
from multiprocessing import Process
from multiprocessing import Pool
import os
import time

3.6.6 | packaged by conda-forge | (default, Oct 12 2018, 14:08:43) 
[GCC 4.8.2 20140120 (Red Hat 4.8.2-15)]
pandas 0.23.4
numpy 1.13.3


# introduction to processes

The operating system automatically assigns a "process ID" (pid) to each running process

https://en.wikipedia.org/wiki/Process_identifier

In [23]:
print('process ID =',os.getpid())

process ID = 798


In [33]:
# what launched this program?
print('parent process =', os.getppid())

parent process = 6


Before using multiprocessing, inspect the process IDs during a normal Python execution

In [35]:
def proc_info(some_str):
    print('  Now in "proc_info"')
    print('  arg =',some_str) # string that was passed in
#    print('  module name:', __name__)
    print('  process id:', os.getpid())

def myfunc(a_name):
    """ this function calls another function """
    print('Now in "myfunc"')
    print('process ID =',os.getpid())
    proc_info(a_name)
    print('end of "myfunc"')

In [36]:
myfunc('mary')

Now in "myfunc"
process ID = 798
  Now in "proc_info"
  arg = mary
  process id: 798
end of "myfunc"


--> functions run in the current process


# launch a separate (serial) process using the multiprocessing module

In [44]:
if __name__ == '__main__':
    print('process ID =',os.getpid())
    proc_info('cool stuff') # display info about this process
    
    print('\ncreate process')
    p = Process(target=myfunc, args=('bob',)) # from main, call a function
    
    print('\nstart process')
    p.start()
    p.join() # wait for the child process to finish

process ID = 798
  Now in "proc_info"
  arg = cool stuff
  process id: 798

create process

start process
Now in "myfunc"
process ID = 946
  Now in "proc_info"
  arg = bob
  process id: 946
end of "myfunc"


# multiprocessing for concurrent operations
Now that we've seen how to launch a separate process, use several processes concurrently.

https://docs.python.org/3.4/library/multiprocessing.html#using-a-pool-of-workers

In [6]:
from multiprocessing import Pool

## _example_: map function to list

In [7]:
def sq_fun(x):
    """
    square the input
    """
    return x*x

In [47]:
%timeit [sq_fun(x) for x in [1, 2, 3]]

1.82 µs ± 345 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [8]:
%%time

if __name__ == '__main__': # only the parent process runs the pool
    
    with Pool(5) as p: # 'with' terminates the pool on exit, so no explicit close() is needed
        print(p.map(sq_fun, [1, 2, 3]))

[1, 4, 9]
CPU times: user 33.5 ms, sys: 86.7 ms, total: 120 ms
Wall time: 245 ms


For "microseconds" (us, 1E-6 s) versus "milliseconds" (ms, 1E-3 s), see https://en.wikipedia.org/wiki/Metric_prefix#List_of_SI_prefixes

1 ms = 1000 us
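To compare the two timings above, it helps to express them in the same unit. The sketch below is plain arithmetic on the numbers reported by `%timeit` and `%%time`; the variable names are only illustrative.

```python
# Timings copied from the two cells above.
list_comprehension_us = 1.82   # microseconds per loop, from %timeit
pool_wall_ms = 245             # milliseconds of wall time, from %%time

US_PER_MS = 1000               # 1 ms = 1000 us

# convert the pool timing to microseconds, then take the ratio
pool_wall_us = pool_wall_ms * US_PER_MS
ratio = pool_wall_us / list_comprehension_us

print('pool wall time =', pool_wall_us, 'us')
print('pool / list comprehension = {:.0f}x'.format(ratio))
```

In the same unit, the pool version is more than 100,000 times slower than the list comprehension for this tiny input.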

--> Why did it take so much longer?

## _example_: dataframe operation

Objective: instead of applying a function to a dataframe row-by-row, 
split the dataframe into chunks and apply the function concurrently using multiple processes

First, create a random dataframe

https://stackoverflow.com/questions/32752292/how-to-create-a-data-frame-of-random-integers-with-pandas

In [9]:
df = pandas.DataFrame(
    numpy.random.randint(0,100,size=(100, 4)), 
    columns=list('ABCD')
    )

In [10]:
print('df shape:',df.shape)
df.head()

df shape: (100, 4)


|   | A | B | C | D |
|---|---|---|---|---|
| 0 | 72 | 87 | 20 | 17 |
| 1 | 71 | 60 | 26 | 26 |
| 2 | 12 | 85 | 4 | 63 |
| 3 | 71 | 38 | 31 | 88 |
| 4 | 56 | 14 | 87 | 96 |


http://www.racketracer.com/2016/07/06/pandas-in-parallel/

In [11]:
num_partitions = 7 # number of chunks to split the dataframe into
num_cores = 4 # number of worker processes (at most the number of cores on your machine)

In [12]:
def parallelize_dataframe(df, func, num_cores):
    """
    Apply a function to chunks of a dataframe concurrently.
    The number of chunks comes from the global num_partitions defined above.
    """
    df_split = numpy.array_split(df, num_partitions)
    with Pool(num_cores) as pool:
        df = pandas.concat(pool.map(func, df_split))
    return df
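
`numpy.array_split` does not require the number of rows to be divisible by the number of partitions. Per the NumPy documentation, for `l` rows and `n` sections it returns `l % n` chunks of size `l//n + 1` and the rest of size `l//n`. The helper below (`chunk_sizes`, a hypothetical name that is not part of NumPy or pandas) reproduces only that size rule in plain Python:

```python
def chunk_sizes(n_rows, n_parts):
    """Sizes of the chunks array_split would produce (size rule only)."""
    base, extra = divmod(n_rows, n_parts)
    # the first `extra` chunks get one additional row
    return [base + 1] * extra + [base] * (n_parts - extra)

sizes = chunk_sizes(100, 7)   # 100 rows, num_partitions = 7
print(sizes)
print('total rows:', sum(sizes))
```

The chunk sizes add up to the original row count, so no rows are dropped when the chunks are concatenated again.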

In [13]:
def multiply_columns(data):
    data['K'] = data['A'].apply(lambda x: x*2)
    return data

In [14]:
%time df_modified = parallelize_dataframe(df, multiply_columns,num_cores)

CPU times: user 54.8 ms, sys: 69 ms, total: 124 ms
Wall time: 208 ms


In [15]:
df_modified.head()

|   | A | B | C | D | K |
|---|---|---|---|---|---|
| 0 | 72 | 87 | 20 | 17 | 144 |
| 1 | 71 | 60 | 26 | 26 | 142 |
| 2 | 12 | 85 | 4 | 63 | 24 |
| 3 | 71 | 38 | 31 | 88 | 142 |
| 4 | 56 | 14 | 87 | 96 | 112 |


Compare the split/apply/combine method above to a plain "apply"

In [16]:
%time df['K'] = df['A'].apply(lambda x: x*2)

CPU times: user 14.9 ms, sys: 10 ms, total: 25 ms
Wall time: 60.3 ms


Applying the function to chunks took more than three times as long (208 ms versus 60.3 ms wall time)!

_Reason_: `multiprocessing` has to start the worker processes, serialize (pickle) each chunk, send it to a worker process, apply the transformation there, then serialize the result and send it back to the main process
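
`multiprocessing` uses `pickle` to move arguments and return values between processes. The sketch below performs that round trip by hand on a small stand-in chunk (a plain dict of lists rather than a dataframe, to keep the example self-contained). It suggests why `multiply_columns` has to return the modified data: the worker only ever sees a copy.

```python
import pickle

# stand-in for one dataframe chunk (hypothetical data)
chunk = {'A': [72, 71, 12], 'B': [87, 60, 85]}

payload = pickle.dumps(chunk)      # bytes that would be sent to the worker
received = pickle.loads(payload)   # object the worker would rebuild

print('serialized size:', len(payload), 'bytes')
print('equal contents:', received == chunk)
print('same object:', received is chunk)

# a change made on the worker's copy does not reach the original
received['K'] = [x * 2 for x in received['A']]
print("'K' in original chunk:", 'K' in chunk)
```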

# --> When would concurrency be useful?

The work done per task must be large enough to amortize the fixed cost of concurrency (starting the processes and serializing the data)
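
One rough way to reason about amortization is a back-of-the-envelope model: serial time is `n_tasks * task_seconds`, while pool time is a fixed overhead plus the tasks divided among the workers. The model and the function names below are simplifying assumptions (equal-length tasks, no per-task serialization cost), and the 0.2 s overhead is only loosely based on the wall times measured above.

```python
import math

def serial_seconds(n_tasks, task_seconds):
    """Estimated time when one process runs every task in turn."""
    return n_tasks * task_seconds

def pool_seconds(n_tasks, task_seconds, n_workers, overhead_seconds):
    """Estimated time with a pool: fixed overhead plus the busiest worker."""
    # each worker handles at most ceil(n_tasks / n_workers) tasks, one after another
    rounds = math.ceil(n_tasks / n_workers)
    return overhead_seconds + rounds * task_seconds

for task_seconds in (1e-6, 1e-2, 1.0):
    s = serial_seconds(100, task_seconds)
    p = pool_seconds(100, task_seconds, n_workers=4, overhead_seconds=0.2)
    print('task = {:g} s: serial = {:.4f} s, pool = {:.4f} s'.format(task_seconds, s, p))
```

In this model the pool only pays off once the per-task work is large compared with the overhead.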

For your reference, here's how I typically structure my concurrency

In [17]:
import multiprocessing
import time

In [18]:
def serial_func(arg1):
    return arg1*2

In [19]:
%time list(map(serial_func,[3,4,5]))

CPU times: user 24 µs, sys: 0 ns, total: 24 µs
Wall time: 34.6 µs


[6, 8, 10]

In [20]:
if __name__ == '__main__':
    start_time=time.time()
    res_list=[]
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:   # start worker processes
        res_list = pool.map(serial_func,[3,4,5]) # see https://docs.python.org/3/library/multiprocessing.html

    print('main elapsed:',time.time()-start_time,'seconds')

main elapsed: 0.3887605667114258 seconds


In [21]:
res_list

[6, 8, 10]
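
For contrast, here is a sketch of a case where the pool should come out ahead: each task is artificially slow (a `time.sleep` stands in for real work), so the start-up cost is small compared with the work. `slow_double` and the 0.2 s delay are made up for illustration, and the timings will vary by machine.

```python
import multiprocessing
import time

def slow_double(x):
    """Pretend to do expensive work, then double the input."""
    time.sleep(0.2)   # stand-in for real work (hypothetical delay)
    return x * 2

if __name__ == '__main__':
    inputs = [3, 4, 5, 6]

    # serial: the four delays add up
    start_time = time.time()
    serial_res = list(map(slow_double, inputs))
    print('serial elapsed:', time.time() - start_time, 'seconds')

    # pool: the four delays overlap across the worker processes
    start_time = time.time()
    with multiprocessing.Pool(processes=4) as pool:
        pool_res = pool.map(slow_double, inputs)
    print('pool elapsed:', time.time() - start_time, 'seconds')

    print('same results:', serial_res == pool_res)
```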