## Basic

In [None]:
import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")
    await asyncio.sleep(1)

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    import time

    start = time.perf_counter()
    await main()
    elapsed = time.perf_counter() - start
    print(f"{__name__} executed in {elapsed:0.2f} seconds.")

In [None]:
import asyncio
import aiohttp
import time
from requests import get, Response

async def get_response(url: str = "") -> None:
    result: Response = get(url)
    print(f"{result.status_code}")
    return result.status_code, result.text

url_paths: list[str] = [
    "https://google.com",
    "https://yahoo.com",
    "https://bing.com",
]

async def main():
    start_time: float = time.perf_counter()
    await asyncio.gather(
        *(get_response(url) for url in url_paths)
    )
    end_time: float = time.perf_counter()
    print(f"Processing time: {end_time - start_time = :.2f} (s)")

if __name__ == "__main__":
    await main()

In [None]:
import asyncio
import aiohttp
import traceback


async def check_by_aiohttp(session: aiohttp.ClientSession, url: str):
    try:
        async with session.get(url) as res:
            print(f"{url}: status -> {res.status}")
            if res.status != 200:
                response_error: str = await res.text()
                print(f"{response_error = }")
    except Exception as e:
        print(f"{url}: error -> {e}")


async def main():
    try:
        websites = [
            "https://realpython.com",
            "https://pycoders.com",
            "https://www.python.org",
            "https://google.com",
            "https://yahoo.com",
            "https://bing.com",
            "https://non-exist.com",
        ]

        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(check_by_aiohttp(session, url) for url in websites),
                # return_exceptions=True
            )

    except Exception as e:
        tb_str = "".join(traceback.TracebackException.from_exception(e).format())
        print(f"[{main.__name__}] Exception during processing:\n{tb_str}")


if __name__ == "__main__":
    await main() # for running on file .ipynb
    # asyncio.run(main(), debug=True) # For running in file .py

## Example

In [3]:
import asyncio
import aiohttp
import traceback

# ========== Example 1: test response status with asyncio ===========
print(f"{'':=<30} Example 1: test response status with asyncio {'':=>30}")

async def check_by_aiohttp(session: aiohttp.ClientSession, url: str):
    try:
        async with session.get(url) as res:
            print(f"{url}: status -> {res.status}")
            if res.status != 200:
                response_error: str = await res.text()
                print(f"[{check_by_aiohttp.__name__}] [{url}] Response error: {response_error = }")
    except Exception as e:
        print(f"[{check_by_aiohttp.__name__}] [{url}] Error: {e}")


async def main():
    try:
        websites = [
            "https://realpython.com",
            "https://pycoders.com",
            "https://www.python.org",
            "https://google.com",
            "https://yahoo.com",
            "https://bing.com",
            "https://non-exist.com",
        ]

        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(check_by_aiohttp(session, url) for url in websites),
                return_exceptions=True
            )

    except Exception as e:
        tb_str = "".join(traceback.TracebackException.from_exception(e).format())
        print(f"[{main.__name__}] Exception during processing:\n{tb_str}")


if __name__ == "__main__":
    await main()
    # asyncio.run(main(), debug=True)



https://www.python.org: status -> 200
https://google.com: status -> 200
[check_by_aiohttp] [https://non-exist.com] Error: Cannot connect to host non-exist.com:443 ssl:True [SSLCertVerificationError: (1, "[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: Hostname mismatch, certificate is not valid for 'non-exist.com'. (_ssl.c:1016)")]
https://bing.com: status -> 200
https://realpython.com: status -> 200
https://yahoo.com: status -> 429
[check_by_aiohttp] [https://yahoo.com] Response error: response_error = 'Edge: Too Many Requests'
https://pycoders.com: status -> 200


In [5]:
# ========== Example 2: test response status with asyncio ===========
print(f"{'':=<30} Example 2: test response status with asyncio {'':=>30}")

async def get_response(session: aiohttp.ClientSession, input_url: str = "") -> bool:
    try:
        res = await session.get(url=input_url)
        async with res:
            print(f"[{get_response.__name__}] [{input_url}] Status: {res.status}")
            if res.status != 200:
                response_error: str = await res.text()
                print(f"[{get_response.__name__}] [{input_url}] Response error: {response_error = }")
                return False
            return True
    except Exception as e:
        print(f"[{get_response.__name__}] [{input_url}] Error: {e}")
        return False

async def get_google_response(session: aiohttp.ClientSession, url: str = "") -> str:
    result = await get_response(session, url)
    print(f"[{get_google_response.__name__}] [{url}] Result: {result}")
    return result

async def get_python_response(session: aiohttp.ClientSession, url: str = "") -> str:
    result = await get_google_response(session, url)
    print(f"[{get_python_response.__name__}] [{url}] Result: {result}")
    new_res: str = "Hi" if result else "Ho"
    print(f"[{get_python_response.__name__}] New result: {new_res}")
    return new_res

async def main():
    import datetime
    async with aiohttp.ClientSession() as session:
        return_data: dict = {
            "gg_url": "https://google.com",
            "other_url": "https://www.python.org"
        }

        python_res: str = await get_python_response(session, return_data["gg_url"])
        return_data["python_res"] = python_res

        return_data["first_sync_res"] = "Correct" if 1 < 2 else "Incorrect"
        print(f"[{main.__name__}] First res: {return_data['first_sync_res']}")
        return_data["second_sync_res"] = datetime.datetime.now().strftime("%Y-%m-%d HH:MM:SS")
        print(f"[{main.__name__}] Second res: {return_data['second_sync_res']}")

        # in this way, i'm doing it sequently, not true parallel(gg_res run => wait gg_res complete => run other_res). if gg_res: str = get_google_response block, maybe other_res can not run
        gg_res: str = await get_google_response(session, return_data["gg_url"])
        return_data["gg_res"] = gg_res

        other_res: str = await get_google_response(session, return_data["other_url"])
        return_data["other_res"] = gg_res

        # to be true async
        true_async_gg_res = asyncio.create_task(get_google_response(session, return_data["gg_url"])) # return what? data type?
        print(asyncio.create_task(get_google_response(session, return_data["gg_url"])))
        trye_async_other_res = asyncio.create_task(get_google_response(session, return_data["other_url"]))
        new_gg_res: str = await true_async_gg_res
        return_data["new_gg_res"] = new_gg_res
        new_other_res: str = await trye_async_other_res
        return_data["new_other_res"] = new_other_res

        import random
        return_data["third_sync_res"] = random.randint(10, 50)
        print(f"[{main.__name__}] Third res: {return_data['third_sync_res']}")

        return return_data

if __name__ == "__main__":
    import json
    result = await main()
    print(json.dumps(result, indent=4))

[get_response] [https://google.com] Status: 200
[get_google_response] [https://google.com] Result: True
[get_python_response] [https://google.com] Result: True
[get_python_response] New result: Hi
[main] First res: Correct
[main] Second res: 2025-11-22 HH:MM:SS
[get_response] [https://google.com] Status: 200
[get_google_response] [https://google.com] Result: True
[get_response] [https://www.python.org] Status: 200
[get_google_response] [https://www.python.org] Result: True
<Task pending name='Task-125' coro=<get_google_response() running at /tmp/ipykernel_280/859564974.py:18>>
[get_response] [https://google.com] Status: 200
[get_google_response] [https://google.com] Result: True
[get_response] [https://google.com] Status: 200
[get_google_response] [https://google.com] Result: True
[get_response] [https://www.python.org] Status: 200
[get_google_response] [https://www.python.org] Result: True
[main] Third res: 37
{
    "gg_url": "https://google.com",
    "other_url": "https://www.python.

In [7]:
# Source - https://stackoverflow.com/a
# Posted by sajid
# Retrieved 2025-11-21, License - CC BY-SA 4.0

# Python 3.9.6
import asyncio
import time


async def test(name: str):
    print(f"sleeping: {name}", time.time())
    time.sleep(3) # imagine that this is big chunk of code/ or a number     crunching block that takes a while to execute

    print(f"awaiting sleep: {name}", time.time())
    await asyncio.sleep(2)

    print(f"woke up: {name}", time.time())


async def main():
    print("In main")
    tasks = [test(name="1"), test(name="2"), test(name="3")]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    await main()


In main
sleeping: 1 1763811050.1316621


awaiting sleep: 1 1763811053.1319177
sleeping: 2 1763811053.1321878
awaiting sleep: 2 1763811056.1323824
sleeping: 3 1763811056.132628
awaiting sleep: 3 1763811059.1328528
woke up: 1 1763811059.1334567
woke up: 2 1763811059.133513
woke up: 3 1763811061.13581


In [None]:
# import asyncio
# import aiohttp
# import async_timeout
# import random
# import time

# MAX_CONCURRENCY = 200      # you can try 500â€“2000 depending on server
# RETRIES = 3
# TIMEOUT = 5

# semaphore = asyncio.Semaphore(MAX_CONCURRENCY)


# async def fetch(session, url):
#     """Raw fetch call with timeout."""
#     async with async_timeout.timeout(TIMEOUT):
#         async with session.get(url) as resp:
#             data = await resp.read()
#             return resp.status, len(data)


# async def fetch_with_retry(session, url):
#     """Retry wrapper with exponential backoff."""
#     for attempt in range(1, RETRIES + 1):
#         try:
#             async with semaphore:
#                 status, size = await fetch(session, url)
#                 return {"url": url, "status": status, "size": size, "error": None}
#         except Exception as e:
#             if attempt == RETRIES:
#                 return {"url": url, "status": None, "size": None, "error": str(e)}
            
#             await asyncio.sleep(0.2 * (2 ** attempt) + random.random() * 0.1)


# async def fetch_all(urls):
#     conn = aiohttp.TCPConnector(
#         limit=0,            # unlimited connections
#         ttl_dns_cache=300   # keep DNS warm
#     )
#     timeout = aiohttp.ClientTimeout(total=None)

#     async with aiohttp.ClientSession(connector=conn, timeout=timeout) as session:
#         tasks = [asyncio.create_task(fetch_with_retry(session, url)) for url in urls]
#         return await asyncio.gather(*tasks)


# # entrypoint
# if __name__ == "__main__":
#     urls = [
#         "https://google.com",
#         "https://yahoo.com",
#         "https://bing.com",
#     ] * 1000  # simulate 3000 URLs

#     start = time.time()
#     results = asyncio.run(fetch_all(urls))
#     duration = time.time() - start

#     ok = sum(1 for r in results if r["error"] is None)
#     fail = len(results) - ok

#     print(f"Completed: {ok}, Failed: {fail}")
#     print(f"Took {duration:.2f} seconds")


In [None]:
# import time
# import asyncio
# import aiohttp
# import async_timeout
# import requests
# from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


# # -----------------------------------------------------------------------------
# # Asyncio version
# # -----------------------------------------------------------------------------

# async def aio_fetch(session, url):
#     async with async_timeout.timeout(5):
#         async with session.get(url) as resp:
#             await resp.read()
#             return resp.status

# async def aio_run(urls, max_con=200):
#     connector = aiohttp.TCPConnector(limit=max_con)
#     async with aiohttp.ClientSession(connector=connector) as session:
#         tasks = [asyncio.create_task(aio_fetch(session, u)) for u in urls]
#         await asyncio.gather(*tasks)


# # -----------------------------------------------------------------------------
# # ThreadPool version
# # -----------------------------------------------------------------------------

# def requests_fetch(url):
#     r = requests.get(url, timeout=5)
#     return r.status_code

# def threads_run(urls, workers=200):
#     with ThreadPoolExecutor(max_workers=workers) as ex:
#         list(ex.map(requests_fetch, urls))


# # -----------------------------------------------------------------------------
# # ProcessPool version (will be terrible)
# # -----------------------------------------------------------------------------

# def requests_fetch_process(url):
#     r = requests.get(url, timeout=5)
#     return r.status_code

# def processes_run(urls, workers=20):
#     with ProcessPoolExecutor(max_workers=workers) as ex:
#         list(ex.map(requests_fetch_process, urls))


# # -----------------------------------------------------------------------------
# # Benchmark runner
# # -----------------------------------------------------------------------------

# def bench(name, fn):
#     print(f"\n=== Running: {name} ===")
#     start = time.time()
#     fn()
#     dur = time.time() - start
#     print(f"{name}: {dur:.2f} seconds")


# if __name__ == "__main__":
#     urls = ["https://google.com"] * 2000

#     bench("asyncio + aiohttp", lambda: asyncio.run(aio_run(urls, max_con=500)))
#     bench("ThreadPoolExecutor + requests", lambda: threads_run(urls, workers=200))
#     bench("ProcessPoolExecutor", lambda: processes_run(urls, workers=20))
