# Współbieżność i równoległość

> Concurrency is about dealing with lots of things at once.
>
> Parallelism is about doing lots of things at once.
>
> — Rob Pike

Podstawowe pakiety w Python związane z współbieżnością i zrównoleglaniem zadań:
- pakiety `threading` lub `asyncio` są przydatne, kiedy w programie są operacje blokujące
- pakiet `multiprocessing` pozwala na wydajniejsze wykorzystanie CPU poprzez uruchamianie wielu procesów

Zysk z współbieżności w Python zależy od implementacji. CPython posiada zaimplementowany tzw. GIL (Global Intepreter Lock), który blokuje równoległe wykonanie wątków (poza operacjami typu oczekiwanie na dane), ale dzięki temu zabezpiecza przed błędami synchronizacji.

Najlepiej korzystać z dedykowanych bibliotek w przypadku obliczeń.

## Wykonanie sekwencyjne

In [None]:
from datetime import datetime
import time

# zadanie task_id o złożoności n
def task(task_id, n):
    print(f'→ task {task_id} ', datetime.now().time())
    for x in range(1, n):
        for y in range(1, n):
            x**y
    # operacja blokująca
    # time.sleep(task_id)
    print(f'← task {task_id} ', datetime.now().time())


# sekwencyjne wykonanie zadań
def sequential(tasks):
    for i in range(tasks):    
        task(i, 750)

    
start = datetime.now()
sequential(5)
end = datetime.now()
print("total: ", end - start)

## Wykonanie współbieżne

In [None]:
from datetime import datetime
import time
import threading

# zadanie task_id o złożoności n
def task(task_id, n):
    print(f'→ task {task_id} ', datetime.now().time())
    for x in range(1, n):
        for y in range(1, n):
            x**y
    # operacja blokująca
    # time.sleep(task_id)
    print(f'← task {task_id} ', datetime.now().time())
    

# współbieżne wykonanie zadań
def concurrent(tasks):
    threads = []

    for i in range(tasks):
        # utworzenie, dodanie do listy i uruchomienie wątku
        t = threading.Thread(target=task, args=(i, 750))
        threads.append(t)
        t.start()

    # oczekiwanie na zakończenie wszystkich wątków
    [ t.join() for t in threads ]

    
start = datetime.now()
concurrent(5)
end = datetime.now()
print("total: ", end - start)

## Wykonanie równoległe

In [None]:
from datetime import datetime
import time
import multiprocessing

# zadanie task_id o złożoności n
def task(task_id, n):
    print(f'→ task {task_id} ', datetime.now().time())
    for x in range(1, n):
        for y in range(1, n):
            x**y
    # operacja blokująca
    # time.sleep(task_id)
    print(f'← task {task_id} ', datetime.now().time())
    

# równoległe wykonanie zadań
def parallel(tasks):
    processes = []

    for i in range(tasks):
        # utworzenie, dodanie do listy i uruchomienie odrębnego procesu
        p = multiprocessing.Process(target=task, args=(i, 750))
        processes.append(p)
        p.start()

    # oczekiwanie na zakończenie wszystkich wątków
    [ p.join() for p in processes ]

    
start = datetime.now()
parallel(5)
end = datetime.now()
print("total: ", end - start)

## Pule wątków i procesów

Z punktu widzenia programisty aplikacja ma do wykonania pewne zadania, nie interesuje go ich zrównoleglanie od strony tworzenia wątków lub procesów.

Pakiet `concurrent.futures` umożliwia tworzenie:
- pul wątków
- pul procesów

Pula automatycznie zarządza wątkami/procesami przetwarzając przekazane zadania. Umożliwia również sprawdzenie ich stanu oraz pobranie wyniku.

**Uwaga!** `ProcessPoolExecutor` może [nie działać w konsoli interaktywnej](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor).

In [None]:
from datetime import datetime
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait

# zadanie task_id o złożoności n
def task(task_id, n):
    print(f'→ task {task_id} ', datetime.now().time())
    for x in range(1, n):
        for y in range(1, n):
            x**y
    # operacja blokująca
    # time.sleep(task_id)
    print(f'← task {task_id} ', datetime.now().time())
    return (task_id, n)
    

# wykonanie zadań przez wykonawcę
def executor(tasks):
    # brak parametru utworzy optymalną liczbę procesów w puli
    pool = ProcessPoolExecutor(2)
    futures = []

    for i in range(tasks):
        # utworzenie, dodanie do listy i dodanie zadania do puli procesów
        f = pool.submit(task, i, 750)
        futures.append(f)
        
    # czy zadania są już zakończone
    [ print(f.done()) for f in futures ]
    
    # oczekiwanie na zakończenie wszystkich zadań
    wait(futures)
    
    # pobranie wyników
    [ print(f.result()) for f in futures ]
    
    # pula nadal istnieje i może być używana

    
start = datetime.now()
executor(5)
end = datetime.now()
print("total: ", end - start)

Współbieżność nie jest uniwersalną metodą przyspieszania obliczeń i nie zawsze przynosi zysk.

Warto rozważyć inne techniki:
- optymalizacja kodu poprzez redukcję powtarzalnych operacji lub ich wyeliminowanie jeśli są zbędne
- reużywanie obiektów, przechowywanie wyników obliczeń
- wykorzystanie dedykowanych bibliotek lub innych implementacji Pythona, takich jak `PyPy` lub `Cython`

## Synchronizacja

Dostęp do współdzielonych danych powinien być synchronizowany, choć niektóre z implementacji Pythona będą działać "poprawnie". Podczas wykonania może dojść do wyścigu, którego efektem mogą być niedeterministyczne lub nieoczekiwane wyniki.

In [None]:
import time
import threading

z = 0

# zadanie 1
def task_1():
    global x, y, z
    x = 1
    time.sleep(0)
    y = 1
    time.sleep(0)
    z += 1


# zadanie 2
def task_2():
    global x, y, z
    y = 2
    time.sleep(0)
    x = 2
    time.sleep(0)
    z += 1
    

threads = []
threads.append(threading.Thread(target=task_1))
threads.append(threading.Thread(target=task_2))

[ t.start() for t in threads ]

[ t.join() for t in threads ]

print(f'final values: x={x} y={y} z={z}')

Ręczna synchronizacja jest trudna i często prowadzi do problemów ze współdzieleniem danych takich jak np. zakleszczenia.

In [None]:
import time
import threading

lock_x = threading.Lock()
lock_y = threading.Lock()

# zadanie 1
def task_1():
    global x, y
    print('1: lock_x acquire')
    lock_x.acquire()
    print('1: lock_x acquired')
    time.sleep(0)
    x = 1
    print('1: lock_y acquire')
    lock_y.acquire()
    print('1: lock_y acquired')
    time.sleep(0)
    y = 1
    print('1: lock_y release')
    lock_y.release()
    print('1: lock_y released')
    print('1: lock_x release')
    lock_x.release()
    print('1: lock_x released')
    

# zadanie 2
def task_2():
    global x, y
    print('2: lock_y acquire')
    lock_y.acquire()
    print('2: lock_y acquired')
    time.sleep(0)
    y = 2
    print('2: lock_x acquire')
    lock_x.acquire()
    print('2: lock_x acquired')
    time.sleep(0)
    x = 2
    print('2: lock_x release')
    lock_x.release()
    print('2: lock_x released')
    print('2: lock_y release')
    lock_y.release()
    print('2: lock_y released')
    

threads = []
threads.append(threading.Thread(target=task_1))
threads.append(threading.Thread(target=task_2))

[ t.start() for t in threads ]

[ t.join() for t in threads ]

print(f'final values: x={x} y={y}')

## Dziel i zwyciężaj

Dziel i zwyciężaj (*divide and conquer*) to metoda projektowania algorytmów polegająca na podziale problemu na mniejsze problemy do rozwiązania analogiczne do problemu początkowego.

In [None]:
import random

def find_max(data, left, right):
    # warunek stop: lista jednoelementowa
    if left == right:
        return data[left]

    # środek listy
    mid = int((left + right) / 2)

    # rozwiązanie problemu dla lewej części listy, wywołanie rekurencyjne
    max_left = find_max(data, left, mid)
    # rozwiązanie problemu dla prawej części listy, wywołanie rekurencyjne
    max_right = find_max(data, mid + 1, right)

    # zwracamy mniejszą z wartości
    return max_left if max_left > max_right else max_right


# lista losowych elementów z zakresu <1, n>
n = 1_000_000_000
data = random.sample(range(1, n + 1), n)
# limit rekurencji można zawężać za pomocą sys.setrecursionlimit()
print(f'max = {find_max(data, 0, len(data) - 1)}')