# Multiprocessing and Pipelines
https://docs.python.org/3/library/multiprocessing.html

Ordinary sequential execution example:

```python
import time
from os import getpid

def square(x):
    """Dummy function representing a complex computation."""
    print("x: {} pid: {}".format(x, getpid()))
    # simulate a long calculation
    time.sleep(1)
    return x**2
```

```python
start = time.time()
results = [square(i) for i in range(5)]
end = time.time()

print("execution time: {}".format(end - start))
print(results)
```

```
x: 0 pid: 21073
x: 1 pid: 21073
x: 2 pid: 21073
x: 3 pid: 21073
x: 4 pid: 21073
execution time: 5.0085179805755615
[0, 1, 4, 9, 16]
```


## Process Pools
https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

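A pool manages a set of worker processes: you submit tasks to it, and it distributes them among the workers and coordinates their execution.

There are 3 main methods of this class:
* apply_async - submit each task individually and collect the results by hand
* map - parallel equivalent of the builtin map
* starmap - like map, but each element of the iterable is a tuple that is unpacked into the function's arguments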
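Since starmap is not demonstrated elsewhere in this section, here is a minimal sketch. The power function is a hypothetical helper. It is written as a standalone script with the `if __name__ == "__main__":` guard, which the spawn start method (the default on Windows and macOS) requires.

```python
from multiprocessing import Pool

def power(base, exponent):
    """Hypothetical helper taking two arguments."""
    return base ** exponent

if __name__ == "__main__":
    pairs = [(2, 1), (2, 2), (3, 2), (4, 3)]
    with Pool() as pool:
        # each tuple is unpacked: power(2, 1), power(2, 2), ...
        results = pool.starmap(power, pairs)
    print(results)  # [2, 4, 9, 64]
```

With plain map, power would receive each tuple as a single argument and fail, so starmap saves writing a wrapper function.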

### Using apply_async

(doing everything by hand):

```python
from multiprocessing import Pool

results = []
def collect_results(result):
    """Callback: called in the parent process each time a task finishes."""
    results.append(result)

start = time.time()
pool = Pool()
for i in range(5):
    pool.apply_async(square, args=(i, ), callback=collect_results)
pool.close()  # no further tasks can be submitted
pool.join()   # wait until the workers have finished all tasks and exited
end = time.time()

print("execution time: {}".format(end - start))
print(results)
```

```
execution time: 1.1349742412567139
[16, 9, 0, 4, 1]
x: 0 pid: 14875
x: 2 pid: 14877
x: 4 pid: 14879
x: 3 pid: 14878
x: 1 pid: 14876
```


#### Notes

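* The results are in completion order, not submission order!
* The calls of Pool.close() and Pool.join() are necessary here:
    * close() prevents further tasks from being submitted; tasks already submitted keep running (they start as soon as a worker is free, not when close() is called)
    * join() waits for the worker processes to exit, i.e. until all submitted tasks are finished; close() (or terminate()) must be called before join()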
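If the submission order matters, one option is to keep the AsyncResult objects that apply_async returns and call get() on each of them in order. get() blocks until that task is done and re-raises any exception raised in the worker; with only a callback and no error_callback, such an exception would go unnoticed. A minimal sketch, with a hypothetical cube function standing in for square:

```python
from multiprocessing import Pool

def cube(x):
    """Hypothetical stand-in for a slow computation."""
    return x ** 3

if __name__ == "__main__":
    with Pool() as pool:
        # apply_async returns immediately with an AsyncResult handle
        async_results = [pool.apply_async(cube, (i,)) for i in range(5)]
        # get() waits for each result; iterating the list keeps submission order
        results = [r.get() for r in async_results]
    print(results)  # [0, 1, 8, 27, 64]
```

The get() calls stay inside the with block, because leaving it calls terminate() on the pool.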

### Using map

map is a builtin function which returns an iterator that applies a given function to every element of a given iterable. The iterator is lazy: the values are only computed when it is consumed, e.g. by list().

```python
def double_it(x):
    return x * 2

mapped = map(double_it, range(10))

print(mapped)
print(list(mapped))
```

```
<map object at 0x7f6c60fb49b0>
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```


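There's also a Pool.map() method which resembles its behaviour, but splits the iterable into chunks that are distributed across multiple worker processes. Unlike the builtin, it blocks until all results are ready and returns them as a list, in input order.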

```python
start = time.time()
with Pool() as pool:
    results = pool.map(square, range(5))
end = time.time()

print("execution time: {}".format(end - start))
print(results)
```

```
execution time: 1.1291108131408691
[0, 1, 4, 9, 16]
id: 14828
id: 14829
id: 14826
id: 14825
id: 14827
```
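Pool.map() waits for the whole list. When results should be handled as they arrive, Pool.imap() (lazy, input order) and Pool.imap_unordered() (lazy, completion order) are alternatives; their chunksize parameter controls how many items are sent to a worker at once. A minimal sketch with a hypothetical quick_square function, again as a script with the `__main__` guard:

```python
from multiprocessing import Pool

def quick_square(x):
    """Hypothetical fast stand-in for square()."""
    return x ** 2

if __name__ == "__main__":
    with Pool() as pool:
        # imap yields results lazily, in the same order as the input
        ordered = list(pool.imap(quick_square, range(10), chunksize=2))
        # imap_unordered yields each result as soon as its chunk is done
        unordered = list(pool.imap_unordered(quick_square, range(10), chunksize=2))
    print(ordered)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    print(sorted(unordered) == ordered)  # True
```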


### Things to consider when running code in parallel

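* Resources (RAM, disk throughput) must be available for all parallel runs at the same time
* Parallel tasks should be roughly equal in "size" (run time); otherwise some workers sit idle while the last long task finishes
* Output printed to stdout by several processes may be interleaved and hard to read
* Processes do not share ordinary variables: each one works in its own memory. State that really must be shared (e.g. a multiprocessing.Value, Array or Manager) has to be synchronized, e.g. with a Lock or a Semaphore (https://docs.python.org/3.6/library/multiprocessing.html#multiprocessing.Semaphore)
* With the spawn start method (the default on Windows and macOS), the pool must be created under `if __name__ == "__main__":` and the task function must be importable, so functions defined interactively in a notebook may not work there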
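A minimal sketch of synchronized shared state, assuming a hypothetical add_many helper: multiprocessing.Value creates a C integer in shared memory that carries its own lock (get_lock()). Without the lock, concurrent `counter.value += 1` updates can be lost, because the increment is a separate read and write. Process is used directly here because synchronized objects like Value may only be shared with child processes through inheritance, not passed as arguments to pool tasks.

```python
from multiprocessing import Process, Value

def add_many(counter, n):
    """Hypothetical helper: increment a shared counter n times."""
    for _ in range(n):
        # hold the Value's own lock so the read-modify-write is not interleaved
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = Value("i", 0)  # shared C int, created with its own lock
    workers = [Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 4000
```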


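A fork of multiprocessing, which uses dill instead of pickle for serialization and can therefore send more kinds of objects (e.g. lambdas) to worker processes, is available at: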
https://github.com/uqfoundation/multiprocess

#### Dask
http://dask.pydata.org/en/latest/install.html

Dask provides, among other things, a DataFrame composed of multiple pandas DataFrames (partitions). Operations on it are split into tasks that are distributed across multiple cores.

## Pipelines

Until now:
* Tasks were done in separate steps, each step over all datasets, like:
    * Read all csv files
    * Create all dataframes

Pipelines let you chain subsequent tasks together per dataset. E.g. when analyzing text documents, the different tasks could be:
* remove stop words
* determine stem words
* remove duplicates

As soon as the task "remove stop words" is finished for document a, the task "determine stem words" is started for document a, without waiting for the other documents. The same holds for document b.
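A minimal sketch of such a per-document pipeline, assuming hypothetical stage functions: a tiny hard-coded stop-word set and a crude suffix-stripping "stemmer" stand in for real text-processing tools. Each document passes through all three stages inside one worker process, so with several workers, different documents can be in different stages at the same time.

```python
from multiprocessing import Pool

STOP_WORDS = {"the", "a", "is", "and", "of"}  # hypothetical, tiny stop-word list

def remove_stop_words(words):
    return [w for w in words if w not in STOP_WORDS]

def determine_stems(words):
    # crude stand-in for a real stemmer: strip a trailing "s" from longer words
    return [w[:-1] if w.endswith("s") and len(w) > 3 else w for w in words]

def remove_duplicates(words):
    # dict.fromkeys keeps the first occurrence and the original order
    return list(dict.fromkeys(words))

STAGES = [remove_stop_words, determine_stems, remove_duplicates]

def process_document(text):
    """Run one document through all stages, one after another."""
    words = text.lower().split()
    for stage in STAGES:
        words = stage(words)
    return words

if __name__ == "__main__":
    documents = [
        "The cats and the dogs",
        "A dog is a friend of dogs",
    ]
    with Pool() as pool:
        # each document is pipelined independently in a worker process
        for result in pool.imap(process_document, documents):
            print(result)
```

This chains the stages per document; another common design runs each stage in its own process and passes documents between stages through queues.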

## Student Task

Read content from a web source n times.
Possible sources could be:
* generate random bytes with http://httpbin.org (https://httpbin.org/bytes/:n returns n bytes)
* use "http://www.google.com"

Compare the execution times of the serial and the parallel version.

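```python
import time
from multiprocessing import Pool
from urllib.request import urlopen

ITERATIONS = 10
BYTES_TO_READ = 1024
# alternative source: call read_page() without an argument to use httpbin
URL_TO_READ = "https://httpbin.org/bytes/{}".format(BYTES_TO_READ)

def read_page(url=URL_TO_READ):
    """Download the page at url and return the number of bytes read."""
    with urlopen(url) as f:
        return len(f.read())

# serial: one request after the other
start = time.time()
for _ in range(ITERATIONS):
    read_page("http://www.google.com")
end = time.time()
print("serial: {}".format(end - start))

# parallel: the requests are distributed across the worker processes
start = time.time()
pool = Pool()
async_results = [pool.apply_async(read_page, ("http://www.google.com", ))
                 for _ in range(ITERATIONS)]
pool.close()
pool.join()
for r in async_results:
    r.get()  # re-raises any exception that occurred in a worker
end = time.time()
print("parallel: {}".format(end - start))
```

```
serial: 5.78494930267334
parallel: 1.2398285865783691
```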
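Reading a web page is I/O-bound: the process mostly waits for the network, so threads are often sufficient and cheaper to start than processes. A sketch using multiprocessing.pool.ThreadPool, which offers the same interface as Pool but runs tasks in threads. A hypothetical fake_read_page that sleeps stands in for the network call, so the timing comparison can be reproduced offline.

```python
import time
from multiprocessing.pool import ThreadPool

ITERATIONS = 10

def fake_read_page(delay=0.2):
    """Hypothetical stand-in for read_page: waits like a network request would."""
    time.sleep(delay)  # sleeping releases the GIL, just like waiting on a socket
    return delay

start = time.perf_counter()
for _ in range(ITERATIONS):
    fake_read_page()
serial = time.perf_counter() - start

start = time.perf_counter()
with ThreadPool(ITERATIONS) as pool:
    # one thread per request, so all requests wait at the same time
    pool.map(fake_read_page, [0.2] * ITERATIONS)
threaded = time.perf_counter() - start

print("serial: {:.2f} s, threads: {:.2f} s".format(serial, threaded))
```

To try it against a real source, pass URLs instead, e.g. `pool.map(read_page, [url] * ITERATIONS)` with the read_page function from the task.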
