## Współbieżność i zrównoleglenie
Często spotykamy się z sytuacją, gdy w mowie potocznej oba pojęcia używane są wymiennie. Z punktu widzenia informatyki, istnieje fundamentalna różnica między oboma pojęciami: współbieżność jest własnością kodu, natomiast zrównoleglenie dotyczy sposobu konkretnego wykonania. Można zatem myśleć o współbieżności jako o istnieniu "okazji" do równoległego wykonania kodu, z której w danej sytuacji można, ale nie trzeba korzystać. Implementacja programu może być więc współbieżna, ale na jednordzeniowym procesorze wykonanie siłą rzeczy równoległe nie będzie - program jednak będzie działał całkowicie poprawnie dzięki systemowi dzielenia czasu.

W Pythonie istnieje kilka mechanizmów, które można użyć do stworzenia kodu współbieżnego:
- wątki
- procesy
- generatory, korutyny i programowanie asynchroniczne

Poniżej omówimy po kolei różne podejścia do pisania programów współbieżnych w Pythonie.

### Wątki
Jak wiele innych języków również w Pythonie dostepny jest mechanizm wątków, dzielących wspólną przestrzeń adresową, wspieranych przez system operacyjny (na uniksach z użyciem `pthreads`). Aby stworzyć nowe wątki możemy posłużyć się modułem `threading`:

In [2]:
from threading import Thread, get_ident
import time

def foo():
    print(f"thread with id #{get_ident()} - starting...")
    time.sleep(5)
    print(f"thread: #{get_ident()} - finished!")

threads = []
for _ in range(5):
    t = Thread(target=foo)
    t. start()
    threads.append(t)

for t in threads:
    t.join()
    print(f"main thread: joined thread #{t.ident}")

thread with id #123145474007040 - starting...
thread with id #123145490796544 - starting...
thread with id #123145507586048 - starting...
thread with id #123145524375552 - starting...
thread with id #123145541165056 - starting...
thread: #123145474007040 - finished!thread: #123145490796544 - finished!

thread: #123145524375552 - finished!
thread: #123145507586048 - finished!
main thread: joined thread #123145474007040
main thread: joined thread #123145490796544
main thread: joined thread #123145507586048
main thread: joined thread #123145524375552
thread: #123145541165056 - finished!
main thread: joined thread #123145541165056


Tak użyte wątki są jednak problematyczne - łatwo zapomnieć o wywołaniu `join()`, co prowadzi do wycieków pamięci. Również oczekiwanie jest nieintuicyjne - nie wiadomo, który wątek skończy się pierwszy, więc nie wiadomo na który czekać w pierszej kolejności. Na szczęście istnieje łatwiejsze rozwiązanie - moduł standardowej biblioteki `concurrent.futures`:

In [4]:
import concurrent.futures
from threading import get_ident, active_count
import time


def foo(threadNo):
    print(f"thread #{threadNo} with id #{get_ident()} - starting...  total active: {active_count()}")
    time.sleep(5 - threadNo)
    print(f"thread #{threadNo} with id #{get_ident()} - finished!    total active: {active_count()}")
    return threadNo
    
results = None
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(foo, range(5))
print(f"finished all - results are in order of increasing threadno: {list(results)}")

thread #0 with id #123145474007040 - starting...  total active: 9thread #1 with id #123145490796544 - starting...  total active: 10

thread #2 with id #123145507586048 - starting...  total active: 11
thread #2 with id #123145507586048 - finished!    total active: 11
thread #3 with id #123145507586048 - starting...  total active: 11
thread #1 with id #123145490796544 - finished!    total active: 11
thread #4 with id #123145490796544 - starting...  total active: 11
thread #0 with id #123145474007040 - finished!    total active: 11
thread #4 with id #123145490796544 - finished!    total active: 10
thread #3 with id #123145507586048 - finished!    total active: 9
finished all - results are in order of increasing threadno: [0, 1, 2, 3, 4]


`active_count` daje tu zawyżone liczby bo dodatkowo Jupyter ma aktywne wątki w tym samym interpreterze. Użycie `ThreadPoolExecutor`a jako managera kontekstu jest teraz zalecanym sposobem tworzenia i zarządzania wątkami, który jest stosunkowo odporny na błędy programistyczne powodujące wycieki zasobów. Są jednak pułapki - przykładowo, jeśli do `.map` podamy funkcję bezargumentową, zostanie rzucony wyjątek, który jednak nie wydostanie się poza `ThreadPoolExecutor`, powodując trudne do debugowania bugi.
Alternatywnie, możemy użyć innych metod executora:

In [None]:
import concurrent.futures
import threading
def foo(threadNo):
    print(f"thread #{threadNo} with id #{get_ident()} - starting...  total active: {threading.active_count()}")
    time.sleep(5 - threadNo)
    print(f"thread #{threadNo} with id #{get_ident()} - finished!    total active: {threading.active_count()}")
    return threadNo

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(foo, threadNo) for threadNo in range(5)]
    for f in concurrent.futures.as_completed(futures):
        print(f"main: finished {f.result()}")
print("finished all")

`max_workers` specyfikuje ilość wątków, które będą wykonywać zadania stworzone przy użyciu`executor.submit`. Argumenty `submit` to funkcja implementująca kod zadania i argumenty, które mają być do niej przekazane.

### Great Interpreter Lock (GIL) 
W odróżnieniu od większości innych nowoczesnych języków (poza np. Ruby), Python jest dość ograniczony jeśli chodzi o wykorzystanie wątków - na raz wykonuje się tylko jeden z nich, niezależnie od ilości dostępnych rdzeni. Wynika to głównie z gwarancji koniecznych do zapewnienia poprawnego działania garbage collectora - bez muteksa chroniącego liczniki referencji, niemożliwe byłoby zapewnienie bezpieczeństwa zwalniania zasobów. Ograniczenie to dotyczy jednak tylko "kanonicznej" implementacji Pythona - CPython, nie istnieje natomiast w PyPy czy IronPythonie. Ponieważ jednak zdecydowana większość programów używa CPythona, ograniczenie to sprawiło, że używanie wątków w rozumieniu biblioteki `threading` czy klasy `ThreadPoolExecutor` zalecane jest tylko dla problemów które są ograniczane przez IO tzw. `IO bound`. W przypadku gdy bottleneckiem jest CPU lepszym rozwiązaniem jest stworzenie osobnych procesów i standardowa komunikacja między nimi w oparciu o gniazda (*sockets*), pamięć dzieloną czy rurki (*pipes*). Wiąże się to jednak ze sporym narzutem, w związku z czym [PEP-703](https://peps.python.org/pep-0703/) postuluje przebudowę CPythona tak, by GIL stał się opcjonalny. Jeśli propozycja zostanie oficjalnie przyjęta - a po raz pierwszy od lat jest na to szansa, wątki staną się pełnoprawnym sposobem implementacji dowolnych programów współbieżnych. Póki co, wątki wykonują się pojedynczo, w związku z czym część programów daje poprawne wyniki, nawet jeśli nie jest do końca poprawna w klasycznym rozumieniu współbieżnośći:

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
import time

def bar(threadNo) -> int:
    start = time.monotonic()
    bla = 0
    for _ in range(10_000_000):
        bla += 1
    print(f"{threadNo} took {time.monotonic() - start}")
    return bla

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(bar, threadNo) for threadNo in range(10)]
    for future in as_completed(futures):
        print(f"got {future.result()}")
    

### Wpływ GIL na problemy CPU-bound i na problemy IO-bound
### Strategie radzenia sobie z ograniczeniami powodowanymi przez GIL

#### Ogółem:
1. Procesy zamiast wątków:
Używaj wielu procesów zamiast wątków. Moduł multiprocessing w standardowej bibliotece Pythona pozwala na tworzenie procesów, które działają niezależnie i mogą być równocześnie wykonywane na wielordzeniowych procesorach, omijając GIL.
2. Użyj innej implementacji Pythona:
Nie wszystkie implementacje Pythona mają GIL. Na przykład, Jython (Python na JVM) i IronPython (Python na .NET) nie mają GIL. Alternatywą jest też PyPy, choć ma on własną wersję GIL, jest znacznie bardziej wydajny w wielu scenariuszach.
3. Rozszerzenia w języku C/C++:
Jeśli masz krytyczny dla wydajności fragment kodu, możesz napisać go jako rozszerzenie w C lub C++. W trakcie wykonywania kodu napisanego w C, GIL może być zwolniony.
4. Asynchroniczność:
Korzystaj z asynchronicznych bibliotek i frameworków, takich jak asyncio. Choć nadal podlegają one GIL, umożliwiają efektywne wykorzystanie jednego wątku poprzez asynchroniczne I/O i współprogramy.
5. Zewnętrzne usługi:
Jeśli ograniczenia GIL stają się problematyczne dla konkretnego zadania (np. obliczeń równoległych), można rozważyć offloading tych zadań do zewnętrznych usług lub systemów, które nie mają tych ograniczeń.
6. Użyj wątków tylko dla operacji I/O:
W wielu przypadkach GIL nie jest problemem, gdy wątki są używane głównie do operacji I/O (np. odczytu i zapisu do plików, komunikacji sieciowej), ponieważ GIL jest często zwalniany podczas takich operacji.

#### Zwalnianie GIL w zewnętrznych bibliotekach
Zewnętrzne biblioteki mogą używać dowolnej ilości wątków w sposób przezroczysty dla Pythona, pod warunkiem, że wykonają po swojej stronie całą synchronizację i interfejs z Pythonem będzie jednowatkowy. Jest to znaczące ułatwienie, choć nie całkowite remedium. Przykładowo, karty graficzne mają obecnie tak wielką wydajność i przepustowość pamięci, że samo odbieranie danych z powrotem do procesu Pythonowego jest sporym ograniczeniem. W wielu przypadkach jest to rozwiązanie, którego wpływ łatwo zmierzyć, toteż jest on relatywnie popularny. Przykładowo `NumPy` zawsze stara się pod spodem korzystać z bibliotek zaimplementowanych w C takich jak `OpenBLAS`czy `LAPACK`, które nie stronią od wykorzystania `OpenMP` i wielu wątków. Można to łatwo sprawdzić:

In [None]:
import numpy as np
np.show_config()

Idąc dalej można nawet zmierzyć wpływ ilości wątków skonfigurowanych w OpenMP, choćby na przykładzie mnożenia macierzy. Wykonajmy nastepujący program zmieniając wartość zmiennej środowiskowej `OMP_NUM_THREADS`

In [None]:
import numpy as np

rng = np.random.default_rng()
m = rng.random(size=(7000, 7000))
m @ m

```
$ export OMP_NUM_THREADS=1
$ time python test.py

real    0m13.782s
user    0m13.702s
sys     0m0.078s
$ export OMP_NUM_THREADS=4
$ time python test.py

real    0m4.542s
user    0m17.016s
sys     0m0.558s
```
Niestety, nie każda biblioteka została zaimplementowana w C/C++ i my bindingi dla Pythona, choć ekosystem jest niezwykle bogaty.

#### Własne rozszerzenie w C
```
#include <Python.h> // nagłówek z typami, funkcjami i makrami Pythona (dostępny zaraz po instalacji interpretera)

int fibonacci(int n) { // ta funkcja nie może wywoływać API Pythona!!!
    if (n == 0 || n == 1) {
        return 1
    } else {
        return fibonacci(n - 1) + fibonacci(n - 2)
    }
}

static PyObject* wrapper(PyObject* self, PyObject* args) { // łączenie między Pythonem a C
    int n, result;

    if (!PyArg_ParseTuple(args, "i", &n)) {
        return NULL;
    }

    Py_BEGIN_ALLOW_THREADS // Pozwalamy interpreterowi robić co innego kiedy fibonacci się wykonuje
    result = fibonacci(n);
    Py_END_ALLOW_THREADS // .. a tu wracamy

    return Py_BuildValue("i", result);
}

static PyMethodDef fibonacci_c_methods[] = {
    {"fibonacci", wrrapper, METH_VARARGS, "Liczy nta liczbe Fibonacciego"},
    {NULL, NULL, 0, NULL}
};

static struct PyModuleDef fibonacci_c_module = { // definicja modułu
    PyModuleDef_HEAD_INIT,
    "fibonacci_c", // tą nazwe importujemy
    "Moje rozszerzenie do liczenia fibonacciego",
    -1, // nie wspieramy subinterpreterów (PEP-554)
    fibonacci_c_methods
};

PyMODINIT_FUNC PyInit_fibonacci_cmodule(void) { // odpalane podczas ładowania modułu
    return PyModule_Create(&fibonacci_c_module);
}
```

Możemy to teraz skompilować (zakładając, że nagłówki Pythona są w `/usr/include/Python3.11`):
```
$ gcc -I/usr/include/python3.11 -shared -fPIC -O3 -o fibonacci_c.so fibonacci_c.c
```
Po kompilacji możemy normalnie zaimportować nasz moduł przez `import fibonacci_c` i wołać funkcję przez `fibonacci_c.fibonacci`. W praktyce jest to dość żmudny proces i gdy chcemy uzyskać przenośny kod na wiele platform, musimy się sporo napracować. Warto wówczas zautomatyzować taki proces budowania i dystrybuować gotowe biblioteki jako `wheel`s (np. przez PyPI)

#### Użycie NOGILa
Po zaakceptowaniu PEP-703, począwszy od Pythona 3.13 powinien byc dostępny specjalny build CPythona, zbudowany z flagą `--disable-gil`. Jest to pierwsze podejście, które ma szansę na faktyczny sukces. W planach jest by docelowo CPython był oferowany wyłącznie bez GIL-a, ale przez kilka wydań będzie można korzystać z `GIL`-owej wersji i to ona na razie będzie domyślną. W tym czasie autorzy pythonowych bibliotek i rozszerzeń w C/C++ muszą dostosować swoje biblioteki do pracy z kodem, którego poprawnosc zależała od istnienia GILa. Do dopracowania pozostaje np. kwestia atomowości operacji na kolekcjach, jednak i tu pojawiają się sensowne rozwiązania. Zainteresowanych na razie trzeba odesłać do [tego repozytorium](https://github.com/colesbury/nogil) - jeśli Wasz kod zadziała na tej wersji to będzie działał już bez GIL-a. Można wtedy zmierzyć czy wydajność wzrosła i o ile, co pozwoli zdiagnozować rzeczywiste problemy i zoptymalizować implemeentację.

### Interfejs concurrent.Futures.Executor
`concurrent.futures` definiuje interfejs `Executor`, wyglądający mniej więcej tak:
```
class Executor:
    def submit(fn, *args, **kwargs):
        ...

    def map(func, *iterables, timeout=None, chunksize=1):
        ...

    def shutdown(wait=True, *, cancel_futures=False):
        ...
```

`submit` - zleca wykonanie na którymś z wątków-workerów funkcji `fn` z przekazanymi argumentami. Zwraca obiekt `Future`
`map` - uruchamia funkcję `func` na każdym elemencie  kolekcji przekazanych w `iterables`. `chunksize` reguluje jak duże segmenty `iterables` zostają za jednym razem przekazane do workerów - ma to znaczenie zwłaszcza dla implementacji opartej o procesy, gdzie komunikacja międzyprocesowa może być relatywnie dość wolna. Gdy podamy argument `timeout`, podczas iterowania z użyciem zwracanego tu iteratora po upływie tego czasu `next()` spowoduje rzucenie wyjątku `TimeoutError`
`shutdown` - sygnalizuje do executora, że należy zwolnić zasoby przez niego zaalokowane. Parametry regulują co dzieje się z niewykonanymi jeszcze obiektami `Future`.

Executory implementowane przez bibliotekę `concurrent.futures` są też `contextmanager`ami - nie musimy wołać na nich `shutdown`, bo automatycznie stanie się to jeśli stworzymy executor w bloku `with`:

```
with ThreadPoolExecutor(max_workers=5) as executor:
    ...
# tu już jest po zawołaniu shutdown
```

Razem z biblioteką dostajemy 2 implementacje: znany nam już `ThreadPoolExecutor`, który zlecane zadania wykonuje na `max_workers` wątkach, oraz `ProcessPoolExecutor`, osiągający to samo przy użyciu procesów. Sprawia to, że stosunkowo łatwo jest wymienić niewłaściwie dobraną implementację - jeśli po sprofilowaniu okaże się np.: że ograniczeniem w naszym programie jest IO, możemy się pokusić o użycie `ThreadPoolExecutora`, który ma niższy narzut na stworzenie workerów i jest elastyczniejszy w komunikacji między workerami. Dla odmiany procesy ograniczane przez CPU będą znacząco lepiej wykorzystywały dostepne rdzenie procesora.

### Procesy

In [None]:
from concurrent.futures import ProcessPoolExecutor, as_completed
from contextlib import contextmanager
from multiprocessing import freeze_support
import time


@contextmanager
def timer(name: str):
    start = time.monotonic()
    try:
        yield
    finally:
        print(f"{name} took {time.monotonic() - start}")

def bar(threadNo) -> int:
    with timer(threadNo):
        bla = 0
        for _ in range(10_000_000):
            bla += 1
        return bla

if __name__ == "__main__":
    freeze_support()
    with ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(bar, threadNo) for threadNo in range(10)]
        for future in as_completed(futures):
            print(f"got {future.result()}")

Oczywiście, analogicznie do wątków, istnieje również [niskopoziomowe API](https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing) do zarządzania procesami - służy do tego moduł `multiprocessing`, którego API jest bardzo podobne do `threading`. W wielu przypadkach jest ono jednak trudniejsze do użycia niż `concurrent.futures`, dlatego jeśli można, zaleca się by używać nowszego modułu `concurrent.futures`.

### Wymiana danych między procesami
Najprostsze w użyciu są 2 udostępniane przez bibliotekę multiprocessing sposoby wymiany danych między procesami: `Queue` i `Pipe`. `Queue` są thread- i process safe, pipe nie.`

In [None]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello']) # <- wysyłamy dane

if __name__ == '__main__':
    q = Queue() # jedna współdzielona kolejka - muszę ją podać do funkcji implementującej ciało drugiego procesu
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get()) # <- odbieramy dane
    p.join()

In [None]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello']) # <- wysyłamy dane
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() # dwie końcówki rurki - dla każdego procesu po 1
    p = Process(target=f, args=(child_conn,)) # przekazanie drugiego końca rurki
    p.start()
    print(parent_conn.recv()) # <- odbieramy dane
    p.join()

Można również używać pamięci dzielonej, która jest reprezentowana przy użyciu typów `Value` i `Array` - jeśli jednak nie ma takiej konieczności, zazwyczaj lepszym wyborem będzie `Queue`:

In [None]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

### Obiekty Future
Wynikiem zasubmitowania funkcji do wykonania na executorze - niezależnie od wybranej implementacji - jest obiekt klasy Future, który enkapsuluje asynchroniczne jej wykonanie. Interfejs tej klasy wygląda mniej więcej tak:
```
class Future:
    def cancel() -> bool:
        ...

    def cancelled() -> bool:
        ...

    def running() -> bool:
        ...

    def done() -> bool:
        ...

    def result(timeout: int =None) -> Any:
        ...

    def exception(timeout=None) -> Exception:
        ...

    def add_done_callback(fn):
        ...
```
Zazwyczaj jednak korzysta się po prostu z funkcji `as_completed` by iterować po kolejnych wynikach kończących sie asynchronicznych obliczeń:
```
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(foo, threadNo) for threadNo in range(5)]
    for f in concurrent.futures.as_completed(futures):
        ...
```

### freeze_support
Drobną dygresją może być to, że aby nasze programy napisane z użyciem modułu `multiprocessing` lub z użyciem `concurrent.futures.ProcessPoolExecutor`a działały bez problemu (zwłaszcza na Windowsie), konieczne jest dodanie kilku *magicznych linijek*:
- nasz program powinien mieć klauzulę `if __name__ == "__main__"` - jeśli metoda tworzenia procesów to spawn, to każda instancja procesu-dziecka wykonuje na początku _cały_ kod na poziomie modułów. Nie chcemy by kod tworzący nowy proces był wywoływany ponownie dla każdego dziecka rekurencyjnie
- ... a w niej na początku `multiprocessing.freeze_support()`
Funkcja `freeze_support()` jest niezbędna głównie na systemie Windows oraz w przypadku "zamrażania" aplikacji (tworzenia samodzielnych plików wykonywalnych) na macOS, zwłaszcza od Pythona 3.8, gdzie domyślną metodą uruchamiania jest `spawn` a nie `fork`. Pomimo że macOS i Windows używają metody `spawn` w różny sposób, dla obu systemów zaleca się użycie `freeze_support()` przy tworzeniu samodzielnych aplikacji. Jeśli nie "zamrażamy" swojego kodu, funkcja `freeze_support()` nie jest wymagana, ale jej dodanie jest dobrą praktyką w celu zapewnienia przyszłej kompatybilności.

## Demo: DataLoader

## Semafory i zmienne warunkowe
Najprostszym typem semafora jest oczywiście mutex, który w Pythonie nazywa się `Lock`


In [None]:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

... są również generalniejsze semafory:

In [None]:
max_psql_conns = 5
...
with BoundedSemaphore(value=max_psql_conns):
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

... i zmienne warunkowe:

In [None]:
def consumer(cond):
    with cond:
        while some_queue.isEmpty():
            cond.wait()
    print('Resource acquired')


def producer(cond):
    print('Making resource available')
    cond.notifyAll()

condition = threading.Condition()

### *Zadanie*
Poniżej podany jest algorytm "sita Eratostenesa". Przy użyciu `concurrent.futures` dla każdego `0 < n <= 1 000 000` sprawdź ile jest liczb pierwszych mniejszych od danego `n`. Załóż na potrzeby zadania, że z jakiegoś powodu musisz wykonywać algorytm dla każdej liczby od nowa.
* [ ] napisz `contextmanager`, który zmierzy czas obliczenia wszystkich liczb spełniających warunki zadania
* [ ] użyj `concurrent.futures` by rozdysponować pracę między wątki
* [ ] użyj `concurrent.futures` by rozdysponować pracę między procesy - jaka jest różnica w szybkości programu?
* [ ] zainstaluj `pstree` i zweryfikuj ile procesów lub wątków działa w trakcie wykonania Twojego programu
* [ ] jak zmienia się czas wykonania w zależności od ilości wątków/procesów podanych jako parametr `max_workers` executora?

In [None]:
def sieve(n):
    prime = [True for i in range(n + 1)]
    p = 2
    while (p * p <= n):
        if (prime[p] == True):
            for i in range(p * p, n + 1, p):
                prime[i] = False
        p += 1
    result = []
    for p in range(2, n + 1):
        if prime[p]:
            result.append(p)
    return len(result)

### *Zadanie*
```
git checkout task-visualizer-runner
pip-sync
```
Obecnie moduł `visualizer.py` wykonuje swój kod w głównym wątku aplikacji. W przypadku, gdy lista tickerów do przetworzenia jest duża, może to być wyjątkowo nieefektywne i zajmować wiele czasu. Chcemy użyć pewnej formy zrównoleglenia, tak by teoretycznie móc przetwarzać kilka tickerów na raz. W pakiecie `reporting` w module `visualizer_runner.py` zdefiniowany jest interfejs `VisualizerRunner`a, którego odpowiedzialnością jest uruchamianie istniejącego `Visualizera` w sposób równoległy.
- [ ] określ jaka forma zrównoleglenia będzie potencjalnie najlepsza. Zanim przystąpisz do wykonywania kolejnych etapów przedyskutujmy razem nasze podejście
- [ ] zaimplementuj podklasę `VisualizerRunner`a, która implementuje go w sposób jednowątkowy
- [ ] zaimplementuj podklasę `VisualizerRunner`a, która implementuje go w sposób równoległy zgodnie z wynikiem dyskusji
- [ ] uruchom program poleceniem `time python -m stock_trader --start 2008-01-01 --end 2009-01-01 --ticker-list-file ../test_tickers_100.txt --data-source local demo-plot`
- [ ] zmień w `src/settings.py` wartość `visualizer_runner_concurrency` na wartość reprezentującą wykonanie równoległe
- [ ] ponownie uruchom program
- [ ] czy można wysunąć jakieś wnioski na temat działania programu w wersji równoległej względem jednowątkowej?
- [ ] jaka inna architektura aplikacji mogłaby pomóc osiągnąć lepsze rezultaty?

## Async/await
Oprócz "tradycyjnych" form współbieżności i równoległości wykonania opartych o wątki i procesy, Python3 wprowadził wsparcie dla korutyn i programowania asynchronicznego. Korutyny to funkcje, które na czas trwania pewnej wolnej operacji - zazwyczaj IO - mogą zostać zawieszone, a gdy operacja się zakonczy ponownie wznowione. Historycznie podobną funkcjonalność oferowały generatory i pierwsze podejście do tego tematu w Pythonie opierało się na nich, jednak było ono dośc niewygodne i nieintuicyjne. Podobnie jak w C# i Node.js, od Pythona 3.7 istnieje możliwość definiowania korutyn przy użyciu słów kluczowych `async` i `await`:

In [12]:
def fun(x): # zwykła funkcja
    return x * x

fun(5) # zwykłe zawołanie zwykłej funkcji

async def fun2(x): # asynchroniczna korutyna
    print("fun2")
    return x * x

await fun2(5) # zawołanie korutyny

fun2(6) # zwraca obiekt reprezentujący korutynę która jeszcze nie wykonuje się!

fun2


<coroutine object fun2 at 0x107ddca00>

Normalnie `await` można użyć tylko wewnątrz korutyny - w przypadku użycia go w zwykłej funkcji dostaniemy błąd:
```
SyntaxError: 'await' outside async function
```
Jak w takim razie możemy uruchomić naszą asynchroniczną funkcję w bloku `if __name__ == '__main__'?` Musimy stworzyć event loop - przy użyciu modułu `asyncio`:

In [None]:
import asyncio


async def fun(x):
    return x * x


async def main() -> None:
    result = await fun(10)
    print(f'fun(10)={result}')

    result = await fun(5)
    print(f'fun(5)={result}')

if __name__ == '__main__':
    asyncio.run(main())


W Jupyterze nie powyższy kod nie zadziała - Jupyter wewnątrz sam zaimplementowany jest przy użyciu `asyncio`, więc Python nie pozwala na zagnieżdżone stworzenie nowego event loopa - dlatego poprzedni przykład działal :)

### Asynchroniczne pętle for, managery kontekstu i generatory
Czasami przydatne jest iterowanie po `Iterable`, którego elementy wymagają asynchronicznego wygenerowania - np. przez zapytanie http. Takie asynchroniczne `Iterable` nie może być wołane przez zwykłą pętlę for, która spodziewa się standardowych dunder-metod `__iter__` i `__next__`, które są synchroniczne. Zamiast tego musi ono implementować ich asynchroniczne wersje: `__aiter__` i `__anext__`, które wymagają `await`a. Zwykła, synchroniczna pętla for nie może więc ich wołać (bo `await` jest zakazany poza funkcjami asynchronicznymi). Wtedy używamy zamiast tego asynchronicznej pętli - `async for`. 

Przydatny przykład jest poniżej - iterujemy po obiektach future zwróconych z `asyncio.as_completed()` po tym jak zakończą się zapytania http z `fetch_stock_data` (w kolejności od najwcześniej zakończonego). 

In [None]:
import httpx
import asyncio

API_KEY = 'TWÓJ_KLUCZ_API'
BASE_URL = 'https://www.alphavantage.co/query'

async def fetch_stock_data(symbol: str):
    params = {
        'function': 'TIME_SERIES_DAILY',
        'symbol': symbol,
        'apikey': API_KEY
    }
    
    async with httpx.AsyncClient() as client:
        response = await client.get(BASE_URL, params=params)
        response.raise_for_status()
        return symbol, response.json()

async def main():
    symbols = ['AAPL', 'GOOGL', 'MSFT']

    async for future in asyncio.as_completed(fetch_stock_data(symbol) for symbol in symbols):
        symbol, data = await future
        print(f"Otrzymano dane dla: {symbol}")
        for date, entry in data.get('Time Series (Daily)', {}).items():
            print(f"Symbol: {symbol}, Data: {date}, Cena zamknięcia: {entry['4. close']}")

# Uruchomienie asynchronicznej funkcji main w pętli zdarzeń
asyncio.run(main())

Z analogicznych powodów w Pythonie istnieją asynchroniczne managery kontekstu czy generatory - ich interfejsy również są asynchroniczne więc mogą w swoich implementacjach `await`ować asynchroniczne operacje:

In [None]:
class AsyncDatabaseConnection:
    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def connect(self):
        # Asynchroniczna logika połączenia z bazą danych

    async def close(self):
         # Asynchroniczna logika zamknięcia połączenia z bazą danych

async with AsyncDatabaseConnection() as conn:
    # wykonaj asynchroniczne zapytania
    pass

In [None]:
import httpx
import asyncio

async def stream_http_response(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url, stream=True)  # Włączamy strumieniowanie odpowiedzi

        async for chunk in response.aiter_bytes():
            yield chunk  # Strumieniowo przetwarzamy kawałki danych

async def main():
    url = 'https://example.com/large_file'
    async for data_chunk in stream_http_response(url):
        # Tutaj możemy przetwarzać lub zapisywać kolejne partie danych
        print(data_chunk)

asyncio.run(main())

## Wołanie kodu synchronicznego z asynchronicznego
Większość bibliotek w ekosystemie Pythona jest zaimplementowana z użyciem paradygmatu synchronicznego, toteż czasem chcemy wywołać kod synchroniczny z asynchronicznego. Odwrotnie niż w drugą stronę to jest w pełni wspierane i dozwolone, jednak należy uważać na operacje trwające zbyt długo by nie blokować petli zdarzeń - wówczas dobrze skorzystać z dostępnej od Pythona 3.9  funkcji `asyncio.to_thread` i wykonać je po prostu na zewnętrznym wątku:

In [None]:
import asyncio
def long_running_task():
    # ... długotrwałe obliczenia ...
    return result

async def main():
    result = await asyncio.to_thread(long_running_task)
    # ... reszta kodu ...

asyncio.run(main())

W szczególności Python obecnie nie oferuje asynchronicznego API dostepu do plików, toteż operacje ładowania ich do pamięci czy zapisu zazwyczaj powinny korzystać z `to_thread`. Jeśli nie chcemy ich tak opakowywać sami, możemy skorzystać z gotowej biblioteki `aiofile`, która dokładnie to robi:

In [None]:
import asyncio
from pathlib import Path
from tempfile import gettempdir

from aiofile import async_open

tmp_filename = Path(gettempdir()) / "hello.txt"

async def main():
    async with async_open(tmp_filename, 'w+') as afp:
        await afp.write("Hello ")
        await afp.write("world")
        afp.seek(0)

        print(await afp.read())

        await afp.write("Hello from\nasync world")
        print(await afp.readline())
        print(await afp.readline())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())