# Async programming

## I/O 대기
+ CPU 클럭의 속도가 1초라고 가정했을 때
+ RAM은 20초, SSD는 4일, HDD 6개월, 네트워크 전송은?

## 동시성
+ 여러 개의 요청을 동시에 다룰 수 있는 시스템을 구현하는 방식
+ 아래는 동기식으로 구현된 기본 함수

In [1]:
import time 
import functools


DEFAULT_FMT = '[{elapsed:0.8f}s] {name}({args}, {kwargs}) -> {result}'

def clock(fmt=DEFAULT_FMT): 
    def decorate(func): 
        @functools.wraps(func)
        def clocked(*_args, **_kwargs): # clocked에서 *, ** 키워드를 통해 설정된 인수를 변수화
            t0 = time.time()
            _result = func(*_args)
            elapsed = time.time() - t0
            name = func.__name__
            args = ', '.join(repr(arg) for arg in _args)
            pairs = ['%s=%r' % (k, w) for k, w in sorted(_kwargs.items())]
            kwargs = ', '.join(pairs)
            result = repr(_result)
            print(fmt.format(**locals()))
            return _result # clocked()는 데커레이트된 함수를 대체하므로, 원래 함수가 반환하는 값을 반환해야 한다.
        return clocked     # decorate()는 clocked()를 반환한다. 
    return decorate        # clock()은 decorate()를 반환한다. 

In [2]:
def network_request(number):
    time.sleep(1.0)
    return {"success": True, "result": number ** 2}


@clock()
def fetch_square(number):
    response = network_request(number)
    if response["success"]:
        return response["result"]

        
if __name__ == "__main__":
    t0 = time.time()

    returns = [fetch_square(i) for i in range(10)]
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[1.00107098s] fetch_square(0, ) -> 0
[1.00105882s] fetch_square(1, ) -> 1
[1.00104594s] fetch_square(2, ) -> 4
[1.00107336s] fetch_square(3, ) -> 9
[1.00061846s] fetch_square(4, ) -> 16
[1.00104070s] fetch_square(5, ) -> 25
[1.00106454s] fetch_square(6, ) -> 36
[1.00108457s] fetch_square(7, ) -> 49
[1.00107193s] fetch_square(8, ) -> 64
[1.00106812s] fetch_square(9, ) -> 81
Total: 10.012650966644287 sec


### Callback

+ 원리만 이해하고 복잡하니 쓰지 말자

In [3]:
import threading


def network_request_async(number, on_done):
    
    def timer_done():
        time.sleep(1.0)
        on_done({"success": True, "result": number ** 2})

    timer = threading.Thread(target=timer_done, args=[])
    timer.start()
    
    
@clock()
def fetch_square_async(number):
    
    def on_done(response):
        if response["success"]:
            return response["result"]
        
    network_request_async(number, on_done)

In [4]:
if __name__ == "__main__":
    t0 = time.time()

    returns = [fetch_square_async(i) for i in range(10)]
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00017619s] fetch_square_async(0, ) -> None
[0.00045419s] fetch_square_async(1, ) -> None
[0.00030136s] fetch_square_async(2, ) -> None
[0.00009036s] fetch_square_async(3, ) -> None
[0.00022030s] fetch_square_async(4, ) -> None
[0.00024152s] fetch_square_async(5, ) -> None
[0.00020170s] fetch_square_async(6, ) -> None
[0.00019383s] fetch_square_async(7, ) -> None
[0.00023293s] fetch_square_async(8, ) -> None
[0.00019050s] fetch_square_async(9, ) -> None
Total: 0.0029489994049072266 sec


### Future

+ 요청한 자원을 추적하고 가용하게 될 때 까지 대기하는 데 도움이 되는 추상화

In [5]:
from concurrent.futures import Future

def callback(future):
    print(future.result()[::-1])

fut = Future()
fut.add_done_callback(callback)

In [6]:
vars(fut)

{'_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f69f02f7810>, 0)>,
 '_state': 'PENDING',
 '_result': None,
 '_exception': None,
 '_waiters': [],
 '_done_callbacks': [<function __main__.callback(future)>]}

In [7]:
fut.set_result("HELLO") ## 퓨쳐를 리턴함

OLLEH


In [8]:
vars(fut)

{'_condition': <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f69f02f7810>, 0)>,
 '_state': 'FINISHED',
 '_result': 'HELLO',
 '_exception': None,
 '_waiters': [],
 '_done_callbacks': [<function __main__.callback(future)>]}

+ Network_request_async 를 쓰레드 방식으로 변환

In [9]:
def network_reqeust_async(number):
    future = Future()
    result = {"success": True, "result": number ** 2}
    timer = threading.Timer(1.0, lambda: future.set_result(result))
    timer.start()
    return future


@clock()
def fetch_square_async(number):
    fut = network_reqeust_async(number)

    def on_done_future(future):
        response = future.result()
        if response["success"]:
#             return response["result"]
            print(response["result"], flush=True)        
        
    fut.add_done_callback(on_done_future)

In [10]:
if __name__ == "__main__":
    t0 = time.time()

    returns = [fetch_square_async(i) for i in range(10)]
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00019860s] fetch_square_async(0, ) -> None
[0.00034356s] fetch_square_async(1, ) -> None
[0.00163865s] fetch_square_async(2, ) -> None
[0.00030327s] fetch_square_async(3, ) -> None
[0.00018096s] fetch_square_async(4, ) -> None
[0.00019813s] fetch_square_async(5, ) -> None
[0.00025463s] fetch_square_async(6, ) -> None
[0.00024033s] fetch_square_async(7, ) -> None
[0.00027776s] fetch_square_async(8, ) -> None
[0.00033355s] fetch_square_async(9, ) -> None
Total: 0.004523038864135742 sec


## asyncio 프레임워크
+ loop = asyncio.get_event_loop() 함수를 호출해 asyncio 루프를 얻을 수 있음
+ loop.call_later() 를 사용해 콜백 실행을 예약할 수 있음
+ loop.stop 메소드를 사용해서 루프 종료
+ loop.run_forever() 로 루프 시작 가능
+ loop.run_until_complete()을 통해 개별적인 실행 가능
+ ensure_future()는 Task 인스턴스를 반환해서 실행하고 싶은 코루틴을 이벤트 루프로 전달 가능

In [11]:
import asyncio
import nest_asyncio
nest_asyncio.apply() # This module patches asyncio to allow nested (due to jupyter notebook)

async def network_request(number):
    await asyncio.sleep(1.0)
    return {"success": True, "result": number ** 2}

@clock()
async def fetch_square(number):
    response = await network_request(number)
    if response["success"]:
        print(response["result"], flush=True)

In [12]:
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    t0 = time.time()

    loop.run_until_complete(fetch_square(0))
    loop.run_until_complete(fetch_square(1))
    loop.run_until_complete(fetch_square(2))
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00000048s] fetch_square(0, ) -> <coroutine object fetch_square at 0x7f69f0269cb0>
01

436
2549

9

64
16
81
0
[0.00000048s] fetch_square(1, ) -> <coroutine object fetch_square at 0x7f69f02a8830>
1
[0.00000262s] fetch_square(2, ) -> <coroutine object fetch_square at 0x7f69f02a8830>
4
Total: 3.008643627166748 sec


In [13]:
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    t0 = time.time()

    loop.run_until_complete(asyncio.gather(fetch_square(0), fetch_square(1), fetch_square(2)))
    
    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00000048s] fetch_square(0, ) -> <coroutine object fetch_square at 0x7f69f0253200>
[0.00000095s] fetch_square(1, ) -> <coroutine object fetch_square at 0x7f69f028c710>
[0.00000072s] fetch_square(2, ) -> <coroutine object fetch_square at 0x7f69f028cf80>
0
1
4
Total: 1.003474473953247 sec


In [14]:
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    t0 = time.time()

    asyncio.ensure_future(fetch_square(0))
    asyncio.ensure_future(fetch_square(1))
    asyncio.ensure_future(fetch_square(2))
    
#     loop.run_forever()

    elapsed = time.time() - t0
    print("Total:", elapsed, "sec")

[0.00000072s] fetch_square(0, ) -> <coroutine object fetch_square at 0x7f69f02a85f0>
[0.00000095s] fetch_square(1, ) -> <coroutine object fetch_square at 0x7f69f02a8c20>
[0.00000072s] fetch_square(2, ) -> <coroutine object fetch_square at 0x7f69f02a8830>
Total: 0.00031566619873046875 sec


In [5]:
import asyncio
import nest_asyncio
nest_asyncio.apply() # This module patches asyncio to allow nested (due to jupyter notebook)

from aiofile import AIOFile

path = Path("/home/shyeon/workspace/python/SparkDefinitiveGuide/data/activity-data")


async def load_json(file):
    async with AIOFile(file, "r", encoding="UTF-8") as afp:
        return await afp.read()

In [7]:
@clock()
async def main():
    dd = defaultdict(list)
    files = path.glob("*.json")
    
    for file in list(files)[:2]:
        jsons = await load_json(file)
        print(jsons)
   

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)

