# Parallelism

In [1]:
import pandas as pd
import numpy as np

# Silly example

In [2]:
import time

def my_sleep(x):
    '''
    Pauses the calling process for x seconds and then gives x back.

    One message is printed right before the pause and another one right
    after it, so the start and the end of every call show up in the output.
    '''
    start_message = f'Sleeping for {x} seconds.'
    print(start_message)
    time.sleep(x)
    end_message = f'Returning {x}'
    print(end_message)
    return x

In [3]:
my_sleep(5)

Sleeping for 5 seconds.
Returning 5


5

In [4]:
my_list = [1,2,3,4]

In [5]:
sum(my_list)

10

In [6]:
from tqdm.auto import tqdm

## Serial code

In [9]:
# Serial baseline: every call has to return before the next one starts.
# Wrapping the list in tqdm only adds a progress bar over the iteration.
for item in tqdm(my_list):
    my_sleep(item)

HBox(children=(FloatProgress(value=0.0, max=4.0), HTML(value='')))

Sleeping for 1 seconds.
Returning 1
Sleeping for 2 seconds.
Returning 2
Sleeping for 3 seconds.
Returning 3
Sleeping for 4 seconds.
Returning 4



In [10]:
map(my_sleep, my_list)

<map at 0x10eac0040>

In [11]:
# magic commands

In [12]:
%%time

list(map(my_sleep, my_list))

Sleeping for 1 seconds.
Returning 1
Sleeping for 2 seconds.
Returning 2
Sleeping for 3 seconds.
Returning 3
Sleeping for 4 seconds.
Returning 4
CPU times: user 7.04 ms, sys: 2.83 ms, total: 9.88 ms
Wall time: 10 s


[1, 2, 3, 4]

## Parallel code

In [13]:
from multiprocessing import Pool, cpu_count

cpu_count()

4

## You have to create a pool of `n` processes.

In [14]:
pool = Pool(processes=4)

### We'll use the `%%time` magic function here to measure the speed of this code in parallel.

However, if you run this code, watch what happens:

In [25]:
%%time

# pool.map blocks until every item has been processed and returns the results in input order.
result = pool.map(my_sleep, my_list)
# terminate() stops the worker processes, so this pool object cannot be reused afterwards.
pool.terminate()

KeyboardInterrupt: 

## This happens because `multiprocessing` does not always work in Jupyter Notebooks.

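_Some Linux and macOS setups may handle it well (yay Unix)_, but it certainly doesn't work on Windows: there each worker is a freshly started process that has to import the function, and a function defined in a notebook cell lives only in the interactive `__main__` module, which the workers cannot import.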

### What should we do then? Two solutions.

1. We have to write our functions inside a `.py` file.

2. Install `multiprocess` (note it is different from Python's `multiprocessing` module)

In [27]:
import sys

In [29]:
# NOTE(review): machine-specific absolute path. Presumably this is the directory that
# holds sleeper.py (imported a few cells later) - adjust it before running elsewhere.
sys.path.insert(0, '/Users/andreaguiar/Desktop/usr/dist/')

In [30]:
sys.path

['/Users/andreaguiar/Desktop/usr/dist/',
 '/Users/andreaguiar/Desktop/usr/dev/ironhack/ft202002/classes/week4_remote/2_Parallelization',
 '/Library/Frameworks/Python.framework/Versions/3.8/lib/python38.zip',
 '/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8',
 '/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/lib-dynload',
 '',
 '/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages',
 '/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/ironhack-0.0.1-py3.8.egg',
 '/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/termcolor-1.1.0-py3.8.egg',
 '/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/IPython/extensions',
 '/Users/andreaguiar/.ipython']

In [16]:
from sleeper import my_sleep

In [17]:
%%time

result = pool.map(my_sleep, my_list)
pool.terminate()

CPU times: user 3.28 ms, sys: 2.08 ms, total: 5.36 ms
Wall time: 4.03 s


In [36]:
pool = Pool(processes=2)

Sleeping for 1 seconds.
Sleeping for 2 seconds.
Returning 1
Sleeping for 3 seconds.
Returning 2
Sleeping for 4 seconds.
Returning 3
Returning 4


In [37]:
%%time

result = pool.map(my_sleep, my_list)
pool.terminate()

CPU times: user 11.5 ms, sys: 7.43 ms, total: 19 ms
Wall time: 6.03 s


In [20]:
result

[1, 2, 3, 4]

In [None]:
!pip install multiprocess

## using multiprocess


In [7]:
# using multiprocess instead of multiprocessing
from multiprocess import Pool

In [8]:
pool = Pool(processes=4)

Sleeping for 1 seconds.
Sleeping for 3 seconds.
Sleeping for 8 seconds.
Sleeping for 6 seconds.
Returning 1
Returning 3
Returning 6
Returning 8


In [9]:
%%time

result = pool.map(my_sleep, [1,3,6,8])
pool.terminate()

CPU times: user 9.51 ms, sys: 5.81 ms, total: 15.3 ms
Wall time: 8.03 s


In [19]:
print('oi')

oi


# Running Asynchronous code

## What is asynchrony?

- `result.ready()`
- `result.wait()`
- `result.get()`

In [38]:
pool = Pool(processes=4)

Sleeping for 60 seconds.
Sleeping for 60 seconds.
Sleeping for 60 seconds.
Sleeping for 60 seconds.
Returning 60
Returning 60
Returning 60
Returning 60


Process ForkPoolWorker-34:
Process ForkPoolWorker-32:
Process ForkPoolWorker-31:
Process ForkPoolWorker-33:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):


In [39]:
%%time

# map_async submits the work to the pool and returns an AsyncResult right away
# instead of blocking until the tasks are done.
result = pool.map_async(my_sleep, [60, 60, 60, 60, 60, 60, 60, 60, 60])

CPU times: user 66 µs, sys: 36 µs, total: 102 µs
Wall time: 112 µs


In [72]:
result.ready()

True

In [34]:
# Anything that does not need the result can run here while the asynchronous call is still in progress.
print('Do something that doesn"t depend on result')
print('...')
print('Now the time came when the result is needed.')
# wait() blocks until the asynchronous call has finished.
result.wait()

# get() returns the collected return values; it would block too if the result were not ready yet.
result_list = result.get()
# The results are in hand, so the worker processes can be stopped.
pool.terminate()
print(f'Now go on and use the results obtained - {result_list}')

Do something that doesn"t depend on result
...
Now the time came when the result is needed.
Now go on and use the results obtained - [1, 3, 5, 8, 9, 10, 12]


# CPU intensive computations

In [75]:
def square(x):
    '''
    Returns x raised to the power of two
    '''
    return pow(x, 2)

In [76]:
n = 1000000

In [77]:
%%timeit
    
# NOTE: the random array is created inside the timed statement, so the measurement
# covers np.random.random(size=n) as well as the n calls to square().
result = [square(item) for item in np.random.random(size=n)]

659 ms ± 52.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
%%time
    
result = [square(item) for item in np.random.random(size=n)]

In [81]:
pool = Pool(processes=4)

Process ForkPoolWorker-49:
Process ForkPoolWorker-50:
Process ForkPoolWorker-47:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/multiprocess/process.py", line 313, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/multiprocess/process.py", line 313, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/multiprocess/process.py", line 313, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Li

In [None]:
random_numbers = np.random.random(size=n)

In [None]:
# The keyword argument is `processes`; `Pool(process=4)` raises a TypeError
# because Pool has no parameter named `process`.
pool = Pool(processes=4)

In [None]:
%%time

result = pool.map(square, random_numbers)

In [None]:
pool.terminate()

In [None]:
# GIL - global interpreter lock

## profiling tools

In [85]:
%%prun

result = [square(item) for item in np.random.random(size=n)]

 

## Usually, for CPU intensive computations, Pool.map won't speed up your code.

Why? When each task is as small as this one, it will spend more time managing processes, replicating data and sending data to the other processes than actually computing.



In [None]:
## Cython - CPython

In [86]:
%load_ext Cython

In [87]:
%%cython -a
def square_c(x):
    '''
    Returns x raised to the power of two (same body as square, compiled through Cython)
    '''
    return pow(x, 2)

In [125]:
random_numbers = np.random.random(size=n)

In [91]:
%%timeit

result = [square_c(item) for item in np.random.random(size=n)]

724 ms ± 109 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# When is multiprocess useful then? 


## I/O bound computations

In [10]:
import pandas as pd

In [11]:
import requests

In [12]:
n_max = 51852

In [14]:
import os
from io import StringIO

# NOTE(review): colnames is not used anywhere in this cell.
colnames = ['team_a','score_a','score_b','team_b','event','stars']
# One request per block of 100 results, enough blocks to cover n_max results.
my_range = range(int(np.ceil(n_max/100)))

# to_csv does not create missing directories, so make sure the target folder exists.
os.makedirs('tmp', exist_ok=True)

for i in tqdm(my_range):
    # A timeout keeps a stalled connection from hanging the whole loop.
    response = requests.get(f'https://www.hltv.org/results?offset={i * 100}', timeout=30)
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    response.raise_for_status()
    # Newer pandas deprecates passing raw HTML text, so hand it over as a file-like object.
    df = pd.concat(pd.read_html(StringIO(response.text)))
    df.to_csv(f'tmp/results_{i}.csv', index=False, sep=',')

HBox(children=(FloatProgress(value=0.0, max=519.0), HTML(value='')))

KeyboardInterrupt: 

In [None]:
def download(i):
    '''
    Downloads results page i from hltv.org and stores its tables in tmp/results_{i}.csv

    The page is requested with offset i * 100. Every HTML table found in the
    response is concatenated into a single DataFrame, which is written as a
    comma-separated file without the index. The tmp directory is created when
    it does not exist yet.

    Raises requests.HTTPError when the server answers with an error status and
    a requests timeout error when it does not answer within 30 seconds.
    Returns None.
    '''
    # Imported locally so the function carries its own dependencies into a worker process.
    import os
    from io import StringIO

    # A timeout keeps one stalled connection from blocking a worker forever.
    response = requests.get(f'https://www.hltv.org/results?offset={i * 100}', timeout=30)
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    response.raise_for_status()

    # to_csv does not create missing directories; exist_ok makes this safe when
    # several workers reach this line at the same time.
    os.makedirs('tmp', exist_ok=True)

    # Newer pandas deprecates passing raw HTML text, so hand it over as a file-like object.
    df = pd.concat(pd.read_html(StringIO(response.text)))
    df.to_csv(f'tmp/results_{i}.csv', index=False, sep=',')

In [None]:
pool = Pool(4)

In [None]:
%%time

results = pool.map(download, my_range)

In [None]:
pool.terminate()