In [5]:
import os

import labbench

print('working directory: ', os.path.abspath('.'))
print('module path: ', labbench.__path__)

working directory:  c:\Users\dkuester\Documents\src\labbench\tests
module path:  ['C:\\Users\\dkuester\\Documents\\src\\labbench\\labbench']


In [12]:
import asyncio
import time

import nest_asyncio

nest_asyncio.apply()


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))

    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

started at 11:31:29
hello
world
finished at 11:31:31


In [166]:
from functools import partial

hi = None


def blocking_call(delay, message):
    t0 = time.perf_counter()

    try:
        while time.perf_counter() - t0 < delay:
            time.sleep(0.05)
            # time.sleep(delay)
    except:
        print('* cancelled')
        raise
    finally:
        print(f'delay {delay} finished ', time.perf_counter())

    if delay <= 0.5:
        raise ValueError(f'whoops {delay} {message}')

    return delay


awaitables = []


async def _thread_main(concurrent=True, **kws):
    async def awaitify(func, *args, **kws):
        try:
            if asyncio.iscoroutine(func):
                return func
            elif concurrent:
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, partial(func, *args, **kws))
            else:
                return func(*args, **kws)
        except asyncio.CancelledError:
            print('cancel')
            raise

    awaitables = [awaitify(blocking_call, v, k) for k, v in kws.items()]

    # tasks = [asyncio.create_task(asyncio.ensure_future(a)) for a in awaitables]
    try:
        gather = asyncio.gather(*awaitables, return_exceptions=False)
        rets = await asyncio.ensure_future(gather)
    except:
        print(gather)
        for a in awaitables:
            try:
                a.throw(asyncio.CancelledError)
            except BaseException as ex:
                print('fun: ', ex)
                continue
        raise

    return rets
    # return dict(zip(kws.keys(), rets))


def run(**kws):
    return asyncio.run(_thread_main(**kws))


try:
    ret = run(hello=0.5, there=2.5, concurrent=True)
finally:
    print('outer finally: ', time.perf_counter())

delay 0.5 finished  6625.2482755
fun:  cannot reuse already awaited coroutine
cancel
fun:  
outer finally:  6625.2488247


ValueError: whoops 0.5 hello

In [160]:
a = awaitables[1]
a.throw(asyncio.CancelledError)

RuntimeError: cannot reuse already awaited coroutine

In [163]:
a.send?

[1;31mDocstring:[0m
send(arg) -> send 'arg' into coroutine,
return next iterated value or raise StopIteration.
[1;31mType:[0m      builtin_function_or_method


In [154]:
a = await blocking_call(0.5, 'nothin')

delay 0.5 finished  5948.4740669


ValueError: whoops 0.5 nothin

In [None]:
loop = asyncio.get_running_loop