# Lab 12. Przetwarzanie równoległe w Pythonie

## 1. Pakiet `multiprocessing`

Ten pakiet dostarcza API, które pozwala na uruchamianie kodu Python w oddzielnych procesach. Jak zostało wspomniane w poprzednim laboratorium, proces posiada swoją oddzielną pulę pamięci, własną blokadę GIL, i uruchamiany jest jako podprocess w ramach nowej instancji obiektu `multiprocessing.Process`, który korzysta z kolejnego wywołania interpretera Pythona.

API `threading` oraz `multiprocessing` jest w wielu miejscach podobne, ale `multiprocessing` wprowadza dodatkowe elementy pozwalające na obsługę zadań w ramach programowania wieloprocesowego.

_**Przykład 1**_

Ten przykład pochodzi z dokumnetacji pakietu `multiprocessing`: https://docs.python.org/3/library/multiprocessing.html

In [1]:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

main line
module name: __main__
parent process: 23908
process id: 21596


Po przeanalizowaniu kodu dojdziemy do wniosku, że w outpucie brakuje kilku linii (wypisywanych przez `print`), które powinny się tu pojawić. Uruchamianie kodu wykorzystującego kod wieloprocesowy odbywa się inaczej w przypadku systemów POISIX oraz Windows. Dodatkowo jeszcze dochodzą niuanse związane z uruchamianiem takiego kodu w środowisku Jupyter Notebook.

O szczegółach można przeczytac między innymi w tym artykule: https://bobswinkels.com/posts/multiprocessing-python-windows-jupyter/

Kod prezentowany tutaj winien być uruchamiany z pominięciem Jupyter Notebook. Dodatkowo wyjaśnienia wymaga pojawienie się tutaj linii `if __name__ == '__main__':`, której obecność jest związana z dobrymi praktykami programowania w samym Pythonie, ale tutaj powodem jest również natura programów wieloprocesowych w Pythonie. Zostało to opisane w dokumentacji modułu `multiprocessing` w sekcji `https://docs.python.org/3/library/multiprocessing.html#programming-guidelines` podpunkt `Safe importing of main module`.

Po umieszczeniu kody w pliku i uruchomieniu wyjście będzie wyglądało tak:

```console
main line
module name: __main__
parent process: 16948
process id: 33600    
function f
module name: __mp_main__
parent process: 33600   
process id: 11788       
hello bob
```

Widać, że sama logika wywołania zadania w ramach nowego procesu nie różni się w tym przykładzie od tej zaprezentowanej w poprzednim laboratorium, a prezentującej wykorzystanie wątków w Pythonie.

Poniżej zamieszczony zostanie przykład wykonania tej samej operacji z wykorzystaniem wątków oraz procesów dla porównania czasu wykonania, ale również przedstawienia podobieństw implementacji dla trywialnych zadań.

_**Przykład 2**_

> UWAGA! Dla spójności wyników kod uruchamiamy poprzez wywołanie pliku z kodem w interpreterze Pythona

In [None]:
# plik: fibo_threads.py

import os
import threading
from time import perf_counter

def fib(n):
    return n if n < 2 else fib(n - 2) + fib(n - 1)

threads = []
print('Uruchamianie wątków...')
t0 = perf_counter()

for _ in range(os.cpu_count()):
    threads.append(threading.Thread(target=fib, args=(25,)))

for t in threads:
    t.start()   

for t in threads:
    t.join()

print(f'Czas wykonania: {perf_counter() - t0}.')

Pojedyncze uruchomienie tego kodu nie da nam faktycznego czasu wykonania tego kodu, więc operację najlepiej powtórzyć więcej razy i czas uśrednić. Przprowadzone testy dały średni czas na poziomie ~ 0.16 sekundy.

In [None]:
# plik fibo_processes.py
import os
import multiprocessing as mp
from time import perf_counter

def fib(n):
    return n if n < 2 else fib(n - 2) + fib(n - 1)


if __name__ == '__main__':

    processes = []
    print('Uruchamianie procesów...')
    t0 = perf_counter()

    for _ in range(os.cpu_count()):
        processes.append(mp.Process(target=fib, args=(25,)))

    for t in processes:
        t.start()   

    for t in processes:
        t.join()

    print(f'Czas wykonania: {perf_counter() - t0}.')

Średni czas wykonania był na poziomie 0.372 sekundy, a więc wolniej. To może dziwić, biorąc pod uwagę, że wątki wykonują się `współbieżnie` współdzieląc między sobą czas procesora, a procesy uruchamiane są `równolegle` na oddzielnych rdzeniach. Tutaj wyjaśnieniem może być dużo większy narzut (dodatkowe operacje) przy inicjalizacji zasobów i obsłudze procesów względem wątków. Jeżeli zwiększymy wartość parametru `n` dla funkcji `fibo()` to zaobserwujemy, że czas wykonania zaczyna się zmieniać na korzyść rozwiązania opartego o procesy.

Dla wywołania `fibo(40)` czas wykonania dla procesów wyniósł ~ 38 sekund, a dla wątków około 207 sekund. Widać zatem, że dla zadań z czasochłonnych ten przyrost wydajności będzie po stronie procesów. W trakcie wykonania kodu z przykładów zwróć uwagę na obciążenie procesora w systemowym monitorze zasobów.

**Zadanie 1**

Napisz rozwiązanie, które wykona test czasu wykonania dwóch powyższych przykładów ze wskazaną ilością powtórzeń (ale weź pod uwagę, że to są dość czasochłonne zadania dobierając tę liczbę) oraz dla wskazanego `n` dla funkcji `fibo()`. Wyniki zapisuj do pliku csv w formacie:
numer_testu, fibo_n, sredni_czas.

Następnie przygotuj wykres w Jupyter Notebooku, na którym porównaj czasy wykonania dla tych dwóch rozwiązań. Każde rozwziązanie to oddzielna seria danych na wykresie  - niech to będzie wykres liniowy.

**Pula procesów oraz komunikacja między procesami**

Jak zostało już wspomniane na poprzednich zajęciach, proces komunikacji między procesami nie jest taki prosty jak w przypadku wątków. Wymaga to serializacji i desieralizacji danych odpowiednio przed ich przesłaniem i po ich odebraniu. To stanowi dość duży koszt w całym procesie wymiany danych. W tej części laboratorium przedstawione zostaną przykłady kolejek (ang. `queues`) oraz potoków (ang. `pipes`), które udostępnia moduł `multiprocessing`. Zaczniemy jednak od przedstawienia przykładu wykorzystującego pulę procesów.

> Dokumentacja klasy Pool: https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool  
> Dokumentacja Queue oraz Pipe:  
> * https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes
> * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue
> * https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Pipe

_**Przykład 3**_

W poniższym przykładzie tworzona jest pula procesów, do której przekazywane są kolejne zadania poprzez metodę `map`, która jest zrównolegloną wersją wbudowanej funkcji map. Ta metoda rozdziela obiekt iterowalny na fragmenty i przekazuje kolejnym procesom do wykonania jako argument funkcji uruchamianej w ramach procesu. Podział obiektu iterowanego na fragmenty (ang. chunk's) automatycznie pobiera po jednym elemencie, ale można określić wielkość tego fragmentu (właściwa wielkość będzie przybliżona do podanej) poprzez parametr `chunksize`.

In [None]:
# plik: process_pool_1.py
import multiprocessing as mp
from time import perf_counter


def fib(n):
    return n if n < 2 else fib(n - 2) + fib(n - 1)


if __name__ == '__main__':
    
    print('Uruchamianie procesów...')
    t0 = perf_counter()

    # tworzymy pulę 8 procesów
    with mp.Pool(processes=8) as pool:
        # wyniki są zbierane do listy
        results = pool.map(fib, range(40))
        for i, result in enumerate(results):
            print(f"fib({i}) = {result}")

    print(f'Czas wykonania: {perf_counter() - t0}.')

In [None]:
# plik: process_pool_2.py
import multiprocessing as mp
from time import perf_counter


def sum_nums(*nums):
    return sum(nums)


if __name__ == '__main__':
    
    print('Uruchamianie procesów...')
    t0 = perf_counter()

    # tworzymy pulę 8 procesów
    with mp.Pool(processes=8) as pool:
        # wyniki są zbierane do listy
        results = pool.map(sum_nums, range(40), chunksize=4)
        for i, result in enumerate(results):
            print(f"sum_nums({i}) = {result}")

    print(f'Czas wykonania: {perf_counter() - t0}.')

Poniżej przykład z przekazaniem wielu argumentów do funkcji uruchamianej w ramach procesu poprzez wykorzystanie metody `Pool.starmap`.

_**Przykład 4**_

In [None]:
# plik: process_pool_3.py

import multiprocessing as mp
from time import perf_counter


def open_file(filepath, sep=','):
    return f'Otwieram {filepath} używając separatora {sep}'


if __name__ == '__main__':
    
    print('Uruchamianie procesów...')
    t0 = perf_counter()
    params = [
        ('data.csv', ),
        ('names.csv', ';'),
        ('products.csv', ),
        ('other.csv', '^'),
        ('costam.csv', ',')]

    # tworzymy pulę 4 procesów
    with mp.Pool(processes=4) as pool:
        # wyniki są zbierane do listy
        results = pool.starmap(open_file, params)
        for i, result in enumerate(results):
            print(f"open_file{params[i]} = {result}")

    print(f'Czas wykonania: {perf_counter() - t0}.')

**Przekazywanie informacji między procesami przez obiekt `multiprocessing.Queue`**

_**Przykład 5**_

In [None]:
# plik: queue_1.py

from multiprocessing import Process, Queue, Pool
import time
import os


def f(q):
    q.put(time.time())
    print(f'Aktualny rozmiar kolejki: {q.qsize()}')

if __name__ == '__main__':
    q = Queue()

    print(f'Zostanie uruchomionych {os.cpu_count()} procesów.')
    for _ in range(os.cpu_count()):
        Process(target=f, args=(q,)).start()

    while not q.empty():
        print(q.get())

Wynik tego przykładu może być za każdym razem inny. Dlaczego? Procesy jak wiemy uruchamiane są poza głównym procesem, który przy tym zapisie nie jest blokowany i kod wyświetlający zawartość kolejki wykona się tuż po utworzeniu wszystkich procesów, bez oczekiwania na ich zakończenie.

W poniższym przykładzie czekamy, aż wszystkie procesy się zakończą i dopiero kontynuujemy wykoanie procesu nadrzędnego.

_**Przykład 6**_

In [None]:
# plik: queue_2.py
from multiprocessing import Process, Queue, Pool
import time
import os


def f(q):
    q.put(time.time())
    print(f'Aktualny rozmiar kolejki: {q.qsize()}')

if __name__ == '__main__':
    q = Queue()
    procesess = []

    print(f'Zostanie uruchomionych {os.cpu_count()} procesów.')
    for _ in range(os.cpu_count()):
        procesess.append(Process(target=f, args=(q,)))

    for p in procesess:
        p.start()      
    # czekamy, aż wszystkie procesy zakończą pracę
    for p in procesess:
        p.join()

    while not q.empty():
        print(q.get())


Przykład wykorzystania obiektu `multiprocessing.Pipe` został skopiowany z oficjalnej dokumentacji (https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes). Potok jest otwierany zawsze w dwóch kierunkach (duplex). Komunikacja obywa się poprzez wywołanie metody `send` oraz `recv`.

In [None]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Przedstawione powyżej przykłady to zaledwie wycinek możliwości pakietu `multiprocessing`, który oferuje jeszcze niskopoziomowe możliwości synchronizacji procesów (blokady, bariery 

## 2. Moduł `concurrent.futures`

Ten moduł dostarcza wysokopoziomowych metod na asynchroniczne uruchamianie wątków lub procesów za pośrednictwem interfejsu znajdującego się w abstrakcyjnej klasie `Executor`. Są to odpowiednio `ThreadPoolExecutor` oraz `ProcessPoolExecutor`


The concurrent.futures module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

W poniższym przykładzie czas wykonania będzie zbliżony do przykładu 3, jednak bez blokowania co powoduje, że wyniki nie zostaną wyświetlone na raz, ale będą pojawiały się stopniowo w kolejności ich wykonania.

In [None]:
# plik futures_poolexecutor_1.py
# źródło: https://realpython.com/python-parallel-processing/

from concurrent.futures import ProcessPoolExecutor

def fib(n):
    return n if n < 2 else fib(n - 2) + fib(n - 1)

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(fib, range(40))
        for i, result in enumerate(results):
            print(f"fib({i}) = {result}")

## Zadania

Na potrzeby zadania 3 przygotowano kod, który pozwala stworzyć fikcyjny zbiór danych o określonej liczbie obserwacji.

In [11]:
# deklaracja zbiorów wartości dla poszczególnych kolumn przyszłego zbioru danych
header = ['id', 'firstname', 'lastname', 'age', 'salary']
firstnames = ['Adam', 'Katarzyna', 'Krzysztof', 'Marek', 'Aleksandra', 'Zbigniew', 'Wojciech', 'Mieczysław', 'Agata', 'Wisława']
lastnames = ['Mieczykowski', 'Kowalski', 'Malinowski' , 'Szczaw', 'Glut', 'Barański', 'Brzęczyszczykiewicz', 'Wróblewski', 'Wlotka', 'Pysla']
age = {'min': 18, 'max': 68}
salary = {'min': 3200, 'max': 12500}

In [None]:
!pip install tqdm

In [None]:
# funkcja do generowania fikcyjnego datasetu
# n_rows oznacza ilość wierszy, którą chcemy finalnie uzyskać


import random
from tqdm import tqdm

def build_dataset(filename, n_rows=100, chunk_size=100000):
    rows = []
    rows.append(header)
    mu = (salary['max'] + salary['min']) / 2
    sigma = 1000

    with open(filename, 'w', encoding='utf-8') as filehandler:
        
        for id in tqdm(range(1, n_rows + 1), total=n_rows, desc="Building dataset..."):
            row = [
                f'{id}', 
                f'{random.choice(firstnames)}', 
                f'{random.choice(lastnames)}', 
                f"{random.randint(age['min'], age['max'])}",
                f"{round(float(random.normalvariate(mu=mu, sigma=sigma)), 2)}"
            ]
            rows.append(row)
            if id % chunk_size == 0:
                filehandler.writelines([f"{','.join(row)}\n" for row in rows])
                rows = []

In [None]:
# około 750MB zostanie zapisanych w pliku csv, dostosuj ilość rekordów do swoich potrzeb
build_dataset('employee.csv', 20_000_000)

**Zadanie 2**  

Przepisz kod zadania 3 z lab 11 tak, aby wykorzystywał tym razem klasę `ThreadPoolExecutor` wykorzystującą maksymalnie połowę dostępnych rdzeni komputera, na którym jest uruchamiany.

**Zadanie 3**

Korzystając z kodu do generowania fikcyjnego zbioru danych wygeneruj 4 zbiory o rozmiarze 10 milionów rekordów każdy i zapisz do plików. Następnie załaduj te zbiory sekwencyjnie do ramek danych pandas i scal je w jedną ramkę mierząc czas tej operacji (wczytanie + scalenie).

Napisz kod, który wykorzystując `ProcessPoolExecutor` będzie wczytywał kolejne pliki utworzone wcześniej w oddzielnych procesach do ramek pandas, a następnie scal te ramki w jedną. Również zmierz czas tej operacji.

