<font size=6> <b> Advanced Python : week #6</b> </font>
<div class="alert alert-block alert-success">
   Advanced Python features <br>
    <ol>
        <li> multi-threading / multi-processing </li>
    </ol>
</div>

<p style="text-align:right;"> sumyeon@gmail.com </p>

# Multi-threading / Multi-processing

> GIL (Global Interpreter Lock) 
- The GIL prevents multiple threads from executing Python code at the same time
- In CPython, a thread must hold the GIL to run Python code, and only one thread can hold it at a time. => no parallel execution of Python code across threads
- However, blocking I/O calls and many C-based extension functions release the GIL while they run => threading is a good fit for I/O-related work
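
The last point can be illustrated with a rough sketch. Here `time.sleep` stands in for a blocking I/O call (an assumption: real I/O such as a socket read should behave similarly, since it also releases the GIL while waiting). The helper names `fake_io` and `run_in_threads` are made up for this illustration.

```python
import threading
import time


def fake_io(delay):
    # time.sleep releases the GIL, so other threads can run while this one waits
    time.sleep(delay)


def run_in_threads(n_threads, delay):
    """Start n_threads threads that each wait `delay` seconds; return the elapsed time."""
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


elapsed = run_in_threads(4, 0.2)
# the waits overlap, so this should be close to 0.2 s rather than 4 x 0.2 s
print(f"4 threads x 0.2 s wait took {elapsed:.2f} s")
```

If `fake_io` were replaced by a pure-Python computation, the threads would take turns holding the GIL and the elapsed time would be expected to grow with the number of threads.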

> Multi-threading vs. Multi-processing
- Deciding whether to parallelize at all, and then which approach to use (e.g., multi-threading vs. multi-processing), can be complicated
- A simple rule of thumb: "multi-threading for I/O-intensive work and multi-processing for computation-intensive work"
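
The rule of thumb can be written down as a small helper. This is only a sketch: `make_executor` and its `workload` labels (`"io"`, `"cpu"`) are hypothetical names, not part of any library, and real workloads are often a mix of both kinds.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def make_executor(workload, max_workers=4):
    """Pick an executor by the rule of thumb.

    workload: "io" for I/O-intensive work, "cpu" for computation-intensive work.
    """
    if workload == "io":
        return ThreadPoolExecutor(max_workers=max_workers)
    if workload == "cpu":
        return ProcessPoolExecutor(max_workers=max_workers)
    raise ValueError(f"unknown workload type: {workload!r}")


io_executor = make_executor("io")
print(type(io_executor).__name__)
io_executor.shutdown()
```

Both executor classes share the same interface, so the calling code does not need to change when the choice does.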

## How to use - Summary

### basic API
- multiprocessing : multiprocessing.Process
 > create a process, start it, and wait for it to finish <br>
 > NOTE) communication among processes : Queue/Pipe
- threading : threading.Thread
 > create a thread, start it, and wait for it to finish
 

### simple API
- multiprocessing.Pool : Process Pool
 > process pool API for simpler use of processes
- multiprocessing.pool.ThreadPool (= multiprocessing.dummy.Pool) : Thread Pool
 > thread pool API for simpler use of threads
 

### simplest API
- concurrent.futures.ThreadPoolExecutor/ProcessPoolExecutor : Thread/Process Pool Executor
 > simplest multi-threading/multi-processing API 

## multiprocessing 
- a package that supports spawning processes 
 > multiprocessing.Process for spawning a new process <br>
 > (process.)start() to start it, and (process.)join() to wait for the end of execution <br>

In [1]:
%%writefile multiprocessing_sample.py
import time
import multiprocessing


def calc_square(numbers):
    for i in numbers:
        time.sleep(3)  # artificial time-delay
        print('square: ', str(i * i))


def calc_cube(numbers):
    for i in numbers:
        time.sleep(3)
        print('cube: ', str(i * i * i))


if __name__ == "__main__":
    arr = [2, 3, 8, 9]
    
    # create two processes, p1 and p2
    p1 = multiprocessing.Process(target=calc_square, args=(arr,))
    p2 = multiprocessing.Process(target=calc_cube, args=(arr,))
    
    start = time.time()
    # start() launches each process; the two now run in parallel
    p1.start()
    p2.start()
    
    # this join() waits until calc_square() has finished
    p1.join()
    # this join() waits until calc_cube() has finished
    p2.join()
    
    print("Successes!")
    print("Elapsed time: ", time.time() - start)

Overwriting multiprocessing_sample.py


#### Please open a terminal and run multiprocessing_sample.py

### threading
- a package that supports spawning threads
 > threading.Thread for spawning a new thread <br>
 > (thread.)start() to start it, and (thread.)join() to wait for the end of execution <br>

In [2]:
import time
import threading


def calc_square(numbers):
    print("Calculate square numbers: ")
    for i in numbers:
        time.sleep(2)  # artificial time-delay
        print('square: ', str(i * i))

def calc_cube(numbers):
    print("Calculate cube numbers: ")
    for i in numbers:
        time.sleep(2)
        print('cube: ', str(i * i * i))


if __name__ == "__main__":
    arr = [2, 3, 8, 9]
    
    # create two threads, t1 and t2
    t1 = threading.Thread(target=calc_square, args=(arr,))
    t2 = threading.Thread(target=calc_cube, args=(arr,))
    
    start = time.time()
    # start() launches each thread; the sleeps of the two threads overlap
    t1.start()
    t2.start()
    
    # this join() waits until calc_square() has finished
    t1.join()
    # this join() waits until calc_cube() has finished
    t2.join()
    
    print("Successes!")
    print("Elapsed time: ", time.time() - start)

Calculate square numbers: 
Calculate cube numbers: 
square:  4
cube:  8
square:  9
cube:  27
square:  64
cube:  512
square:  81
cube:  729
Successes!
Elapsed time:  8.043993949890137
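
The example above only prints from inside the threads. Because threads share memory, results can also be handed back through a shared object. The following is a minimal sketch of that idea; the helper `calc_powers` and the `results` dictionary are hypothetical names introduced here, and the lock is shown as a general habit for shared state rather than something this tiny example strictly requires.

```python
import threading


def calc_powers(name, numbers, exponent, results, lock):
    values = [n ** exponent for n in numbers]
    with lock:  # guard the shared dictionary while writing to it
        results[name] = values


arr = [2, 3, 8, 9]
results = {}
lock = threading.Lock()

threads = [
    threading.Thread(target=calc_powers, args=("square", arr, 2, results, lock)),
    threading.Thread(target=calc_powers, args=("cube", arr, 3, results, lock)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join(), each thread's entry is guaranteed to be present

print(results["square"])  # [4, 9, 64, 81]
print(results["cube"])    # [8, 27, 512, 729]
```

With multiprocessing.Process this would not work as written, since each process gets its own copy of `results`; that is where Queue/Pipe come in.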


### multiprocessing.Pool
- a convenient way of parallelizing the execution of a function across multiple input values, distributing the input data across processes

In [3]:
%%writefile mppool_example.py
from multiprocessing import Pool
import time

iterable = [1,2,3,4,5]

def f(x):
    time.sleep(2)
    return x*x

if __name__ == '__main__':
    start = time.time()
    with Pool(5) as pool:
        print(pool.map(f, iterable))  # pool.map returns the list of return values
    print("Elapsed time: ", time.time() - start)

Overwriting mppool_example.py


#### Please open a terminal and run mppool_example.py

### multiprocessing.pool.ThreadPool (= multiprocessing.dummy.Pool) 
- a convenient way of parallelizing the execution of a function across multiple input values, distributing the input data across threads

In [4]:
%%writefile mpthreadpool.py
from multiprocessing.pool import ThreadPool
import time

iterable = [1,2,3,4,5]

def f(x):
    time.sleep(2)
    return x*x

if __name__ == '__main__':
    start = time.time()
    with ThreadPool(5) as p:
        results = p.map(f, iterable)
        print(results)
    print("Elapsed time: ", time.time() - start)

Overwriting mpthreadpool.py


#### Please open a terminal and run mpthreadpool.py
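
As a small check of the "(= multiprocessing.dummy.Pool)" note in the heading, the sketch below creates a pool through `multiprocessing.dummy.Pool` and confirms that it is a `ThreadPool`. It also shows `starmap`, which unpacks argument tuples for functions that take more than one parameter. The function `power` is a made-up example.

```python
from multiprocessing.dummy import Pool as DummyPool
from multiprocessing.pool import ThreadPool


def power(base, exponent):
    return base ** exponent


with DummyPool(3) as pool:
    # multiprocessing.dummy.Pool hands back a ThreadPool instance
    same_class = isinstance(pool, ThreadPool)
    # starmap unpacks each tuple into positional arguments: power(2, 2), power(3, 2), ...
    powers = pool.starmap(power, [(2, 2), (3, 2), (2, 3)])

print(same_class)  # True
print(powers)      # [4, 9, 8]
```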

### Concurrent Futures
- a simpler interface that works in much the same way for both multi-threading and multi-processing
- the examples below mostly use ThreadPoolExecutor; ProcessPoolExecutor is used in much the same way.

#### ThreadPoolExecutor
- provides useful methods to execute callables asynchronously
- <b>submit</b> : schedules a callable and returns a Future that represents its execution
- <b>as_completed</b> : module-level function (concurrent.futures.as_completed) that returns an iterator yielding Futures in the order they complete
- <b>map</b>    : schedules the callable once for each item of the given iterable and returns an iterator over the results, in input order
- <b>shutdown</b> : signals the executor to free the resources it is using once the currently pending futures are done

In [5]:
import time
import concurrent.futures  # importing only "concurrent" does not load the futures submodule

In [8]:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def f(x):
    time.sleep(10-x)
    print('f(',x,') completed')
    return x

start =  time.time()
a =  executor.submit(f, 1)
b =  executor.submit(f, 2)
c =  executor.submit(f, 3)
d =  executor.submit(f, 4)
print(f"total time {time.time()-start}")     

total time 0.0039942264556884766
f( 4 ) completed
f( 3 ) completed
f( 2 ) completed
f( 1 ) completed


In [9]:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

start =  time.time()
a =  executor.submit(f, 1)
b =  executor.submit(f, 2)
c =  executor.submit(f, 3)
d =  executor.submit(f, 4)

print("f(4) result is ", d.result())
print(f"total time {time.time()-start}")     


f( 4 ) completed
f(4) result is  4
total time 6.018961191177368
f( 3 ) completed
f( 2 ) completed
f( 1 ) completed
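
The two executors above are never shut down explicitly. The sketch below shows what `shutdown` does, using a hypothetical helper `slow_identity` with much shorter delays than the cells above: with `wait=True` the call blocks until all submitted callables have finished, and later submissions are rejected.

```python
import concurrent.futures
import time


def slow_identity(x):
    time.sleep(0.1 * x)
    return x


executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(slow_identity, x) for x in (1, 2)]

# wait=True blocks until every submitted callable has finished, then frees the workers
executor.shutdown(wait=True)

print([fut.done() for fut in futures])    # [True, True]
print([fut.result() for fut in futures])  # [1, 2]

# submitting to an executor that has been shut down raises RuntimeError
rejected = False
try:
    executor.submit(slow_identity, 3)
except RuntimeError as exc:
    rejected = True
    print("rejected:", exc)
```

Using the executor in a `with` block, as the later cells do, calls `shutdown(wait=True)` automatically on exit.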


### concurrent.futures.as_completed
- returns an iterator that yields the given Futures in the order they complete

In [10]:
import concurrent.futures

iterable = [1,2,3,4]
def f(x):
    time.sleep(10-x)
    return x

start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x : executor.submit(f,x), iterable))  # submit returns a Future
    for future in result_futures:
        print(future.result())
print(f"total time {time.time()-start}")        

1
2
3
4
total time 9.008634090423584


In [11]:
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x : executor.submit(f,x), iterable))  # submit returns a Future
    for future in concurrent.futures.as_completed(result_futures):
        print(future.result())
print(f"total time {time.time()-start}")

4
3
2
1
total time 9.009706497192383
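
Since `as_completed` yields Futures in completion order, the link between a result and its input is lost unless it is tracked. One common way, sketched here, is a dictionary from each Future to the input it was submitted with. The names `square_slowly` and `future_to_input` are illustrative only, and the delays are kept short.

```python
import concurrent.futures
import time


def square_slowly(x):
    time.sleep(0.05 * (4 - x))  # larger inputs finish earlier
    return x * x


inputs = [1, 2, 3]
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # remember which input each Future belongs to
    future_to_input = {executor.submit(square_slowly, x): x for x in inputs}
    for future in concurrent.futures.as_completed(future_to_input):
        x = future_to_input[future]
        results[x] = future.result()
        print(f"input {x} -> {results[x]}")
```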


### map
- map applies a callable to each item of an iterable and returns an iterator over the return values (in input order)

In [12]:
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_gen = executor.map(f,iterable)  # map returns an iterator of results, not Futures
    for result in result_gen:
        print(result)
print(f"total time {time.time()-start}")

1
2
3
4
total time 9.005383968353271
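
Because `map` hands back results rather than Futures, an exception raised inside the callable surfaces while iterating over the results. The sketch below uses a hypothetical function `reciprocal` that fails for one input; results before the failing input are still delivered in input order.

```python
import concurrent.futures


def reciprocal(x):
    return 1 / x  # raises ZeroDivisionError for x == 0


collected = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(reciprocal, [1, 2, 0, 4])
    try:
        for value in results:
            collected.append(value)
    except ZeroDivisionError:
        # the error from the third input is re-raised here, during iteration
        print("stopped at the failing input")

print(collected)  # [1.0, 0.5]
```

When each input needs its own error handling, `submit` with `future.result()` in a try/except per Future may be the more convenient choice.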


#### ProcessPoolExecutor
- used exactly like ThreadPoolExecutor, except that the class is concurrent.futures.ProcessPoolExecutor

In [13]:
%%writefile processpool_sample.py

import time
import concurrent.futures

iterable = [1,2,3,4]

def f(x):
    time.sleep(10-x)
    return x

def main():
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        result_futures = list(map(lambda x : executor.submit(f,x), iterable))  # submit returns a Future
        for future in concurrent.futures.as_completed(result_futures):
            print(future.result())
    print(f"total time {time.time()-start}")

if __name__ == '__main__':
    main()

Overwriting processpool_sample.py


#### Please open a terminal and run the command <i>"python processpool_sample.py"</i>
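
One practical difference from ThreadPoolExecutor is that a ProcessPoolExecutor sends the callable and its arguments to the worker processes by pickling them. In processpool_sample.py the lambda only runs in the parent process; what is actually sent to the workers is `f` and `x`. The sketch below uses a hypothetical helper `is_picklable` to show why a lambda itself could not be submitted.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable(math.sqrt))        # True: a named, importable function
print(is_picklable(lambda x: x * x))  # False: a lambda cannot be looked up by name
```

This is also why the worker function in the sample is defined at module level with `def`.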

# [Example] Using multiprocessing/threading to make Pandas apply functions faster

In [14]:
import random
import pandas as pd
import numpy as np
from multiprocessing.pool import ThreadPool

def add_features(df):
    # computing intensive processing
    df['feature3'] = df['feature1'].apply(lambda x : x*x*x*x*x)
    df['feature4'] = df['feature2'].apply(lambda x : x*100)
    return df

datadf = pd.DataFrame({'feature1': [1,2,3,4,5,6,7,8], 
                       'feature2' : [11,12,13,14,15,16,17,18]}, 
                      index=['a','b','c','d','e','f','g','h'])

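def parallelize_dataframe(df, func, n_cores=4):
    # NOTE: threads share the GIL, so pure-Python apply() work may gain little here;
    # the multiprocessing.Pool version in fasterpandas.py uses processes instead
    df_split = np.array_split(df, n_cores)     # split the rows into n_cores chunks
    pool = ThreadPool(n_cores)
    df = pd.concat(pool.map(func, df_split))   # process each chunk, then reassemble
    pool.close()
    pool.join()
    return df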

complexdf = parallelize_dataframe(datadf, add_features)
complexdf

Unnamed: 0,feature1,feature2,feature3,feature4
a,1,11,1,1100
b,2,12,32,1200
c,3,13,243,1300
d,4,14,1024,1400
e,5,15,3125,1500
f,6,16,7776,1600
g,7,17,16807,1700
h,8,18,32768,1800


In [15]:
%%writefile fasterpandas.py

import random
import pandas as pd
import numpy as np
from multiprocessing import  Pool

def add_features(df):
    # computing intensive processing
    df['feature3'] = df['feature1'].apply(lambda x : x*x*x*x*x)
    df['feature4'] = df['feature2'].apply(lambda x : x*100)
    return df

datadf = pd.DataFrame({'feature1': [1,2,3,4,5,6,7,8], 'feature2' : [11,12,13,14,15,16,17,18]}, index=['a','b','c','d','e','f','g','h'])

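def parallelize_dataframe(df, func, n_cores=4):
    df_split = np.array_split(df, n_cores)     # split the rows into n_cores chunks
    pool = Pool(n_cores)                       # one worker process per chunk
    df = pd.concat(pool.map(func, df_split))   # process each chunk, then reassemble
    pool.close()
    pool.join()
    return df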

if __name__ == '__main__':
    complexdf = parallelize_dataframe(datadf, add_features)
    print(complexdf)

Overwriting fasterpandas.py


# [Reference] Communication among processes

<div class="alert alert-block alert-info">
    <b>Communication Examples Among Processes</b> <br>
    - queue 
<pre>
    from multiprocessing import Process, Queue <br>
                                                <br>
    def f(q): <br>
        q.put([42, None, 'hello']) <br>
                                                <br>
    if __name__ == '__main__': <br>
        q = Queue() <br>
        p = Process(target=f, args=(q,)) <br>
        p.start() <br>
        print(q.get())    # prints "[42, None, 'hello']" <br>
        p.join() <br>
</pre>
    <br>
    - pipes <br>
<pre>
    from multiprocessing import Process, Pipe <br>
                                               <br>
    def f(conn): <br>
        conn.send([42, None, 'hello']) <br>
        conn.close() <br>
        <br>
    if __name__ == '__main__': <br>
        parent_conn, child_conn = Pipe() <br>
        p = Process(target=f, args=(child_conn,)) <br>
        p.start() <br>
        print(parent_conn.recv())   # prints "[42, None, 'hello']" <br>
        p.join()  <br>
</pre>
    <br>
    - Shared Memory <br>
<pre>
    from multiprocessing import Process, Value, Array <br>
                    <br>
    def f(n, a):<br>
        n.value = 3.1415927<br>
        for i in range(len(a)):<br>
            a[i] = -a[i]<br>
<br>
    if __name__ == '__main__':<br>
        num = Value('d', 0.0)<br>
        arr = Array('i', range(10))<br>
<br>
        p = Process(target=f, args=(num, arr))<br>
        p.start()<br>
        p.join()<br>
<br>
        print(num.value)<br>
        print(arr[:])<br>
</pre>
    <br>
    - Server process <br>
<pre>
    from multiprocessing import Process, Manager<br>
<br>
    def f(d, l):<br>
        d[1] = '1'<br>
        d['2'] = 2<br>
        d[0.25] = None<br>
        l.reverse()<br>
<br>
    if __name__ == '__main__':<br>
        with Manager() as manager:<br>
            d = manager.dict()<br>
            l = manager.list(range(10))<br>
<br>
            p = Process(target=f, args=(d, l))<br>
            p.start()<br>
            p.join()<br>
<br>
            print(d)<br>
            print(l)<br>
</pre>
</div>