# Asynchronous programming with Python
## Module 4 - Compare asynchronous code approaches

### Agenda:

* Web crawler - find the shortest path between Wikipedia articles.
    * Synchronous code.
    * Multithreading.
    * Trio.
    * AsyncIO.
* Multiprocessing - calculate π value with Monte-Carlo method.
    * Synchronous code.
    * Trio.
    * AsyncIO.

### Find the shortest path between Wikipedia articles
The task has the following constraints:

1. No more than 10 workers simultaneously.
2. Each worker should sleep for a random time in the range of 0.5-1.5 seconds
   before each request to avoid request throttling.
3. Stop if the path length exceeds 10 articles.
4. Search only Ukrainian Wiki.

In [None]:
# Install dependencies
import re
!pip install trio asks requests aiohttp beautifulsoup4

#### Preparation
First define some common code.

In [None]:
from __future__ import annotations

import logging
import random
import re
from dataclasses import dataclass, field
from typing import List, Optional, Iterable
from urllib.parse import unquote

from bs4 import BeautifulSoup

# Root of the Ukrainian Wikipedia; relative "/wiki/..." links are appended to it.
BASE_URL = "https://uk.wikipedia.org"
# Upper bound on the number of pages fetched concurrently.
MAX_WORKERS = 10
# Default limit on the number of articles in a path before a branch is dropped.
MAX_PATH_LEN = 10
# Logger shared by the crawler implementations in this notebook.
log = logging.getLogger("main")


@dataclass
class Task:
    """A page queued for crawling, together with the route that led to it."""
    # Absolute URL of the page to fetch.
    url: str
    # Titles of the pages already visited on the way to ``url``, in order.
    path: List[str] = field(default_factory=list)


@dataclass
class Page:
    """Title and outgoing article links extracted from a Wikipedia page."""
    title: str
    links: List[str] = field(default_factory=list)

    @classmethod
    def from_html(cls, text: str) -> Page:
        """Build a page from raw HTML.

        The title comes from the ``firstHeading`` element and the links from
        every ``<a>`` inside ``bodyContent``.  Only links to regular articles
        are kept, and they are turned into absolute URLs.
        """
        document = BeautifulSoup(text, 'html.parser')
        heading = document.find(id="firstHeading")
        anchors = document.find(id="bodyContent").find_all("a")

        def is_article(href: str) -> bool:
            """Tell whether the decoded href points to a regular article."""
            if not href.startswith("/wiki/"):
                return False
            # Special pages have a namespace separator in the address.
            if ":" in href:
                return False
            # Sister projects: "Вікісховище", "Вікіцитати" and so on.
            if href.startswith("/wiki/Вікі"):
                return False
            # Links that look like dates start with a digit.
            return re.match(r"/wiki/\d.*", href) is None

        decoded = (unquote(anchor.attrs.get("href", "")) for anchor in anchors)
        links = [BASE_URL + href for href in decoded if is_article(href)]

        return cls(heading.text, links)


def get_delay() -> float:
    """Return a random pause for a worker, between 0.5 and 1.5 seconds."""
    jitter = random.random()
    return 0.5 + jitter


def print_result(path: Optional[List[str]]):
    """Report the found route, or the fact that there is none."""
    if not path:
        print("No route was found.")
        return

    route = " -> ".join(path + ["."])
    print("The result is:", route)


def find_target_task(tasks: Iterable[Task], target_url) -> Optional[Task]:
    """Return the first task whose URL equals ``target_url``, else ``None``."""
    matching = (candidate for candidate in tasks if candidate.url == target_url)
    return next(matching, None)


def log_config(level=logging.DEBUG):
    """Set up message-only log output and the level of the notebook logger."""
    message_only = "%(message)s"
    logging.basicConfig(format=message_only)
    log.setLevel(level)

---
The URLs are supposed to be the same for all implementations.

In [None]:
# Article the search starts from.
START_URL = "https://uk.wikipedia.org/wiki/Трінільський_тигр"
# Article the search has to reach.
TARGET_URL = "https://uk.wikipedia.org/wiki/Блискавка"

#### Synchronous code
Let's apply the
[Breadth-first search](https://en.wikipedia.org/wiki/Breadth-first_search)
algorithm.

We visit a page, add all found links to the queue, and then pop the
links from the queue one by one and, if they were not visited before,
visit them and start over.

The synchronous implementation is quite slow.  It takes lots of time to
just process the first page.

In [None]:
import random
import time
from collections import deque

import requests


def main(url, target_url, max_path_len=MAX_PATH_LEN):
    """Synchronously crawl Wikipedia in order to find the path.

    Pages are visited breadth-first, one at a time.  ``url`` is the page to
    start from, ``target_url`` is the page to reach, and ``max_path_len``
    limits how many articles a path may contain before it is abandoned.

    Returns the list of page titles leading to the target, or ``None`` if
    the queue runs out before the target is found.
    """
    start = Task(url)
    if start.url == target_url:
        return start.path

    seen = set()
    queue = deque([start])

    while queue:
        task: Task = queue.popleft()

        if task.url in seen or len(task.path) >= max_path_len:
            continue

        log.info("Processing %s", task)
        seen.add(task.url)
        page: Page = process_page(task.url)
        new_path = task.path + [page.title]

        log.info("Adding %s ulrs.", len(page.links))
        tasks = [Task(link, new_path) for link in page.links]

        # Some optimization to save the calls: stop as soon as the target
        # shows up among the links.  Everything already in the queue was
        # checked when it was added, so only the new tasks are inspected
        # instead of rescanning the whole queue on every iteration.
        target_task = find_target_task(tasks, target_url)
        if target_task:
            return target_task.path

        queue.extend(tasks)

    return None


def process_page(url: str, timeout: float = 10.0) -> Page:
    """Request the page contents.

    Sleeps for a random delay first, then downloads ``url`` and parses the
    response into a ``Page``.

    ``timeout`` is the number of seconds to wait for the server before
    giving up; without it a stalled connection would block the caller
    forever.

    Raises ``requests.HTTPError`` if the server answers with an error status
    and ``requests.Timeout`` if it does not answer in time.
    """
    time.sleep(get_delay())
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    return Page.from_html(resp.text)


# Turn on log output and run the synchronous search.
log_config()
print_result(main(START_URL, TARGET_URL))

#### Multithreading
First, refactor the code a little bit to extract the processing of a single
page from `main` into a separate function.

It can look like this:

```python
def main(url, target_url, max_path_len=MAX_PATH_LEN):
    """Synchronously crawl Wikipedia in order to find the path.

    The search goes level by level: every task of the current level is
    checked first, and only then the selected pages are downloaded.
    Returns the path to ``target_url``, or ``None`` if no task is left
    to process.
    """
    seen = set()
    queue = [Task(url)]

    while True:
        current_tasks = []

        # Select the tasks of this level that still have to be downloaded.
        for task in queue:
            if task.url == target_url:
                return task.path

            if task.url in seen or len(task.path) >= max_path_len:
                continue
            current_tasks.append(task)
            seen.add(task.url)

        if not current_tasks:
            return None

        # The next level consists of the links found on the selected pages.
        queue.clear()
        for task in current_tasks:
            tasks = process_task(task)
            queue.extend(tasks)


def process_task(task: Task) -> List[Task]:
    """Download the task's page and create a task for every link on it."""
    log.info("Processing %s", task)
    page: Page = process_page(task.url)
    # Every new task carries the route extended with this page's title.
    new_path = task.path + [page.title]

    log.info("Adding %s ulrs.", len(page.links))
    return [Task(url, new_path) for url in page.links]

```

Now the last part can be executed with
[concurrent.futures.ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor).
See how it works much faster.

In [None]:
import random
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def main(url, target_url, max_path_len=MAX_PATH_LEN):
    """Crawl Wikipedia level by level, downloading pages in a thread pool.

    Each round first inspects the tasks of the current level: the search
    ends when one of them is the target, and tasks that were already seen
    or whose path is too long are dropped.  The remaining pages are then
    downloaded by up to ``MAX_WORKERS`` threads, and the links found on
    them form the next level.

    Returns the path to ``target_url``, or ``None`` if no task is left
    to process.
    """
    visited = set()
    frontier = [Task(url)]

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        while True:
            batch = []

            for candidate in frontier:
                if candidate.url == target_url:
                    return candidate.path

                too_deep = len(candidate.path) >= max_path_len
                if candidate.url not in visited and not too_deep:
                    visited.add(candidate.url)
                    batch.append(candidate)

            if not batch:
                return None

            # The pages of the batch are fetched concurrently; the results
            # come back in the order of the batch.
            frontier = [
                found
                for found_tasks in pool.map(process_task, batch)
                for found in found_tasks
            ]


def process_task(task: Task):
    """Download the task's page and return one new task per link found."""
    log.info("Processing %s", task)
    page: Page = process_page(task.url)

    links = page.links
    # All new tasks share the route extended with this page's title.
    extended_path = [*task.path, page.title]

    log.info("Adding %s ulrs.", len(links))
    return [Task(link, extended_path) for link in links]


def process_page(url: str, timeout: float = 10.0) -> Page:
    """Request the page contents.

    Sleeps for a random delay first, then downloads ``url`` and parses the
    response into a ``Page``.

    ``timeout`` is the number of seconds to wait for the server before
    giving up; without it a stalled connection would block its worker
    thread forever.

    Raises ``requests.HTTPError`` if the server answers with an error status
    and ``requests.Timeout`` if it does not answer in time.
    """
    time.sleep(get_delay())
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    return Page.from_html(resp.text)


# Turn on log output and run the thread-pool search.
log_config()
print_result(main(START_URL, TARGET_URL))

👉 *It is possible to use the producer-consumer pattern with multithreading,
though using ThreadPoolExecutor here is much simpler,
while giving a similar performance gain.*👈


#### Trio

In [None]:
import math
from dataclasses import dataclass, field
from typing import List, Set, Optional

import asks
import trio

@dataclass
class TaskContext:
    """Common context for the workers."""
    # URL of the article the workers are looking for.
    target_url: str
    # Tasks whose path already has this many articles are not processed.
    max_path_len: int
    # Path to the target once a worker has found it; ``None`` until then.
    result: Optional[List[str]] = None
    # URLs that some worker has already started processing.
    seen: Set[str] = field(default_factory=set)


async def main(
    url: str, target_url: str, max_path_len: int = MAX_PATH_LEN,
) -> Optional[List[str]]:
    """Crawl Wikipedia with a pool of Trio workers in order to find the path.

    ``MAX_WORKERS`` crawlers are started in a nursery.  They exchange tasks
    through an unbounded memory channel that initially holds the task for
    ``url``.  This function polls once a second and stops the workers when
    either the target has been found or there is nothing left to do.

    Returns the path stored by the workers in the shared context, or
    ``None`` if the target was not found.
    """
    context = TaskContext(target_url=target_url, max_path_len=max_path_len)
    task = Task(url)

    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(MAX_WORKERS)

    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            send_channel.send_nowait(task)

            for _ in range(MAX_WORKERS):
                nursery.start_soon(
                    crawler, context, active_workers, send_channel, receive_channel,
                )

            while True:
                # Give the workers a chance to start up and make progress.
                await trio.sleep(1)

                # Once the target is found the workers exit and leave their
                # queued tasks in the channel, so waiting for an empty buffer
                # alone would never finish.
                found = context.result is not None
                idle = active_workers.borrowed_tokens == 0
                drained = send_channel.statistics().current_buffer_used == 0
                if found or (idle and drained):
                    # All done!
                    nursery.cancel_scope.cancel()
                    break

    # Hand the result over to the caller of ``trio.run``.
    return context.result


async def crawler(context: TaskContext, active_workers, send_channel, receive_channel):
    """Worker: take tasks from the channel until the target is found.

    Each task is handled while holding a token of ``active_workers``.  The
    worker stores the path in ``context.result`` and exits when the task
    itself, or one of the links on its page, is the target.  Otherwise
    every link of the page is sent to the channel as a new task.
    """
    task: Task

    async for task in receive_channel:
        async with active_workers:
            # Another worker has already stored a result.
            if context.result:
                return

            if task.url == context.target_url:
                context.result = task.path
                return

            already_seen = task.url in context.seen
            path_is_full = len(task.path) >= context.max_path_len
            if already_seen or path_is_full:
                continue

            log.info("Processing %s", task)
            context.seen.add(task.url)
            page: Page = await process_page(task.url)

            route = task.path + [page.title]

            # No need to queue anything if the page links to the target.
            if context.target_url in page.links:
                context.result = route
                return

            next_tasks = [Task(link, route) for link in page.links]
            log.info("Adding %s ulrs.", len(next_tasks))
            for next_task in next_tasks:
                send_channel.send_nowait(next_task)



async def process_page(url: str) -> Page:
    """Wait for a random delay, then download and parse the page."""
    delay = get_delay()
    await trio.sleep(delay)

    response = await asks.get(url)
    response.raise_for_status()

    html = response.text
    return Page.from_html(html)


# Turn on log output and run the Trio search.
log_config()
print_result(trio.run(main, START_URL, TARGET_URL))

<span style="font-size: x-large">Add your code below:</span>