In [None]:
#default_exp asyncUtil

# async
> tools to help writing async python codes

In [None]:
#hide
from nbdev.showdoc import *

# async wrap

In [None]:
#export
import asyncio
from functools import wraps, partial

def async_wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run


In [None]:
%%time
@async_wrap
def aSlowFunc(input_:str):
  time.sleep(2)
  return input_

## async func execute
import nest_asyncio, time
nest_asyncio.apply()
import time


async def runASlowFunc(input_):
  return await aSlowFunc(input_)
async def runLoop():
  rtup = (runASlowFunc(i) for i in range (10))
  r = await asyncio.gather(*rtup)
  return r
asyncio.run(runLoop())

CPU times: user 2.78 ms, sys: 3.69 ms, total: 6.47 ms
Wall time: 4.01 s


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [None]:
async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

hello


# thread mapping

In [None]:
#export
import multiprocessing.dummy
from typing import Callable, List, Any, Iterable
from beartype import beartype
@beartype
def asyncMap(f:Callable, data:Iterable[Any], threads:int = 5)->Any:
  p = multiprocessing.dummy.Pool(threads)
  return p.map(f,data)

In [None]:
%%time
import time
asyncMap(lambda x: (x+1, time.sleep(1))[0] , range(100), threads = 100)[:10]

CPU times: user 12.3 ms, sys: 8.86 ms, total: 21.1 ms
Wall time: 1.03 s


[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
def aSlowFunc(x):
  time.sleep(1)
  return x

In [None]:
%%time
asyncMap(aSlowFunc, range(100))[:10]

CPU times: user 7.04 ms, sys: 268 µs, total: 7.31 ms
Wall time: 20 s


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [None]:
input_ = list(zip(range(10), range(1,11)))
print(input_)
asyncMap(lambda x: (lambda x,y: x+y )(x[0],x[1]), input_)

[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10)]


[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

# asyncAwaitMap

In [None]:
#export
def asyncAwaitMap(f:Callable, data:Iterable[Any])->Any:
  af = async_wrap(f) # convert to async func
  async def runLoop():
    rtup = (af(i) for i in data)
    return await asyncio.gather(*rtup)
  return asyncio.run(runLoop())

In [None]:
%%time
import nest_asyncio
nest_asyncio.apply()
asyncAwaitMap(aSlowFunc, range(100))[:10]

CPU times: user 21 ms, sys: 4.2 ms, total: 25.2 ms
Wall time: 17 s


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [None]:
input_ = list(zip(range(10), range(1,11)))
print(input_)
asyncAwaitMap(lambda x: (lambda x,y: x+y )(x[0],x[1]), input_)

[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10)]


[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]

# AsyncThread

In [None]:
#export
from concurrent.futures import ThreadPoolExecutor
def asyncThreadMap(f,data, threads=10):
  with ThreadPoolExecutor(threads) as tr:
    return tr.map(f,data)

In [None]:
%%time
def aSlowFunc(x):
  time.sleep(1)
  return x

list(asyncThreadMap(aSlowFunc, range(100)))[:10]

CPU times: user 9.18 ms, sys: 1.04 ms, total: 10.2 ms
Wall time: 10 s


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# AsyncProcess map

In [None]:
#export
from concurrent.futures import ProcessPoolExecutor
def asyncProcessMap(f,data, threads=10):
  with ProcessPoolExecutor(threads) as tr:
    return tr.map(f,data)

In [None]:
%%time
def aSlowFunc(x):
  time.sleep(1)
  return x

list(asyncProcessMap(aSlowFunc, range(100)))[:10]

CPU times: user 24.6 ms, sys: 50.9 ms, total: 75.6 ms
Wall time: 10.1 s


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]