## Współbieżność i zrównoleglenie
Często spotykamy się z sytuacją, gdy w mowie potocznej oba pojęcia używane są wymiennie. Z punktu widzenia informatyki, istnieje fundamentalna różnica między oboma pojęciami: współbieżność jest własnością kodu, natomiast zrównoleglenie dotyczy sposobu konkretnego wykonania. Można zatem myśleć o współbieżności jako o istnieniu "okazji" do równoległego wykonania kodu, z której w danej sytuacji można, ale nie trzeba korzystać. Implementacja programu może być więc współbieżna, ale na jednordzeniowym procesorze wykonanie siłą rzeczy równoległe nie będzie - program jednak będzie działał całkowicie poprawnie dzięki systemowi dzielenia czasu.

W Pythonie istnieje kilka mechanizmów, które można użyć do stworzenia kodu współbieżnego:
- wątki
- procesy
- generatory, korutyny i programowanie asynchroniczne

Poniżej omówimy po kolei różne podejścia do pisania programów współbieżnych w Pythonie.

### Wątki
Jak wiele innych języków również w Pythonie dostepny jest mechanizm wątków, dzielących wspólną przestrzeń adresową, wspieranych przez system operacyjny (na uniksach z użyciem `pthreads`). Aby stworzyć nowe wątki możemy posłużyć się modułem `threading`:

In [None]:
from threading import Thread, get_ident
import time

def foo():
    print(f"thread with id #{get_ident()} - starting...")
    time.sleep(5)
    print(f"thread: #{get_ident()} - finished!")

threads = []
for _ in range(5):
    t = Thread(target=foo)
    t. start()
    threads.append(t)

for t in threads:
    t.join()
    print(f"main thread: joined thread #{t.ident}")

Tak użyte wątki są jednak problematyczne - łatwo zapomnieć o wywołaniu `join()`, co prowadzi do wycieków pamięci. Również oczekiwanie jest nieintuicyjne - nie wiadomo, który wątek skończy się pierwszy, więc nie wiadomo na który czekać w pierszej kolejności. Na szczęście istnieje łatwiejsze rozwiązanie - moduł standardowej biblioteki `concurrent.futures`:

In [3]:
import concurrent.futures
from threading import get_ident
import time

def foo(threadNo):
    print(f"thread #{threadNo} with id #{get_ident()} - starting...  total active: {threading.active_count()}")
    time.sleep(5 - threadNo)
    print(f"thread #{threadNo} with id #{get_ident()} - finished!    total active: {threading.active_count()}")
    return threadNo
    
results = None
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(foo, range(5))
print(f"finished all - results are in order of increasing threadno: {list(results)}")

thread #0 with id #140698338707008 - starting...  total active: 9
thread #1 with id #140698330314304 - starting...  total active: 10
thread #2 with id #140698316678720 - starting...  total active: 11
thread #2 with id #140698316678720 - finished!    total active: 11
thread #3 with id #140698316678720 - starting...  total active: 11
thread #1 with id #140698330314304 - finished!    total active: 11
thread #4 with id #140698330314304 - starting...  total active: 11
thread #0 with id #140698338707008 - finished!    total active: 11
thread #4 with id #140698330314304 - finished!    total active: 10
thread #3 with id #140698316678720 - finished!    total active: 9
finished all - results are in order of increasing threadno: [0, 1, 2, 3, 4]


Użycie `ThreadPoolExecutor`a jako managera kontekstu jest teraz zalecanym sposobem tworzenia i zarządzania wątkami, który jest stosunkowo odporny na błędy programistyczne powodujące wycieki zasobów. Są jednak pułapki - przykładowo, jeśli do `.map` podamy funkcję bezargumentową, zostanie rzucony wyjątek, który jednak nie wydostanie się poza `ThreadPoolExecutor`, powodując trudne do debugowania bugi.
Alternatywnie, możemy użyć innych metod executora:

In [None]:
import concurrent.futures
import threading
def foo(threadNo):
    print(f"thread #{threadNo} with id #{get_ident()} - starting...  total active: {threading.active_count()}")
    time.sleep(5 - threadNo)
    print(f"thread #{threadNo} with id #{get_ident()} - finished!    total active: {threading.active_count()}")
    return threadNo

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(foo, threadNo) for threadNo in range(5)]
    for f in concurrent.futures.as_completed(futures):
        print(f"main: finished {f.result()}")
print("finished all")

`max_workers` specyfikuje ilość wątków, które będą wykonywać zadania stworzone przy użyciu`executor.submit`. Argumenty `submit` to funkcja implementująca kod zadania i argumenty, które mają być do niej przekazane.

### Great Interpreter Lock (GIL) 
W odróżnieniu od większości innych języków, Python jest dość ograniczony jeśli chodzi o wykorzystanie wątków - na raz wykonuje się tylko jeden z nich, niezależnie od ilości dostępnych rdzeni. Wynika to głównie z gwarancji koniecznych do zapewnienia poprawnego działania garbage collectora - bez muteksa chroniącego liczniki referencji, niemożliwe byłoby zapewnienie bezpieczeństwa zwalniania zasobów. Ograniczenie to dotyczy jednak tylko "kanonicznej" implementacji Pythona - CPython, nie istnieje natomiast w PyPy czy IronPythonie. Ponieważ jednak zdecydowana większość programów używa CPythona, ograniczenie to sprawiło, że używanie wątków w rozumieniu biblioteki `threading` czy klasy `ThreadPoolExecutor` zalecane jest tylko dla problemów które są ograniczane przez IO tzw. `IO bound`. W przypadku gdy bottleneckiem jest CPU lepszym rozwiązaniem jest stworzenie osobnych procesów i standardowa komunikacja między nimi w oparciu o gniazda (*sockets*), pamięć dzieloną czy rurki (*pipes*). Wiąże się to jednak ze sporym narzutem, w związku z czym [PEP-703](https://peps.python.org/pep-0703/) postuluje przebudowę CPythona tak, by GIL stał się opcjonalny. Jeśli propozycja zostanie oficjalnie przyjęta - a po raz pierwszy od lat jest na to szansa, wątki staną się pełnoprawnym sposobem implementacji dowolnych programów współbieżnych. Póki co, wątki wykonują się pojedynczo, w związku z czym część programów daje poprawne wyniki, nawet jeśli nie jest do końca poprawna w klasycznym rozumieniu współbieżnośći:

In [18]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
import time

def bar(threadNo) -> int:
    start = time.monotonic()
    bla = 0
    for _ in range(10_000_000):
        bla += 1
    print(f"{threadNo} took {time.monotonic() - start}")
    return bla

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(bar, threadNo) for threadNo in range(10)]
    for future in as_completed(futures):
        print(f"got {future.result()}")
    

1 took 1.0267287790047703
got 10000000
0 took 1.3559310490090866
got 10000000
2 took 1.4640954999922542
4 took 1.4282812790042954
3 took 1.4720998650009278
got 10000000
got 10000000
got 10000000
5 took 1.3230580749950605
got 10000000
9 took 1.0640262730012182
got 10000000
8 took 1.197377423013677
got 10000000
7 took 1.2486669439967955
got 10000000
6 took 1.4968561370042153
got 10000000


### Interfejs concurrent.Futures.Executor
`concurrent.futures` definiuje interfejs `Executor`, wyglądający mniej więcej tak:
```
class Executor:
    def submit(fn, *args, **kwargs):
        ...

    def map(func, *iterables, timeout=None, chunksize=1):
        ...

    def shutdown(wait=True, *, cancel_futures=False):
        ...
```

`submit` - zleca wykonanie na którymś z wątków-workerów funkcji `fn` z przekazanymi argumentami. Zwraca obiekt `Future`
`map` - uruchamia funkcję `func` na każdym elemencie  kolekcji przekazanych w `iterables`. `chunksize` reguluje jak duże segmenty `iterables` zostają za jednym razem przekazane do workerów - ma to znaczenie zwłaszcza dla implementacji opartej o procesy, gdzie komunikacja międzyprocesowa może być relatywnie dość wolna. Gdy podamy argument `timeout`, podczas iterowania z użyciem zwracanego tu iteratora po upływie tego czasu `next()` spowoduje rzucenie wyjątku `TimeoutError`
`shutdown` - sygnalizuje do executora, że należy zwolnić zasoby przez niego zaalokowane. Parametry regulują co dzieje się z niewykonanymi jeszcze obiektami `Future`.

Executory implementowane przez bibliotekę `concurrent.futures` są też `contextmanager`ami - nie musimy wołać na nich `shutdown`, bo automatycznie stanie się to jeśli stworzymy executor w bloku `with`:

```
with ThreadPoolExecutor(max_workers=5) as executor:
    ...
# tu już jest po zawołaniu shutdown
```

Razem z biblioteką dostajemy 2 implementacje: znany nam już `ThreadPoolExecutor`, który zlecane zadania wykonuje na `max_workers` wątkach, oraz `ProcessPoolExecutor`, osiągający to samo przy użyciu procesów. Sprawia to, że stosunkowo łatwo jest wymienić niewłaściwie dobraną implementację - jeśli po sprofilowaniu okaże się np.: że ograniczeniem w naszym programie jest IO, możemy się pokusić o użycie `ThreadPoolExecutora`, który ma niższy narzut na stworzenie workerów i jest elastyczniejszy w komunikacji między workerami. Dla odmiany procesy ograniczane przez CPU będą znacząco lepiej wykorzystywały dostepne rdzenie procesora.

### Procesy

In [None]:
from concurrent.futures import ProcessPoolExecutor, as_completed
from contextlib import contextmanager
import time

@contextmanager
def timer(name: str):
    start = time.monotonic()
    try:
        yield
    finally:
        print(f"{name} took {time.monotonic() - start}")

def bar(threadNo) -> int:
    with timer(threadNo):
        bla = 0
        for _ in range(10_000_000):
            bla += 1
        return bla

with ProcessPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(bar, threadNo) for threadNo in range(10)]
    for future in as_completed(futures):
        print(f"got {future.result()}")

Oczywiście, analogicznie do wątków, istnieje również [niskopoziomowe API](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing) do zarządzania procesami - służy do tego moduł `multiprocessing`, którego API jest bardzo podobne do `threading`. W wielu przypadkach jest ono jednak trudniejsze do użycia niż `concurrent.futures`, dlatego jeśli można, zaleca się by używać nowszego modułu `concurrent.futures`.

### Wymiana danych między procesami
Najprostsze w użyciu są 2 udostępniane przez bibliotekę multiprocessing sposoby wymiany danych między procesami: `Queue` i `Pipe`. `Queue` są thread- i process safe, pipe nie.`

In [11]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello']) # <- wysyłamy dane

if __name__ == '__main__':
    q = Queue() # jedna współdzielona kolejka - muszę ją podać do funkcji implementującej ciało drugiego procesu
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get()) # <- odbieramy dane
    p.join()

[42, None, 'hello']


In [12]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello']) # <- wysyłamy dane
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() # dwie końcówki rurki - dla każdego procesu po 1
    p = Process(target=f, args=(child_conn,)) # przekazanie drugiego końca rurki
    p.start()
    print(parent_conn.recv()) # <- odbieramy dane
    p.join()

[42, None, 'hello']


Można również używać pamięci dzielonej, która jest reprezentowana przy użyciu typów `Value` i `Array` - jeśli jednak nie ma takiej konieczności, zazwyczaj lepszym wyborem będzie `Queue`:

In [None]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

### Obiekty Future
Wynikiem zasubmitowania funkcji do wykonania na executorze - niezależnie od wybranej implementacji - jest obiekt klasy Future, który enkapsuluje asynchroniczne jej wykonanie. Interfejs tej klasy wygląda mniej więcej tak:
```
class Future:
    def cancel() -> bool:
        ...

    def cancelled() -> bool:
        ...

    def running() -> bool:
        ...

    def done() -> bool:
        ...

    def result(timeout: int =None) -> Any:
        ...

    def exception(timeout=None) -> Exception:
        ...

    def add_done_callback(fn):
        ...
```
Zazwyczaj jednak korzysta się po prostu z funkcji `as_completed` by iterować po kolejnych wynikach kończących sie asynchronicznych obliczeń:
```
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(foo, threadNo) for threadNo in range(5)]
    for f in concurrent.futures.as_completed(futures):
        ...
```

### Semafory i zmienne warunkowe
Najprostszym typem semafora jest oczywiście mutex, który w Pythonie nazywa się `Lock`


In [None]:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

... są również generalniejsze semafory:

In [None]:
max_psql_conns = 5
...
with BoundedSemaphore(value=max_psql_conns):
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

... i zmienne warunkowe:

In [15]:
def consumer(cond):
    with cond:
        while some_queue.isEmpty():
            cond.wait()
    print('Resource acquired')


def producer(cond):
    print('Making resource available')
    cond.notifyAll()

condition = threading.Condition()

### *Zadanie 1*
Poniżej podany jest algorytm "sita Eratostenesa". Przy użyciu `concurrent.futures` dla każdego `0 < n <= 1 000 000` sprawdź ile jest liczb pierwszych mniejszych od danego `n`. Załóż na potrzeby zadania, że z jakiegoś powodu musisz wykonywać algorytm dla każdej liczby od nowa.
* [ ] napisz `contextmanager`, który zmierzy czas obliczenia wszystkich liczb spełniających warunki zadania
* [ ] użyj `concurrent.futures` by rozdysponować pracę między wątki
* [ ] użyj `concurrent.futures` by rozdysponować pracę między procesy - jaka jest różnica w szybkości programu?
* [ ] zainstaluj `pstree` i zweryfikuj ile procesów lub wątków działa w trakcie wykonania Twojego programu
* [ ] jak zmienia się czas wykonania w zależności od ilości wątków/procesów podanych jako parametr `max_workers` executora?

In [22]:
def sieve(n):
    prime = [True for i in range(n + 1)]
    p = 2
    while (p * p <= n):
        if (prime[p] == True):
            for i in range(p * p, n + 1, p):
                prime[i] = False
        p += 1
    result = []
    for p in range(2, n + 1):
        if prime[p]:
            result.append(p)
    return len(result)

[2, 3, 5]

## Async/await
Oprócz "tradycyjnych" form współbieżności i równoległości wykonania opartych o wątki i procesy, Python3 wprowadził wsparcie dla korutyn i programowania asynchronicznego. Korutyny to funkcje, które na czas trwania pewnej wolnej operacji - zazwyczaj IO - mogą zostać zawieszone, a gdy operacja się zakonczy ponownie wznowione. Historycznie podobną funkcjonalność oferowały generatory i pierwsze podejście do tego tematu w Pythonie opierało się na nich, jednak było ono dośc niewygodne i nieintuicyjne. Podobnie jak w C# i Node.js, od Pythona 3.7 istnieje możliwość definiowania korutyn przy użyciu słów kluczowych `async` i `await`:

In [33]:
def fun(x): # zwykła funkcja
    return x * x

fun(5) # zwykłe zawołanie zwykłej funkcji

async def fun2(x): # asynchroniczna korutyna
    print("fun2")
    return x * x

await fun2(5) # zawołanie korutyny

fun2(6) # zwraca obiekt reprezentujący korutynę która jeszcze nie wykonuje się!

fun2


<coroutine object fun2 at 0x7ff6fc3fedc0>

Normalnie `await` można użyć tylko wewnątrz korutyny - w przypadku użycia go w zwykłej funkcji dostaniemy błąd:
```
SyntaxError: 'await' outside async function
```
Jak w takim razie możemy uruchomić naszą asynchroniczną funkcję w bloku `if __name__ == '__main__'? Musimy stworzyć event loop - przy użyciu modułu `asyncio`:

In [None]:
import asyncio


async def fun(x):
    return x * x


async def main() -> None:
    result = await fun(10)
    print(f'fun(10)={result}')

    result = await fun(5)
    print(f'fun(5)={result}')

if __name__ == '__main__':
    asyncio.run(main())

W Jupyterze nie powyższy kod nie zadziała - Jupyter wewnątrz sam zaimplementowany jest przy użyciu `asyncio`, więc Python nie pozwala na zagnieżdżone stworzenie nowego event loopa - dlatego poprzedni przykład działal :)