# Parallelism

In [1]:
import pandas as pd
import numpy as np

# Silly example

In [2]:
def my_sleep(x):
    '''
    Pause for `x` seconds, printing a message before and after the pause,
    and give `x` back to the caller unchanged.
    '''
    # The import stays inside the function body, as in the original
    # (presumably so the function is self-contained in worker processes).
    from time import sleep

    print(f'Sleeping for {x} seconds.')
    sleep(x)
    print(f'Returning {x}')
    return x

In [3]:
my_sleep(5)

Sleeping for 5 seconds.
Returning 5


5

In [3]:
my_list = [1,2,3,4,5,6]

In [5]:
sum(my_list)

21

In [4]:
from tqdm.auto import tqdm

In [7]:
for i in my_list:
    print(my_sleep(i))

Sleeping for 1 seconds.
Returning 1
1
Sleeping for 2 seconds.
Returning 2
2
Sleeping for 3 seconds.
Returning 3
3
Sleeping for 4 seconds.
Returning 4
4
Sleeping for 5 seconds.
Returning 5
5
Sleeping for 6 seconds.
Returning 6
6


In [7]:
list(map(my_sleep,my_list))

Sleeping for 1 seconds.
Returning 1
Sleeping for 2 seconds.
Returning 2
Sleeping for 3 seconds.
Returning 3
Sleeping for 4 seconds.
Returning 4
Sleeping for 5 seconds.
Returning 5
Sleeping for 6 seconds.
Returning 6


[1, 2, 3, 4, 5, 6]

## Serial code

In [8]:
for item in tqdm(my_list):
    my_sleep(item)

  0%|          | 0/6 [00:00<?, ?it/s]

Sleeping for 1 seconds.
Returning 1
Sleeping for 2 seconds.
Returning 2
Sleeping for 3 seconds.
Returning 3
Sleeping for 4 seconds.
Returning 4
Sleeping for 5 seconds.
Returning 5
Sleeping for 6 seconds.
Returning 6


In [16]:
list(map(my_sleep, tqdm(my_list)))

  0%|          | 0/6 [00:00<?, ?it/s]

Sleeping for 1 seconds.
Returning 1
Sleeping for 2 seconds.
Returning 2
Sleeping for 3 seconds.
Returning 3
Sleeping for 4 seconds.
Returning 4
Sleeping for 5 seconds.
Returning 5
Sleeping for 6 seconds.
Returning 6


[1, 2, 3, 4, 5, 6]

In [9]:
# magic commands

In [9]:
%%time

list(map(my_sleep, my_list))

Sleeping for 1 seconds.
Returning 1
Sleeping for 2 seconds.
Returning 2
Sleeping for 3 seconds.
Returning 3
Sleeping for 4 seconds.
Returning 4
Sleeping for 5 seconds.
Returning 5
Sleeping for 6 seconds.
Returning 6
Wall time: 21.1 s


[1, 2, 3, 4, 5, 6]

In [13]:
%%prun # What does prun do?
list(map(my_sleep, [1,2,3]))

Sleeping for 1 seconds.
Returning 1
Sleeping for 2 seconds.
Returning 2
Sleeping for 3 seconds.
Returning 3
 

         203 function calls in 6.013 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        3    6.012    2.004    6.012    2.004 {built-in method time.sleep}
       16    0.001    0.000    0.001    0.000 socket.py:438(send)
       12    0.000    0.000    0.001    0.000 iostream.py:384(write)
       16    0.000    0.000    0.001    0.000 iostream.py:195(schedule)
        3    0.000    0.000    6.013    2.004 <ipython-input-1-4047f8252921>:1(my_sleep)
        6    0.000    0.000    0.001    0.000 {built-in method builtins.print}
       12    0.000    0.000    0.000    0.000 {built-in method builtins.isinstance}
       12    0.000    0.000    0.000    0.000 iostream.py:308(_is_master_process)
       16    0.000    0.000    0.000    0.000 {method 'acquire' of '_thread.lock' objects}
       16    0.000    0.000    0.000    0.000 threading.py:1092(is_alive)
        1    0.000    0.000    6.013    6.013 {built-in method builtins

## Parallel code

In [5]:
from multiprocessing import Pool, cpu_count # How does the pool work?

cpu_count()

4

## You have to create a pool of `n` processes.

In [6]:
pool = Pool(processes=cpu_count())

### We'll use the `%%time` magic here to measure how fast this code runs in parallel.

However, if you run this code, watch what happens:

In [None]:
%%time

result = pool.map(my_sleep, my_list)
pool.terminate()

## This happens because `multiprocessing` does not always work in Jupyter Notebooks.

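_Linux and some macOS setups may handle it well (yay Unix)_, but it certainly doesn't work on Windows: there, each worker process starts from a fresh interpreter and cannot find functions that were defined inside the notebook.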

### What should we do then? Two solutions.

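1. We have to write our functions inside a `.py` file (here `sleeper.py`, which must be importable from the notebook's folder and define `my_sleep_from_file`).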

2. Install `multiprocess` (note it is different from Python's `multiprocessing` module)

In [7]:
from sleeper import my_sleep_from_file

ModuleNotFoundError: No module named 'sleeper'

In [36]:
%%time
list(map(my_sleep_from_file, tqdm([1,2,3,4,5,6,7,8])))

  0%|          | 0/8 [00:00<?, ?it/s]

CPU times: user 152 ms, sys: 38 ms, total: 190 ms
Wall time: 36.1 s


[1, 2, 3, 4, 5, 6, 7, 8]

In [33]:
%%time

result = pool.map(my_sleep_from_file, tqdm([1,2,3,4,5,6,7,8]))
# pool.terminate()

  0%|          | 0/8 [00:00<?, ?it/s]

CPU times: user 70.7 ms, sys: 15.9 ms, total: 86.6 ms
Wall time: 12.1 s


In [10]:
pool = Pool(processes=2)

In [11]:
%%time

result = pool.map(my_sleep_from_file, my_list)
# pool.terminate()

NameError: name 'my_sleep_from_file' is not defined

In [35]:
result

[1, 2, 3, 4, 5, 6]

## Using multiprocess


In [8]:
# %pip installs into the environment of the running kernel, whereas
# `!pip3` runs whichever pip3 happens to be first on the shell PATH.
%pip install multiprocess

Defaulting to user installation because normal site-packages is not writeable
Collecting multiprocess
  Downloading multiprocess-0.70.12.2-py39-none-any.whl (128 kB)
Collecting dill>=0.3.4
  Downloading dill-0.3.4-py2.py3-none-any.whl (86 kB)
Installing collected packages: dill, multiprocess
Successfully installed dill-0.3.4 multiprocess-0.70.12.2


You should consider upgrading via the 'c:\program files\python39\python.exe -m pip install --upgrade pip' command.


In [12]:
# using multiprocess instead of multiprocessing
from multiprocess import Pool, cpu_count

In [13]:
pool = Pool(processes=6)

In [15]:
%%time

result = list(map(lambda x:x**10000000, [1,2,3,4,5,6]))

Wall time: 16.2 s


In [16]:
%%time
# Open the pool in a `with` block so the worker processes are always
# terminated when the block exits, even if one of the tasks raises.
with Pool(processes=cpu_count()) as pool:
    # pool.map already returns a list, so no extra list() call is needed.
    result = pool.map(my_sleep, [1,2,3,4,5,6])

Wall time: 8.28 s


In [17]:
result

[1, 2, 3, 4, 5, 6]

In [18]:
pool = Pool(processes=4)

In [19]:
%%time
result = pool.map(lambda x:x**10000000, [1,2,3,4,5,6])

Wall time: 6.8 s


# Running Asynchronous code

## What is asynchrony?

- `result.ready()`
- `result.wait()`
- `result.get()`

In [20]:
pool = Pool(processes=4)

In [21]:
result = pool.map_async(my_sleep, [10, 10, 10, 10, 10, 10])

In [22]:
#result.wait()

In [24]:
result.ready()

True

In [25]:
print('Do something that doesn"t depend on result')
print('...')
print('Now the time came when the result is needed.')
#result.wait()

result_list = result.get()
pool.terminate()
print(f'Now go on and use the results obtained - {result_list}')

Do something that doesn"t depend on result
...
Now the time came when the result is needed.
Now go on and use the results obtained - [10, 10, 10, 10, 10, 10]


# CPU intensive computations

In [26]:
def square(x):
    """Return `x` raised to the power of two."""
    return pow(x, 2)

In [27]:
%%time
square(1249415165)

Wall time: 0 ns


1561038254531977225

In [28]:
n = 1000000

In [29]:
%%timeit
    
result = [square(item) for item in np.random.random(size=n)]

488 ms ± 12.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [30]:
%%time
    
result = [square(item) for item in np.random.random(size=n)]

Wall time: 470 ms


In [31]:
pool = Pool(processes=4)

In [32]:
random_numbers = np.random.random(size=n)

In [33]:
%%time

result = pool.map(square, random_numbers)

Wall time: 24.2 s


In [35]:
pool.terminate()

In [None]:
# GIL - global interpreter lock

## profiling tools

In [36]:
%%prun

result = [square(item) for item in np.random.random(size=n)]

 

## Usually, for cheap per-item computations like this one, `Pool.map` won't speed up your code.

Why? It will spend more time managing processes, replicating data and sending data to the other processes than actually computing the result.



In [37]:
# %pip installs into the environment of the running kernel, whereas
# `!pip` runs whichever pip happens to be first on the shell PATH.
%pip install Cython

Defaulting to user installation because normal site-packages is not writeable
Collecting Cython
  Downloading Cython-0.29.24-cp39-cp39-win_amd64.whl (1.7 MB)
Installing collected packages: Cython
Successfully installed Cython-0.29.24


You should consider upgrading via the 'c:\program files\python39\python.exe -m pip install --upgrade pip' command.


In [18]:
## Cython - CPython

In [38]:
%load_ext Cython

In [39]:
%%cython -a
def square_c(x):
    """Return `x` squared; this definition lives in the `%%cython` cell."""
    squared = x ** 2
    return squared

In [40]:
random_numbers = np.random.random(size=n)

In [41]:
%%timeit

result = [square_c(item) for item in np.random.random(size=n)]

440 ms ± 13.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [42]:
%%timeit

result = [square(item) for item in np.random.random(size=n)]

472 ms ± 7.13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
#https://numba.pydata.org/

# When is multiprocess useful then? 


## I/O bound computations

In [43]:
import pandas as pd

In [44]:
import requests
from bs4 import BeautifulSoup

In [45]:
n_max = 51
my_range = range(1,n_max)

In [47]:
%%time

import os

# The loop writes one CSV per page into tmp/; no cell shown here creates that
# folder, so make sure it exists (exist_ok keeps the cell re-runnable).
os.makedirs('tmp', exist_ok=True)

for i in tqdm(my_range):
    # A timeout keeps one stalled request from hanging the whole loop.
    response = requests.get(f'http://books.toscrape.com/catalogue/page-{i}.html', timeout=30)
    # Stop on an HTTP error instead of silently saving an empty table.
    response.raise_for_status()
    # Name the parser explicitly so the result does not depend on which
    # optional parsers are installed (and to avoid bs4's parser warning).
    soup = BeautifulSoup(response.content, 'html.parser')
    titles = [heading.find('a')['title'] for heading in soup.find_all('h3')]
    prices = [tag.text for tag in soup.find_all('p', attrs={'class':'price_color'})]
    # The comparison already yields a bool; no `True if ... else False` needed.
    stocks = [tag.text.strip() == 'In stock' for tag in soup.find_all('p', attrs={'class':'instock availability'})]
    df_temp = pd.DataFrame({'Title':titles,'Price':prices,'Stock Availability':stocks})
    df_temp.to_csv(f'tmp/results_{i}.csv', index=False, sep=',')

  0%|          | 0/50 [00:00<?, ?it/s]

Wall time: 24.7 s


In [48]:
def download(i):
    """
    Scrape catalogue page `i` of books.toscrape.com and save it as a CSV.

    Fetches `http://books.toscrape.com/catalogue/page-{i}.html`, collects the
    title, price text and stock flag of every book listed on the page, and
    writes them to `tmp/results_{i}.csv` with the columns `Title`, `Price`
    and `Stock Availability`.

    Parameters
    ----------
    i
        Page number; interpolated into both the URL and the output file name.

    Returns
    -------
    None
        The only result is the CSV file written to disk.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status code.
    requests.Timeout
        If the server does not answer within 30 seconds.
    """
    # Imports stay inside the function, as in the original, so the function
    # carries its own dependencies when it is executed by a pool worker.
    import os
    import requests
    from bs4 import BeautifulSoup
    import pandas as pd

    # A timeout keeps one stalled request from blocking a worker forever.
    response = requests.get(f'http://books.toscrape.com/catalogue/page-{i}.html', timeout=30)
    # Fail loudly instead of silently writing an empty CSV for an error page.
    response.raise_for_status()
    # Name the parser explicitly so the result does not depend on which
    # optional parsers are installed (and to avoid bs4's parser warning).
    soup = BeautifulSoup(response.content, 'html.parser')
    titles = [heading.find('a')['title'] for heading in soup.find_all('h3')]
    prices = [tag.text for tag in soup.find_all('p', attrs={'class':'price_color'})]
    # The comparison already yields a bool; no `True if ... else False` needed.
    stocks = [tag.text.strip() == 'In stock' for tag in soup.find_all('p', attrs={'class':'instock availability'})]
    df_temp = pd.DataFrame({'Title':titles,'Price':prices,'Stock Availability':stocks})
    # Make sure the output folder exists; exist_ok makes this safe when
    # several workers reach this line at the same time.
    os.makedirs('tmp', exist_ok=True)
    df_temp.to_csv(f'tmp/results_{i}.csv', index=False, sep=',')

In [49]:
pool = Pool(cpu_count())

In [50]:
%%time

results = pool.map(download, tqdm(my_range))

  0%|          | 0/50 [00:00<?, ?it/s]

Wall time: 7.88 s


In [51]:
pool.terminate()

In [52]:
def download_html(i):
    """
    Download catalogue page `i` of books.toscrape.com and save the raw HTML.

    Fetches `http://books.toscrape.com/catalogue/page-{i}.html` and writes the
    response body, as bytes, to `html_books_{i}.html` in the current working
    directory.

    Parameters
    ----------
    i
        Page number; interpolated into both the URL and the output file name.

    Returns
    -------
    None
        The only result is the HTML file written to disk.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status code.
    requests.Timeout
        If the server does not answer within 30 seconds.
    """
    # Only `requests` is needed here; it is imported inside the function so
    # the function carries its dependency when executed by a pool worker.
    import requests

    # A timeout keeps one stalled request from blocking a worker forever.
    response = requests.get(f'http://books.toscrape.com/catalogue/page-{i}.html', timeout=30)
    # Do not save an error page as if it were a catalogue page.
    response.raise_for_status()
    # The `with` block closes (and flushes) the file; the original left the
    # handle open.
    with open(f'html_books_{i}.html','wb') as file:
        file.write(response.content)

In [53]:
import os

In [54]:
# exist_ok=True lets this cell be re-run without raising FileExistsError
# when the folder is already there.
os.makedirs('tmp_2', exist_ok=True)

In [55]:
os.getcwd()

'C:\\Users\\natmc\\GitRepository\\55_DAPT_nat\\Classes\\Week14'

In [56]:
os.chdir('tmp_2')

In [57]:
os.getcwd()

'C:\\Users\\natmc\\GitRepository\\55_DAPT_nat\\Classes\\Week14\\tmp_2'

In [58]:
# Use the pool as a context manager so the worker processes are terminated
# once the map has finished (the original never shut this pool down).
with Pool(cpu_count()) as pool:
    results = pool.map(download_html, tqdm(my_range))

  0%|          | 0/50 [00:00<?, ?it/s]