In [None]:
import asyncio
import time

In [10]:
async def long_operation():
    await asyncio.sleep(10)
    return "Completed"


async def main():
    try:
        result = await asyncio.wait_for(long_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")  # This will be printed

if __name__ == "__main__":
    await asyncio.create_task(main())
    print("Done")



Operation timed out
Done


In [None]:
async def long_running():
    try:
        await asyncio.sleep(10)
        print("Completed")
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise


async def main():
    task = asyncio.create_task(long_running())

    # Let it run for a bit then cancel
    await asyncio.sleep(2)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Main: task cancelled successfully")

if __name__ == "__main__":
    await main()
    print("Done")


Completed
Done


In [18]:
def run_in_new_loop(coro):
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    try:
        return new_loop.run_until_complete(coro)
    finally:
        new_loop.close()


# Usage
import threading  # noqa: E402


def start_background_loop():
    result = run_in_new_loop(background_work())
    print(f"Result: {result}")


async def background_work():
    await asyncio.sleep(1)
    return "Done in background loop"


# Run in a separate thread
thread = threading.Thread(target=start_background_loop)
thread.start()

Result: Done in background loop


In [None]:
event = asyncio.Event()


async def waiter():
    print("Waiting for event...")
    await event.wait()
    print("Event received, continuing")


async def setter():
    await asyncio.sleep(2)  # Wait a bit
    print("Setting event")
    event.set()


async def main():
    await asyncio.gather(waiter(), setter()) # type: ignore


if __name__ == "__main__":
    await main()
    print("Done")

Waiting for event...
Event received, continuing
Setting event
Done


In [21]:
async def producer(queue):
    for i in range(5):
        await asyncio.sleep(1)
        item = f"Item {i}"
        await queue.put(item)
        print(f"Produced {item}")


async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed {item}")
        await asyncio.sleep(2)  # Process item
        queue.task_done()


async def main():
    queue = asyncio.Queue()

    # Start consumer and producer tasks
    consumer_task = asyncio.create_task(consumer(queue))
    producer_task = asyncio.create_task(producer(queue))

    # Wait for producer to finish
    await producer_task

    # Wait for queue to be fully processed
    await queue.join()

    # Cancel consumer
    consumer_task.cancel()


if __name__ == "__main__":
    await main()
    print("Done")

Produced Item 0
Consumed Item 0
Produced Item 1
Consumed Item 1
Produced Item 2
Produced Item 3
Consumed Item 2
Produced Item 4
Consumed Item 3
Consumed Item 4
Done
