In [2]:
import concurrent.futures

In [3]:
x = concurrent.futures.ThreadPoolExecutor()
help(x.submit)

Help on method submit in module concurrent.futures.thread:

submit(*args, **kwargs) method of concurrent.futures.thread.ThreadPoolExecutor instance
    Submits a callable to be executed with the given arguments.
    
    Schedules the callable to be executed as fn(*args, **kwargs) and returns
    a Future instance representing the execution of the callable.
    
    Returns:
        A Future representing the given call.



In [4]:
import asyncio
import time

async def foo():
    await asyncio.sleep(5)
    print("Foo!")
    
async def boo(x):
    time.sleep(2)
    return x*x

async def hello_world():
    await foo()  # waits for `foo()` to complete
    z = await boo(2)
    print("Hello World!")
    print(z)

# asyncio.run(hello_world())
await hello_world()

Foo!
Hello World!
4


In [5]:
loop = asyncio.get_event_loop()
loop.create_task(hello_world())
# loop.close()

<Task pending coro=<async-def-wrapper.<locals>.hello_world() running at <ipython-input-4-25d0707ec0b4>:15>>

In [6]:
### Gather

In [7]:
import asyncio


async def foo(n):
    await asyncio.sleep(2*n)  # wait 5s before continuing
    print(f"n: {n}!")
    return str(n)


async def main():
    tasks = [foo(1), foo(1.1), foo(1.2)]
    x = await asyncio.gather(*tasks)
    print("all done!")
    print(x)
    
    
await main()

n: 1!
Foo!
Hello World!
4
n: 1.1!
n: 1.2!
all done!
['1', '1.1', '1.2']


In [8]:
## Wait

In [9]:
import asyncio
from random import randrange


async def foo(n):
    s = randrange(5)
    print(f"{n} will sleep for: {s} seconds")
    await asyncio.sleep(s)
    print(f"n: {n}!")


async def main():
    tasks = [foo(1), foo(2), foo(3)]
    result = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print('~~~~', result)
    
await main()

1 will sleep for: 0 seconds
2 will sleep for: 2 seconds
3 will sleep for: 0 seconds
n: 1!
n: 3!
~~~~ ({<Task finished coro=<async-def-wrapper.<locals>.foo() done, defined at <ipython-input-9-f6bb5b15cf44>:8> result=None>, <Task finished coro=<async-def-wrapper.<locals>.foo() done, defined at <ipython-input-9-f6bb5b15cf44>:8> result=None>}, {<Task pending coro=<async-def-wrapper.<locals>.foo() running at <ipython-input-9-f6bb5b15cf44>:11> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f3a1c2ca3d0>()]>>})
n: 2!


In [10]:
## Wait_for timeout

In [11]:
import asyncio


async def foo(n):
    await asyncio.sleep(2)
    print(f"n: {n}!")


async def main():
    try:
        await asyncio.wait_for(foo(1), timeout=1.0001)
    except asyncio.TimeoutError:
        print("timeout!")

await main()

timeout!


In [12]:
import asyncio
from random import randrange


async def foo(n):
    s = randrange(10)
    print(f"{n} will sleep for: {s} seconds")
    await asyncio.sleep(s)
    return f"{n}!"


async def main():
    counter = 0
    tasks = [foo("a"), foo("b"), foo("c")]

    for future in asyncio.as_completed(tasks):
        n = "quickest" if counter == 0 else "next quickest"
        counter += 1
        result = await future
        print(f"the {n} result was: {result}")
await main()

a will sleep for: 5 seconds
b will sleep for: 8 seconds
c will sleep for: 1 seconds
the quickest result was: c!
the next quickest result was: a!
the next quickest result was: b!


In [13]:
import asyncio


async def foo():
    await asyncio.sleep(3)
    print("Foo!")


async def hello_world():
    task = asyncio.create_task(foo())
    print(task)
    await asyncio.sleep(1)
    print("Hello World!")
    await asyncio.sleep(1)
    print(task)
    print('xxxx')
    
await hello_world()

<Task pending coro=<async-def-wrapper.<locals>.foo() running at <ipython-input-13-2954a3fc9540>:7>>
Hello World!
<Task pending coro=<async-def-wrapper.<locals>.foo() running at <ipython-input-13-2954a3fc9540>:8> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f3a1c42fc90>()]>>
xxxx


In [14]:
import asyncio


async def foo():
    await asyncio.sleep(3)
    print("Foo!")


async def hello_world():
    await foo()
    await asyncio.sleep(1)
    print("Hello World!")
    await asyncio.sleep(1)
    print('xxxx')
    
await hello_world()

Foo!
Foo!
Hello World!
xxxx


In [15]:
### callback

In [16]:
import asyncio


async def foo():
    await asyncio.sleep(5)
    return "Foo!"


def got_result(future):
    print(f"got the result! {future.result()}")


async def hello_world():
    task = asyncio.create_task(foo())
    task.add_done_callback(got_result)
    print(task)
    await asyncio.sleep(3)
    print("Hello World!")
    await asyncio.sleep(10)
    print(task)
    print('all done!')
await hello_world()

<Task pending coro=<async-def-wrapper.<locals>.foo() running at <ipython-input-16-08b2d8c81f74>:7> cb=[async-def-wrapper.<locals>.got_result() at <ipython-input-16-08b2d8c81f74>:12]>
Hello World!
got the result! Foo!
<Task finished coro=<async-def-wrapper.<locals>.foo() done, defined at <ipython-input-16-08b2d8c81f74>:7> result='Foo!'>
all done!


In [17]:
import asyncio
import concurrent.futures


def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open("/dev/urandom", "rb") as f:
        return f.read(100)


def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))


async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(None, blocking_io)
    print("default thread pool", result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print("custom thread pool", result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print("custom process pool", result)

await(main())

default thread pool b'\xfexI\x81\xa7\xb4\xc2\xdbE\xb7\x9e\x8f\x16\xd9\x82\x7fE\xa0\xfa\x94\xaab\xe8\x1f!\x87\xf5\xb0\xc8\x0e\xc6\xa0\xae\x98H\x0c!`#\x1c\x1c\xe8\xa4)\xd7_U\xf4\xd5=\x8a\xb8b\x88`\x8b?SZ`\xd48\x04{\x16\x8f\xc8\x80Z\xba\xe5TLz\xbewE\xc9<h\xb4\xea\x1be\nm\xdf\xe8\xd9/a\xde\xe7L\x86\x87\xda\xdb\x95t'
custom thread pool b"\xc9M\xf5\xf3'z\xa23\x87\xact\xd5\x08?\x19\xacX\x15U\x9dB\x87$ih;\xd0\x82]\xc7\xa6\x08\x07\t\xf5\xcd\x8c\xe4v\xf1\x9b\x15\xc3]5\xfc\x92\xcd\x1aUG[\x05k\xd0*\xa3e\x10\nq\xbc]\xd3!Z#\x95\x1d\xa6+\xb7a\xe0M\x92\x96d}-J*\xa6\x16m\xab3\xa6Y\xb0\x91\x87\x10\xdbx\xd8\xa7\x17e\xc8"


AttributeError: Can't pickle local object 'async-def-wrapper.<locals>.cpu_bound'

In [24]:
import concurrent.futures
import time

def timefunc(func):
    def inner(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)
        end = time.time()
        print(f'Elapsed time is {end - start}')
    return inner


@timefunc
def main():
    for i in range(10000):
        for j in range(4):
            x = i * j
    print(x)

main()

@timefunc
def main2():
    for i in range(10000):
        for j in range(4):
            x = i + j 
    print(x)

main2()

with concurrent.futures.ThreadPoolExecutor() as exe: 
    f1 = exe.submit(main)
    f2 = exe.submit(main2)

import multiprocessing
pool = multiprocessing.Pool()
pool._processes

with concurrent.futures.ProcessPoolExecutor() as exe: 
    f1 = exe.submit(main)
    f2 = exe.submit(main2)