In [3]:
import asyncio
import logging

In [15]:
%%writefile asynctest.py

import asyncio
import logging

# Module logger for this demo; INFO is enabled on it explicitly so the
# logger.info() calls below are not filtered out.
logger = logging.getLogger('async')
logger.setLevel(logging.INFO)

# Root handler format: emitting thread id, timestamp, then the message, e.g.
# "[140735129403392]2015-12-02 08:45:07,798: One second".
logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")

async def print_after(msg, wait_secs):
    """
    Log ``msg`` at INFO level once ``wait_secs`` seconds have elapsed.

    The wait uses ``asyncio.sleep``, so the event loop is free to run other
    coroutines in the meantime.

    :param msg: text handed to ``logger.info``
    :param wait_secs: delay in seconds before the message is logged
    """
    await asyncio.sleep(wait_secs)
    logger.info(msg)

async def test_async():
    """
    Run three ``print_after`` calls concurrently (1, 2 and 3 second delays),
    wait for all of them, then log "finished".
    """
    # (message, delay in seconds) for each concurrent print_after call.
    delayed_messages = (
        ("One second", 1),
        ("Two seconds", 2),
        ("Three seconds", 3),
    )
    pending = [print_after(text, delay) for text, delay in delayed_messages]
    # gather() resumes only after every coroutine above has completed.
    await asyncio.gather(*pending)
    logger.info("finished")

def main():
    """
    Run ``test_async`` to completion on the current asyncio event loop.

    The loop is not closed afterwards, so ``main`` can be called again.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # get_event_loop() raises when the calling thread has no current loop
        # (any non-main thread, and the main thread too on Python 3.14+).
        # Install a fresh loop so the script still runs in those cases.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.run_until_complete(test_async())

# Script entry point: run the demo only when this file is executed directly
# (e.g. `python3 asynctest.py`), not when it is imported.
if __name__ == '__main__':
    main()

Overwriting asynctest.py


In [16]:
!python3 asynctest.py

[140735129403392]2015-12-02 08:45:07,798: One second
[140735129403392]2015-12-02 08:45:08,799: Two seconds
[140735129403392]2015-12-02 08:45:09,798: Three seconds
[140735129403392]2015-12-02 08:45:09,798: finished


In [18]:
%load_ext yf

In [19]:
# NOTE(review): the cell that defines main() above uses %%writefile, which only
# saves the code to asynctest.py and does not define main in the kernel.
# Presumably main comes from an earlier run of that cell or from the yf
# extension - confirm this cell survives Restart & Run All.
main()

INFO:async:One second
INFO:async:Two seconds
INFO:async:Three seconds
INFO:async:finished


In [20]:
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor

# Log format: emitting thread id, timestamp, then the message.
# NOTE(review): basicConfig() is a no-op when the root logger already has
# handlers; presumably that is why the output under this cell shows the
# default "INFO:async:" prefix instead of this format - confirm.
logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")
logger = logging.getLogger('async')
logger.setLevel(logging.INFO)

# Thread pool shared by every run_in_executor() call in the pipeline below.
executor = ThreadPoolExecutor(max_workers=10)

# Event loop shared by the pipeline coroutines and main().
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    # get_event_loop() raises when the calling thread has no current loop
    # (any non-main thread, and the main thread too on Python 3.14+).
    # Create and install one so the names below stay usable.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

def cpu_bound_op(exec_time, *data):
    """
    Simulation of a long-running CPU-bound operation.

    Logs the inputs, blocks the calling thread for ``exec_time`` seconds with
    ``time.sleep`` and then returns the sum of ``data``.

    :param exec_time: how long this operation will take, in seconds
    :param data: data to "process" (sum it up)
    :return: the processed result, i.e. ``sum(data)`` (``0`` when no data is given)
    """
    # Pass the values as logging arguments rather than pre-formatting, so the
    # message is only built if the record is actually emitted.
    logger.info("Running cpu-bound op on %s for %s seconds", data, exec_time)
    time.sleep(exec_time)
    return sum(data)

async def process_pipeline(data):
    """
    Entry point of the pipeline: hand ``data`` to ``level_a`` unchanged.

    :param data: value forwarded to ``level_a``
    :return: whatever ``level_a`` produces for ``data``
    """
    return await level_a(data)

async def level_a(data):
    """
    First pipeline stage: fan ``data`` out to three ``level_b`` calls, then
    combine their results with a 3-second ``cpu_bound_op``.

    :param data: input value; it is passed on as-is, doubled and tripled
    :return: result of ``cpu_bound_op`` over the three ``level_b`` results
    """
    # Three variants of the input, one per level_b call.
    fan_out = (data, 2*data, 3*data)
    # gather() runs the three level_b coroutines together and resumes here
    # only once every one of them has finished.
    partials = await asyncio.gather(*(level_b(item) for item in fan_out))
    # Hand the blocking combine step to the thread pool.
    # run_in_executor(executor, func, *args): everything after func
    # (here 3 and the unpacked partial results) is passed to cpu_bound_op.
    return await loop.run_in_executor(executor, cpu_bound_op, 3, *partials)

async def level_b(data):
    """
    Second pipeline stage, shaped like ``level_a``: fan out to three
    ``level_c`` calls, then combine their results with a 2-second
    ``cpu_bound_op``.

    :param data: input value; it is divided by 2, 4 and 7 for the fan-out
    :return: result of ``cpu_bound_op`` over the three ``level_c`` results
    """
    fractions = (data/2, data/4, data/7)
    # Wait for all three level_c coroutines before combining.
    partials = await asyncio.gather(*(level_c(item) for item in fractions))
    return await loop.run_in_executor(executor, cpu_bound_op, 2, *partials)

async def level_c(data):
    """
    Final pipeline stage: submit a 1-second ``cpu_bound_op`` on ``data`` to
    the thread pool straight away and wait for its result.

    :param data: single value passed to ``cpu_bound_op``
    :return: result of ``cpu_bound_op(1, data)``
    """
    return await loop.run_in_executor(executor, cpu_bound_op, 1, data)

def main():
    """
    Run the whole pipeline on an input of 2.5 and log the result together
    with the elapsed time in seconds.
    """
    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # duration cannot be skewed by system clock adjustments the way a
    # difference of time.time() readings can.
    start_time = time.perf_counter()
    result = loop.run_until_complete(process_pipeline(2.5))
    elapsed = time.perf_counter() - start_time
    logger.info("Completed (%s) in %s seconds", result, elapsed)

# Run the pipeline when this code executes as the top-level module. The log
# output directly under this cell shows that this is the case in the notebook.
if __name__ == '__main__':
    main()

INFO:async:Running cpu-bound op on (1.875,) for 1 seconds
INFO:async:Running cpu-bound op on (1.0714285714285714,) for 1 seconds
INFO:async:Running cpu-bound op on (3.75,) for 1 seconds
INFO:async:Running cpu-bound op on (0.625,) for 1 seconds
INFO:async:Running cpu-bound op on (0.35714285714285715,) for 1 seconds
INFO:async:Running cpu-bound op on (1.25,) for 1 seconds
INFO:async:Running cpu-bound op on (2.5,) for 1 seconds
INFO:async:Running cpu-bound op on (0.7142857142857143,) for 1 seconds
INFO:async:Running cpu-bound op on (1.25,) for 1 seconds
INFO:async:Running cpu-bound op on (3.75, 1.875, 1.0714285714285714) for 2 seconds
INFO:async:Running cpu-bound op on (2.5, 1.25, 0.7142857142857143) for 2 seconds
INFO:async:Running cpu-bound op on (1.25, 0.625, 0.35714285714285715) for 2 seconds
INFO:async:Running cpu-bound op on (2.232142857142857, 4.464285714285714, 6.696428571428571) for 3 seconds
INFO:async:Completed (13.392857142857142) in 6.011924982070923 seconds


In [38]:
# test generators

def mygen(s):
    """
    Generator-based coroutine that prints every line sent to it containing ``s``.

    Prime it once with ``next(gen)`` (or ``gen.send(None)``) so it is paused
    at the ``yield``; after that, each ``gen.send(line)`` prints ``line`` when
    ``s in line`` is true. It loops forever and never yields a value.

    :param s: substring (or member) to look for in each received line
    """
    while True:
        line = yield
        # A bare next(gen) after priming delivers None; skip it instead of
        # failing with a TypeError on ``s in None``.
        if line is not None and s in line:
            print(line)

In [39]:
# Create the generator object; none of mygen's body runs until it is advanced.
testgen = mygen('test')

In [40]:
# Prime the generator: advance it to its first `yield` so it can receive values.
next(testgen)

In [41]:
# 'testing' contains 'test', so the generator prints the line.
testgen.send('testing')

testing


In [42]:
# Scratch check: keys() on an empty dict displays as an empty dict_keys view.
{}.keys()

dict_keys([])