# Automatyczne pozyskiwanie danych

## Tomasz Rodak

Wykład 5

---

## Programowanie asynchroniczne z `asyncio`

**Programowanie asynchroniczne** to model współbieżności, w którym jednostki wykonawcze (korutyny) mogą zawieszać własne działanie podczas oczekiwania na operacje wejścia–wyjścia (I/O). Mechanizm ten umożliwia efektywne wykorzystanie pojedynczego wątku bez konieczności tworzenia i synchronizacji wielu wątków czy procesów.

Biblioteka `asyncio` realizuje ten model poprzez konstrukcje `async`/`await`. Cechy charakterystyczne `asyncio` to:

1. **Jednowątkowy model wykonania**
   Cały kod asynchroniczny realizowany jest w jednej **pętli zdarzeń** (*event loop*).

2. **Kooperatywna wymiana sterowania**
   Współprogramy (obiekty wykonywalne) same zwalniają sterowanie pętli zdarzeń w momentach oczekiwania na I/O. Pozwala to na przełączanie się między zadaniami bez wymuszonej preempcji (wielozadaniowości opartej na wywłaszczaniu).

3. **Przejrzysta składnia**
   Kluczowe słowa `async def` oraz `await` umożliwiają pisanie kodu o strukturze liniowej, co upraszcza debugowanie i utrzymanie aplikacji, eliminując konieczność stosowania zagnieżdżonych callbacków.

4. **Wydajność w zadaniach I/O-bound**
   W scenariuszach, gdzie większość czasu procesora jest poświęcana na oczekiwanie na zasoby zewnętrzne (np. sieć, system plików), `asyncio` znacząco zwiększa przepustowość przy minimalnym narzucie pamięciowym.

### Korutyny i pętla zdarzeń

#### Definiowanie korutyn

Korutynę definiuje się za pomocą `async def`. Wywołanie takiej funkcji zwraca obiekt korutyny, który nie zostaje wykonany do momentu uruchomienia pętli zdarzeń:

```python
async def foo(x):
    return 2 * x

coro = foo(3)  # utworzenie obiektu korutyny, bez wykonania
```

#### Uruchamianie korutyn

Do inicjacji pętli zdarzeń oraz wykonania korutyny służy funkcja `asyncio.run()`. W poniższym przykładzie korutyna `main()` koordynuje wykonanie `foo`:

```python
import asyncio

async def foo(x):
    return 2 * x

async def main():
    result = await foo(3)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
```

Funkcja `asyncio.run()` pełni trzy role:

1. Tworzy nową pętlę zdarzeń.
2. Uruchamia przekazaną korutynę.
3. Zamyka pętlę po zakończeniu wszystkich zadań.

### Instrukcja `await`

Instrukcja:

```python
await <awaitable>
```

zawiesza wykonanie bieżącej korutyny do momentu uzyskania wyniku z obiektu *awaitable*. Typowe rodzaje *awaitable* to:

* funkcje oznaczone `async def` (*coroutine functions*),
* obiekty korutyn zwrócone przez wywołania `async def` (*coroutine objects*),
* zadania (`asyncio.Task`),
* obiekty `asyncio.Future`.

W większości przypadków użytkownicy operują na korutynach i zadaniach (`Task`); bezpośrednie użycie `asyncio.Future` jest zarezerwowane dla scenariuszy niskopoziomowych.


**Uwaga:** `await` można stosować wyłącznie wewnątrz funkcji zdefiniowanych jako `async def`.

### `asyncio.sleep()`

Funkcja `asyncio.sleep()` pełni rolę nieblokującej pauzy, analogicznie do `time.sleep()`, jednak jako korutyna zwraca obiekt *awaitable*. Można ją wykorzystać w instrukcji `await` do zawieszenia korutyny na określony czas bez blokowania całej pętli zdarzeń.

Przykład:

```python
import asyncio
import time

async def delay(t, name=''):
    print(f'Starting {name} for {t} sec.')
    await asyncio.sleep(t)
    print(f'Stopping {name}.')

async def main():
    await asyncio.sleep(1)   # wstrzymuje main na 1 sekundę
    await delay(1, name='a')
    await delay(2, name='b')

start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start:.2f} sec.')
```

W powyższym kodzie korutyny wykonywane są sekwencyjnie, co skutkuje czasem wykonania wynoszącym sumę wszystkich opóźnień (\~4 sekundy), ponieważ każda instrukcja `await` czeka na ukończenie poprzedniej korutyny.

Funkcja `asyncio.create_task()` tworzy **zadanie** (*task*) ze współprogramu/funkcji `async def` i planuje jego wykonanie w pętli zdarzeń. Zwraca utworzony obiekt zadania. Jest to funkcja **nieblokująca** - zwraca wartość natychmiast.

Podany poniżej program jest asynchroniczną wersją poprzedniego. Funkcja `main()` uruchamia współprogramy `asyncio.sleep()` i `delay()` w sposób współbieżny. Tym razem czas wykonania wyniesie naprawdę 2 sekundy.

```python
import asyncio
import time

async def delay(t, name=''):
    print(f'Starting {name} for {t} sec.')
    await asyncio.sleep(t)
    print(f'Stopping {name}.')

async def main():
    t1 = asyncio.create_task(asyncio.sleep(1))
    t2 = asyncio.create_task(delay(1, name='a'))
    await delay(2, name='b')

## Spodziewany czas wykonania: 2 sec.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

Podany poniżej przykład pokazuje, że zadania na pętli zdarzeń włączane są wraz z pierwszą instrukcją `await`:

```python
import asyncio
import time

async def delay(t, name=''):
    print(f'Starting {name} for {t} sec.')
    await asyncio.sleep(t)
    print(f'Stopping {name}.')

async def main():
    t1 = asyncio.create_task(asyncio.sleep(1))
    t2 = asyncio.create_task(delay(1, name='a'))
    time.sleep(5) ## Napisy pojawią się po upływie 5 sek.
    await delay(2, name='b')

## Spodziewany czas wykonania: 7 sek.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

Zadania powinny być wyczekiwane przez `await` lub zamykane w inny sposób; w przeciwnym razie nie zostaną we właściwy sposób zakończone wraz z końcem programu. W poniższym przykładzie zadanie `t1` nie podlega nigdzie wyczekaniu `await`, a że jest dłuższe niż pozostałe zadania, więc nie może zostać zakończone:

```python
import asyncio
import time

async def delay(t, name=''):
    print(f'Starting {name} for {t} sec.')
    await asyncio.sleep(t)
    print(f'Stopping {name}.')

async def main():
    t1 = asyncio.create_task(delay(10, name='long'))
    t2 = asyncio.create_task(delay(1, name='short'))
    await delay(2, name='medium')

## Spodziewany czas wykonania: 2 sec.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

Wyczekanie wszystkich zadań:

```python
import asyncio
import time

async def delay(t, name=''):
    print(f'Starting {name} for {t} sec.')
    await asyncio.sleep(t)
    print(f'Stopping {name}.')

async def main():
    t1 = asyncio.create_task(delay(10, name='long'))
    t2 = asyncio.create_task(delay(1, name='short'))
    t3 = asyncio.create_task(delay(2, name='medium'))
    for t in [t1, t2, t3]:
        await t

## Spodziewany czas wykonania: 10 sec.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

Program synchroniczny, gdyż tworzenie zadań rozdzielone jest wyczekiwaniem:

```python
import asyncio
import time

async def delay(t, name=''):
    print(f'Starting {name} for {t} sec.')
    await asyncio.sleep(t)
    print(f'Stopping {name}.')

async def main():
    for k in range(1, 11):
        await asyncio.create_task(delay(.1, name=f'{k}'))

## Spodziewany czas wykonania: 10x0.1 = 1 sec.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

Poprzedni program poprawiony do wersji asynchronicznej:

```python
import asyncio
import time

async def delay(t, name=''):
    print(f'Starting {name} for {t} sec.')
    await asyncio.sleep(t)
    print(f'Stopping {name}.')

async def main():
    tasks = [asyncio.create_task(delay(1, name=f'{k}')) for k in range(1, 11)]
    for t in tasks:
        await t

## Spodziewany czas wykonania: 1 sek.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

Odliczanie w tle:

```python
import asyncio
import time

async def odliczanie():
    for k in range(1, 11):
        await asyncio.sleep(1)
        print(f'Upłynęło {k} sek.')
    
async def superkomputer():
    print('Myślę...')
    await asyncio.sleep(10)
    return 42

async def main():
    superkomp = asyncio.create_task(superkomputer())
    await odliczanie()
    out = await superkomp
    print(f'Odpowiedź brzmi: {out}!')

## Spodziewany czas wykonania: 10 sek.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

### Kasowanie zadań

Metoda `task.cancel()` wysyła do zadania wyjątek `asyncio.CancelledError`. Zadanie ulega zatrzymaniu dopiero po wyczekaniu przez `await`; wtedy rzucany jest wyjątek `CancelledError`. 

```python
import asyncio
import time

async def odliczanie():
    k = 1
    while True:
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            break
        print(f'Upłynęło {k} sek.')
        k += 1
    
async def superkomputer(t):
    print('Myślę...')
    await asyncio.sleep(t)
    return 42

async def main():
    task = asyncio.create_task(odliczanie())
    out = await superkomputer(5)
    task.cancel()
    await task
    print(f'Odpowiedź brzmi: {out}!')

## Spodziewany czas wykonania: 5 sek.
start = time.perf_counter()
asyncio.run(main())
print(f'Duration: {time.perf_counter() - start} sec.')
```

#### Przykład

[Spinner](https://github.com/AllenDowney/fluent-python-notebooks/blob/master/18b-async-await/spinner_await.py) obracający się w tle.

```python
import asyncio
import itertools
import time

async def spinner(msg):
    it = itertools.cycle('\\|/-')
    while True:
        c = next(it)
        print(f'\r{c} {msg}', end='')
        try:
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    blank = (len(msg) + 2) * ' '
    print(f'\r{blank}\r', end='')
        

async def superkomputer():
    print('Odpowiedź brzmi...')
    await asyncio.sleep(3)
    return 42

async def main():
    task_spinner = asyncio.create_task(spinner('myślę...'))
    out = await superkomputer()
    task_spinner.cancel()
    await task_spinner  
    print(f'\n{out}')

asyncio.run(main())
```

### Równoległe wykonywanie wielu korutyn - `asyncio.gather()`

Funkcja `asyncio.gather()` umożliwia uruchomienie wielu korutyn jednocześnie i oczekiwanie na ich ukończenie. 

```python
import asyncio
import time

async def fetch_data(id, delay):
    print(f'Pobieranie danych {id}...')
    await asyncio.sleep(delay)  # Symulacja czasu odpowiedzi API
    return f"Dane {id}"

async def main():
    # Uruchomienie trzech korutyn równocześnie
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_data(1, delay=2),
        fetch_data(2, delay=1),
        fetch_data(3, delay=3)
    )
    end = time.perf_counter()
    
    print(f"Wyniki: {results}")
    print(f"Czas wykonania: {end - start:.2f} sek.")
    # Czas wykonania: ~3 sek. (najdłuższy czas spośród wszystkich zadań)

asyncio.run(main())
```

Zaletą `gather()` jest możliwość pobrania wszystkich wyników jako listy, w tej samej kolejności, w jakiej przekazaliśmy korutyny. Domyślnie, jeśli którakolwiek z korutyn zgłosi wyjątek, zostanie on propagowany, a pozostałe korutyny będą kontynuowane.


#### Obsługa błędów w `gather()`

Parametr `return_exceptions=True` pozwala zebrać wszystkie wyniki i wyjątki bez przerywania wykonania:

```python
import asyncio

async def safe_operation(id):
    await asyncio.sleep(1)
    if id == 2:
        raise ValueError("Błąd w operacji 2")
    return f"Wynik {id}"

async def main():
    results = await asyncio.gather(
        safe_operation(1),
        safe_operation(2),
        safe_operation(3),
        return_exceptions=True  # Zbieramy wyniki i wyjątki
    )
    
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Operacja {i} zakończona błędem: {result}")
        else:
            print(f"Operacja {i} zwróciła: {result}")

asyncio.run(main())
```

### Obsługa wyjątków w korutynach

Standardowa obsługa wyjątków działa również w kontekście asynchronicznym, jednak wymaga dodatkowej uwagi:

```python
import asyncio

async def riskyOperation():
    await asyncio.sleep(1)
    raise ValueError("Coś poszło nie tak")
    return "Nigdy nie dojdziemy do tego miejsca"

async def main():
    try:
        result = await riskyOperation()
        print(f"Wynik: {result}")
    except ValueError as e:
        print(f"Złapany wyjątek: {e}")
    
    print("Kontynuujemy działanie programu...")

asyncio.run(main())
```

W przypadku zadań utworzonych za pomocą `create_task()`, wyjątki nie są automatycznie propagowane. Aby ich nie zgubić, należy używać `await` na zadaniu lub sprawdzić wynik za pomocą metody `task.exception()`:

```python
import asyncio

async def riskyTask():
    await asyncio.sleep(1)
    raise ValueError("Błąd w zadaniu")

async def main():
    # Utwórz zadanie
    task = asyncio.create_task(riskyTask())
    
    # Metoda 1: await z try/except
    try:
        await task
    except ValueError as e:
        print(f"Złapany wyjątek z zadania: {e}")
    
    # Metoda 2: sprawdzenie wyjątku po zakończeniu zadania
    # (działa tylko gdy zadanie jest już zakończone)
    other_task = asyncio.create_task(riskyTask())
    await asyncio.sleep(2)  # Poczekaj aż zadanie się zakończy
    
    if other_task.done():
        if other_task.exception():
            print(f"Zadanie zakończone z wyjątkiem: {other_task.exception()}")
        else:
            print(f"Zadanie zakończone z wynikiem: {other_task.result()}")

asyncio.run(main())
```

### Praktyczny przykład: Asynchroniczne zapytania HTTP

Jednym z najpopularniejszych zastosowań `asyncio` jest wykonywanie wielu zapytań HTTP równocześnie. Przykład z użyciem biblioteki `aiohttp`:

```python
import asyncio
import aiohttp # pip install aiohttp
import time

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://httpbin.org/delay/3',
        'http://httpbin.org/delay/1', 
        'http://httpbin.org/delay/2'
    ]
    
    start = time.perf_counter()
    
    # Sesja aiohttp jest współdzielona między zapytaniami
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for i, (url, result) in enumerate(zip(urls, results)):
            print(f"{i+1}. {url}: {len(result)} znaków")
    
    elapsed = time.perf_counter() - start
    print(f"Czas wykonania: {elapsed:.2f} sek.")
    # Czas wykonania: ~3 sek. zamiast ~6 sek. sekwencyjnie

asyncio.run(main())
```

W powyższym przykładzie wszystkie trzy zapytania wykonują się równolegle, a całkowity czas to około 3 sekundy (czas najdłuższego opóźnienia) zamiast 6 sekund (suma wszystkich opóźnień), które byłyby potrzebne przy sekwencyjnym wykonaniu.