# Future를 이용한 동시성
아래는 각 나라의 국기를 순차적으로 다운로드받는 스크립트다.  

In [1]:
import os
import time
import sys

import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

# BASE_URL = 'http://flupy.org/data/flags'  # using remote (not working well)
# BASE_URL = "http://nginx:8001/flags"  # using local
BASE_URL = "http://vaurien_delay:8002/flags"  # using local delayed
# BASE_URL = "http://vaurien_error_delay:8003/flags" # using local error

DEST_DIR = 'downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    for cc in sorted(cc_list):
        download_one(cc)
        # image = get_flag(cc)
        # show(cc)
        # save_flag(image, cc.lower() + '.gif')

    return len(cc_list)


def main(download_many):
    os.makedirs(DEST_DIR, exist_ok=True)
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

In [2]:
main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 20.25s


## Multi-Threading
이제 futures의 `ThreadPoolExecutor`를 사용하여 다운로드하자.  
- `workers`: thread 수
- `executor.map`을 통해 download_one을 multi-thread로 실행

In [3]:
from concurrent import futures

MAX_WORKERS = 20


def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))
        
    print()
    print(list(res))
    return len(list(res))


In [4]:
main(download_many)

BD BR CD CN DE EG ET ID FRIN IR  JPMX  PH RU TR PK NG VN US 
['BD', 'BR', 'CD', 'CN', 'DE', 'EG', 'ET', 'FR', 'ID', 'IN', 'IR', 'JP', 'MX', 'NG', 'PH', 'PK', 'RU', 'TR', 'US', 'VN']

0 flags downloaded in 1.11s


위의 스크립트는 Future를 사용하였지만 직접 건드리지 않았다.  
'Future'는 concurrent.futures와 asyncio에 내장된 컴포넌트이다.  
대기 중인 작업을 큐에 넣고, 완료 상태를 조사하고, 결과를 가져올 수 있도록 캡슐화한다.  
  
Future는 직접 생성하지 않는 것이 일반적이다. (concurrent.futures나 asyncio에서 배타적으로 생성)  
Future 클래스 내부에는 다음과 같은 method가 있다.  
- `done()`: Non-blocking, callable이 완료되었는지 여부 반환 (boolean)  
- `add_done_callback()`: future 객체 작업이 완료되면 callback 함수가 future를 인수로 받아 호출됨  

Future 클래스 내부를 보기 위해 다음 구조로 download_many를 대체해 보자.  
- `Executor.submit()`: 콜러블이 실행되도록 스케줄링 & Future 클래스 반환  
- `future.as_completed()`: Future 객체를 담은 반복형을 인수로 받아서, 완료된 Future 객체를 생성하는 반복자 반환  

In [5]:
def download_many(cc_list):
    cc_list = cc_list[:5]  # <1>
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2>
        to_do = []
        for cc in sorted(cc_list):  # <3>
            future = executor.submit(download_one, cc)  # <4>
            to_do.append(future)  # <5>
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))  # <6>

        results = []
        for future in futures.as_completed(to_do):  # <7>
            res = future.result()  # <8>
            msg = '{} result: {!r}'
            print(msg.format(future, res)) # <9>
            results.append(res)

    return len(results)

In [6]:
main(download_many)

Scheduled for BR: <Future at 0x7fb09852da50 state=running>
Scheduled for CN: <Future at 0x7fb09852d060 state=running>
Scheduled for ID: <Future at 0x7fb09852dfc0 state=running>
Scheduled for IN: <Future at 0x7fb09852f1c0 state=pending>
Scheduled for US: <Future at 0x7fb09852ee00 state=pending>
CN ID BR <Future at 0x7fb09852d060 state=finished returned str> result: 'CN'
<Future at 0x7fb09852dfc0 state=finished returned str> result: 'ID'
<Future at 0x7fb09852da50 state=finished returned str> result: 'BR'
IN <Future at 0x7fb09852f1c0 state=finished returned str> result: 'IN'
US <Future at 0x7fb09852ee00 state=finished returned str> result: 'US'

5 flags downloaded in 2.05s


위의 작업들은 multi-threading을 사용해 구현되어 있다.  
CPython 인터프리터는 'GIL(Global Interpreter Lock)'을 가지고 있어서, 한 번에 한 쓰레드만 실행 가능하다.  
그렇지만 Blocking I/O를 실행할 때는 GIL을 해제하기 때문에, 위와 같이 네트워크로부터 응답을 기다리는 동안에 다른 쓰레드가 실행될 수 있다.

## Multi-Processing
ThreadPoolExecutor 대신 ProcessPoolExecutor를 사용하면 multi-processing으로 구현할 수 있다.  
- CPU 코어 수보다 많은 프로세스를 실행하면 multi-threading에 비해 효율이 떨어질 수 있다.
- 계산 위주의 작업에서 높은 성능을 보인다.

In [7]:
def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ProcessPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))

    return len(list(res))

In [8]:
main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 1.25s


## [Executor.map()] vs [Executor.submit() + future.as_completed()]
- `Executor.map()`은 결과를 순차적으로 반환한다.
    - 2번째, 3번째 작업이 끝나도 1번째부터 결과를 반환
- `Executor.submit()` + `future.as_completed()` 방식을 사용하면 결과를 완료 즉시 반환한다.