In [43]:
import nest_asyncio

# Patch asyncio so that `loop.run_until_complete()` may be called re-entrantly.
# The `_run_async` helper in this notebook calls `run_until_complete()`
# directly, which presumably happens while the kernel's own event loop is
# already running — NOTE(review): confirm for the kernel in use.
nest_asyncio.apply()

In [44]:
import asyncio
from functools import wraps


def _run_async(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(f(*args, **kwargs))

    return wrapper

In [45]:
# Endpoints fetched by every benchmark variant in this notebook.
# The three "delay/N" URLs are presumably httpbin endpoints that answer after
# roughly N seconds (verify against httpbin's documentation). The
# "status/400" URL is expected to return an HTTP 400, which makes
# `raise_for_status()` fail and therefore exercises the retry path.
URLS = (
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
    "https://httpbin.org/status/400",
)

In [46]:
import requests
import logging
import tempfile
import time
import contextlib
import uuid


@contextlib.contextmanager
def temp_logger(file_name):
    """Yield a throwaway DEBUG-level logger that writes to ``file_name``.

    A logger with a unique name (a random UUID) gets a
    ``logging.FileHandler`` for ``file_name`` attached for the duration of the
    ``with`` block. On exit the handler is detached *and closed*, so the file
    descriptor it opened is released instead of leaking once per call.

    :param file_name: path of the file that log records are appended to.
    :return: context manager yielding the configured ``logging.Logger``.
    """
    logger = logging.getLogger(str(uuid.uuid4()))
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(file_name)
    logger.addHandler(handler)
    try:
        yield logger
    finally:
        logger.removeHandler(handler)
        # removeHandler() only detaches the handler; close() is what flushes
        # and releases the underlying file.
        handler.close()


def process_url_with_requests(url):
    """
    1. Make a GET request.
    2. If the request fails, try again: at most 5 attempts in total,
        sleeping for 0.5 seconds before each retry (no sleep after the
        final attempt, since no retry follows it).
    3. Log the outcome to a temporary file: the response body of the last
        attempt if a response was received (even one with an error status),
        otherwise the exception raised by the last attempt.
    :param url: URL to fetch.
    :return: None
    """
    with (
        tempfile.NamedTemporaryFile("a+") as file,
        temp_logger(file.name) as logger,
    ):
        response = None
        error = None
        # `retries_left` counts down 4..0 so the last attempt is recognisable.
        for retries_left in reversed(range(5)):
            # Reset so a response from an earlier attempt is never reported
            # as the result of a later attempt that failed to connect.
            response = None
            try:
                response = requests.get(url)
                response.raise_for_status()
            except requests.exceptions.RequestException as exc:
                error = exc
                if retries_left:
                    time.sleep(0.5)
            else:
                break
        if response is not None:
            logger.info("Response from URL %s is %s", url, response.text)
        else:
            # Every attempt raised before a response object existed.
            logger.error("Request to URL %s failed: %r", url, error)


def process_urls_with_requests():
    """Process every entry of ``URLS`` one after another.

    Each URL is handed to ``process_url_with_requests``; nothing is returned.
    """
    for target in URLS:
        process_url_with_requests(target)

In [None]:
%timeit -r 1 -n 100 process_urls_with_requests()

In [16]:
def process_url_with_requests_session(session: requests.Session, url):
    """
    1. Make a GET request through the given session.
    2. If the request fails, try again: at most 5 attempts in total,
        sleeping for 0.5 seconds before each retry (no sleep after the
        final attempt, since no retry follows it).
    3. Log the outcome to a temporary file: the response body of the last
        attempt if a response was received (even one with an error status),
        otherwise the exception raised by the last attempt.
    :param session: session used to issue the request.
    :param url: URL to fetch.
    :return: None
    """
    with (
        tempfile.NamedTemporaryFile("a+") as file,
        temp_logger(file.name) as logger,
    ):
        response = None
        error = None
        # `retries_left` counts down 4..0 so the last attempt is recognisable.
        for retries_left in reversed(range(5)):
            # Reset so a response from an earlier attempt is never reported
            # as the result of a later attempt that failed to connect.
            response = None
            try:
                response = session.get(url)
                response.raise_for_status()
            except requests.exceptions.RequestException as exc:
                error = exc
                if retries_left:
                    time.sleep(0.5)
            else:
                break
        if response is not None:
            logger.info("Response from URL %s is %s", url, response.text)
        else:
            # Every attempt raised before a response object existed.
            logger.error("Request to URL %s failed: %r", url, error)


def process_urls_with_requests_session():
    """Process every entry of ``URLS`` sequentially over one shared session.

    A single ``requests.Session`` is opened for the whole run and closed when
    the last URL has been handled.
    """
    with requests.Session() as shared_session:
        for target in URLS:
            process_url_with_requests_session(shared_session, target)

In [7]:
%timeit -r 1 -n 100 process_urls_with_requests_session()

11 s ± 0 ns per loop (mean ± std. dev. of 1 run, 10 loops each)


In [17]:
from concurrent.futures import ThreadPoolExecutor


def process_urls_with_requests_session_and_threads():
    """Process every entry of ``URLS`` concurrently, one worker thread per URL.

    All workers share one ``requests.Session``. The function returns once
    every URL has been processed. An exception raised inside a worker is
    re-raised here instead of being dropped with its future, matching the
    sequential and ``asyncio.gather`` based variants.

    NOTE(review): the session is shared across threads — confirm that this is
    acceptable for the ``requests`` version in use.

    :return: None
    :raises Exception: whatever the first failing worker raised.
    """
    with (
        requests.Session() as session,
        ThreadPoolExecutor(max_workers=len(URLS)) as executor,
    ):
        futures = [
            executor.submit(process_url_with_requests_session, session, url)
            for url in URLS
        ]
        # result() blocks until the worker is done and re-raises its
        # exception, if any. Leaving the `with` block still waits for the
        # remaining workers before the session is closed.
        for future in futures:
            future.result()

In [9]:
%timeit -r 1 -n 100 process_urls_with_requests_session_and_threads()

4.44 s ± 0 ns per loop (mean ± std. dev. of 1 run, 10 loops each)


In [18]:
async def process_urls_with_requests_session_and_threads_async_wraps_future_structured():
    """Process all URLs on worker threads and await them from asyncio.

    Each URL is submitted to a thread pool (one worker per URL, one shared
    ``requests.Session``). The resulting thread futures are wrapped into
    asyncio futures and gathered, so this coroutine completes only after
    every URL has been processed.
    """
    with requests.Session() as session:
        with ThreadPoolExecutor(max_workers=len(URLS)) as pool:
            awaitables = []
            for url in URLS:
                thread_future = pool.submit(
                    process_url_with_requests_session, session, url
                )
                awaitables.append(asyncio.wrap_future(thread_future))
            await asyncio.gather(*awaitables)

In [11]:
%timeit -r 1 -n 100 _run_async(process_urls_with_requests_session_and_threads_async_wraps_future_structured)()

4.41 s ± 0 ns per loop (mean ± std. dev. of 1 run, 10 loops each)


In [31]:
from loguru import logger as loguru_logger
import httpx
import aiofiles
import stamina

# Install an empty list of on-retry hooks, i.e. no instrumentation callbacks
# are registered with stamina for scheduled retries.
stamina.instrumentation.set_on_retry_hooks([])
# Called without a handler id: presumably drops every sink currently
# registered with loguru, including its default one, so only the per-task file
# sinks added later receive records — NOTE(review): confirm against loguru docs.
loguru_logger.remove()


@contextlib.asynccontextmanager
async def temp_logger_async(file_name):
    """Yield a loguru logger whose records are written to ``file_name``.

    The yielded logger is bound to a unique ``task`` id (a random UUID). A
    sink for ``file_name`` is added with ``enqueue=True`` and a filter that
    accepts only records carrying that same id. The sink is removed again
    when the ``async with`` block exits.

    :param file_name: path of the file used as the loguru sink.
    :return: async context manager yielding the bound logger.
    """
    task = str(uuid.uuid4())
    logger = loguru_logger.bind(task=task)

    def _belongs_to_task(record):
        # .get() instead of indexing: a record emitted by a logger that was
        # never bound with `task` has no such key and must simply be
        # rejected, not raise KeyError inside the filter.
        return record["extra"].get("task") == task

    handler_id = logger.add(file_name, filter=_belongs_to_task, enqueue=True)
    try:
        yield logger
    finally:
        logger.remove(handler_id)


async def process_url_async(client: httpx.AsyncClient, url):
    """
    1. Make a GET request through the given async client.
    2. If the request fails with ``httpx.HTTPError``, retry via stamina:
        at most 5 attempts in total; the wait between attempts is chosen by
        stamina and capped at 0.5 seconds (``wait_max``).
    3. Log the outcome to a temporary file: the response body of the last
        attempt if a response was received (even one with an error status),
        otherwise the exception raised by the last attempt.
    :param client: client used to issue the request.
    :param url: URL to fetch.
    :return: None
    """
    async with (
        # NOTE(review): delete=False means the temporary file is left on disk
        # after this block, unlike the synchronous variants whose temp files
        # are removed on exit — confirm this is intended.
        aiofiles.tempfile.NamedTemporaryFile(
            "a+", delete=False, delete_on_close=False
        ) as file,
        temp_logger_async(file.name) as logger,
    ):
        response = None
        error = None
        try:
            async for attempt in stamina.retry_context(
                on=httpx.HTTPError, attempts=5, wait_max=0.5
            ):
                with attempt:
                    # Reset so a response from an earlier attempt is never
                    # reported as the result of a later, failed attempt.
                    response = None
                    response = await client.get(url)
                    response.raise_for_status()
        except httpx.HTTPError as exc:
            # All attempts were used up; keep the last error for the log.
            error = exc
        if response is not None:
            logger.info(
                "Response from URL {url} is {text}", url=url, text=response.text
            )
        else:
            # Every attempt raised before a response object existed.
            logger.error(
                "Request to URL {url} failed: {error!r}", url=url, error=error
            )


async def process_urls_async():
    """Process every entry of ``URLS`` concurrently as asyncio tasks.

    One ``httpx.AsyncClient`` is shared by all tasks. The task group is left
    only after every task has finished, and the client is closed afterwards.
    """
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as group:
            for target in URLS:
                group.create_task(process_url_async(client, target))

In [32]:
%timeit -r 1 -n 100 _run_async(process_urls_async)()

5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 10 loops each)
