# CH17 Future를 이용한 동시성
* 비동기 작업을 위한 Future 객체

## 17.1 예제 : 세 가지 스타일의 웹 내려받기
* flags.py, flags_threadpool.py, flags_asyncio.py 비교
    * flags.py : 일반 순차 처리 방식 (소요시간 7.18초)
    * flags_threadpool.py : 스레드를 이용한 동시성 처리 방식 (소요시간 1.40초)
    * flags_asyncio.py : 단일 스레드 asyncio 를 이용한 동시성 처리 방식 (소요시간 1.35초)

In [1]:
# flags.py

import os
import time
import sys

import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'

DEST_DIR = 'downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

In [2]:
main(download_many)

BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 10.39s


In [3]:
# flags_threadpool.py

from concurrent import futures

MAX_WORKERS = 20 # Thread 수

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor: # 모든 스레드가 완료될때까지 블록
        res = executor.map(download_one, sorted(cc_list)) # 스레드 마다 작업 할당
    
    return len(list(res))

In [6]:
main(download_many) # 순서가 다름.

CN EG DE RU TR IN PKJP ID MX IR FR  PH NG ET US BD VN BR CD 
20 flags downloaded in 1.24s


* Future 클래스
    * concurrent.futures.Future
    * asyncio.Future
    * 대기중인 작업을 큐에 넣고, 완료 상태를 조사하고 결과를 가져올 수 있도록 캡슐화 함
    * 직접 Future 객체를 생성하면 안됨. 동시성 프레임워크(concurrent.futures, asyncio) 에서 배타적으로 생성
        * Future 는 앞으로 일어날 일을 나타내고, 이 실행 스케쥴링은 프레임워크가 알고 있기 때문
    * concurrent.futures.Future 객체는 concurrent.futures.Executor 의 서브클래스로 실행을 스케쥴링해야만 생성됨.
        * `Executor.submit()` 메서드는 콜러블을 받아서 실행을 스케쥴링하고 Future 객체를 반환
    * Future 객체의 상태 역시 직접 변경하면 안됨, 프레임워크가 컨트롤하기 때문
    * Future 클래스는 논블로킹이며, 이 객체에 연결된 콜러블 실행이 완료되었는지 여부를 알려주는 `done()` 메서드가 있음.
    * `done()` 이 참인경우, `add_done_callback()` 메서드가 수행.
    * `result()` 메서드 : 완료되었을때, 콜러블 결과를 반환하거나 콜러블 실행시 예외를 다시 발생
        * concurrent.futures.Future 는 결과가 나올때까지 호출자의 스레드 블로킹 (timeout 설정 가능)
        * asyncio.Future 는 yield from 으로 객체 상태를 가져와서 컨트롤 가능

참고 : http://homoefficio.github.io/2017/02/19/Blocking-NonBlocking-Synchronous-Asynchronous/

* Future 객체를 실제로 보기 위해, `futures.as_completed()` 함수로 변경

In [7]:
def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor: # executor.map 을 submit 와 as_completed 로 변경
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))
        
        results=[]
        for future in futures.as_completed(to_do): # 완료된 것부터 호출되는 듯!
            res = future.result() # 블로킹 되지 않음.. as_completed 가 완료될때 생성되기 때문
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
        
    return len(results)

In [8]:
main(download_many)

Scheduled for BR: <Future at 0x7f8259f4bd90 state=running>
Scheduled for CN: <Future at 0x7f8259f5f490 state=running>
Scheduled for ID: <Future at 0x7f825871aa10 state=running>
Scheduled for IN: <Future at 0x7f82586cc790 state=pending>
Scheduled for US: <Future at 0x7f82586cc190 state=pending>
CN <Future at 0x7f8259f5f490 state=finished returned str> result: 'CN'
BR <Future at 0x7f8259f4bd90 state=finished returned str> result: 'BR'
ID <Future at 0x7f825871aa10 state=finished returned str> result: 'ID'
IN <Future at 0x7f82586cc790 state=finished returned str> result: 'IN'
US <Future at 0x7f82586cc190 state=finished returned str> result: 'US'

5 flags downloaded in 1.08s


* cocurrent.futures 는 전역 인터프리터 락(Global Interpreter Lock) 에 의해 제한되어 병렬 처리 되지 않음

## 17.2 블로킹 I/O와 GIL
* GIL 은 한번에 한 스레드만 파이썬 바이트코드를 실행하도록 제한 -> 단일 파이썬 프로세스가 다중 CPU 코어를 이용할 수 없다
* 그런데 블로킹 입출력을 실행하는 모든 표준 라이브러리 함수는 OS에서 결과를 기다리는 동안 GIL을 해제한다
    * 입출력 위주 작업의 파이썬은 스레드를 이용함으로써 네트워크 응답을 기다리는 동안 다른 스레드가 실행될 수 있어 이득을 볼 수 있다

## 17.3 concurrent.futures로 프로세스 실행하기
* 병렬 작업의 실행
    * ProcessPoolExecutor 를 사용하여 GIL 우회 (다중 프로세스에 작업을 분산 시켜 병렬 컴퓨팅을 가능케함)
    * ThreadPoolExecutor 와 같은 Executor 인터페이스를 구현하므로 쉽게 전환 가능
    
참고 : https://gmlwjd9405.github.io/2018/09/14/process-vs-thread.html

In [11]:
def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ProcessPoolExecutor() as executor:
        res = executor.map(download_one, sorted(cc_list))
    
    return len(list(res))

In [12]:
main(download_many) # 별로 안빨라짐.. 쓰레드로도 충분히 이득을 볼 수 있기 때문에

BR CD BD DE CN EG ID FR ET JP IR IN MX NG PH PK RU TR US VN 
20 flags downloaded in 1.18s


* 입출력이 아닌 계산 위주의 작업에서 병렬처리의 진가가 발휘됨. (입출력은 스레드 환경에서 블로킹 없이 가능하므로)

## 17.4 Executor.map() 실험


In [16]:
# demo_executor_map.py
from time import sleep, strftime
from concurrent import futures

def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display("Script starting.")
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5)) # 논블로킹 메서드
    display('results:', results)
    display("Waiting for individual results:")
    for i, result in enumerate(results): # next(results) 는 future.result() 호출 <- 블로킹
        display('result {}: {}'.format(i, result))

In [17]:
main()

[02:55:35] Script starting.
[02:55:35] loiter(0): doing nothing for 0s...
[02:55:35] loiter(0): done.
[02:55:35] 	loiter(1): doing nothing for 1s...
[02:55:35] 		loiter(2): doing nothing for 2s...
[02:55:35] results: <generator object Executor.map.<locals>.result_iterator at 0x7f8270a7be50>
[02:55:35] Waiting for individual results:
[02:55:35] result 0: 0
[02:55:35] 			loiter(3): doing nothing for 3s...
[02:55:36] 	loiter(1): done.
[02:55:36] 				loiter(4): doing nothing for 4s...
[02:55:36] result 1: 10
[02:55:37] 		loiter(2): done.
[02:55:37] result 2: 20
[02:55:38] 			loiter(3): done.
[02:55:38] result 3: 30
[02:55:40] 				loiter(4): done.
[02:55:40] result 4: 40


## 17.5 진행 상황 출력하고 에러를 처리하며 내려받기
* 기존코드는 에러처리 안함, 다양한 에러 조건 처리 테스트

In [25]:
import time
from tqdm import tqdm
for i in tqdm(range(1000)):
    time.sleep(.01)

100%|██████████| 1000/1000 [00:10<00:00, 96.60it/s]


In [18]:
!python flags2_threadpool.py -h

usage: flags2_threadpool.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
                            [-v]
                            [CC [CC ...]]

Download flags for country codes. Default: top 20 countries by population.

positional arguments:
  CC                    country code or 1st letter (eg. B for BA...BZ)

optional arguments:
  -h, --help            show this help message and exit
  -a, --all             get all available flags (AD to ZW)
  -e, --every           get flags for every possible code (AA...ZZ)
  -l N, --limit N       limit to N first codes
  -m CONCURRENT, --max_req CONCURRENT
                        maximum concurrent requests (default=30)
  -s LABEL, --server LABEL
                        Server to hit; one of DELAY, ERROR, LOCAL, REMOTE
                        (default=LOCAL)
  -v, --verbose         output detailed progress info


In [26]:
!python flags2_sequential.py -s REMOTE

REMOTE site: http://flupy.org/data/flags
Searching for 20 flags: from BD to VN
1 concurrent connection will be used.
100%|███████████████████████████████████████████| 20/20 [00:10<00:00,  1.89it/s]
--------------------
0 flags downloaded.
20 not found.
Elapsed time: 10.61s


In [27]:
!python flags2_threadpool.py -s REMOTE a b c

REMOTE site: http://flupy.org/data/flags
Searching for 78 flags: from AA to CZ
30 concurrent connections will be used.
100%|███████████████████████████████████████████| 78/78 [00:01<00:00, 41.19it/s]
--------------------
0 flags downloaded.
78 not found.
Elapsed time: 2.14s


In [22]:
!pip install aiohttp

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.
Collecting aiohttp
  Downloading aiohttp-3.6.2-cp37-cp37m-manylinux1_x86_64.whl (1.2 MB)
[K     |████████████████████████████████| 1.2 MB 853 kB/s eta 0:00:01
Collecting async-timeout<4.0,>=3.0
  Downloading async_timeout-3.0.1-py3-none-any.whl (8.2 kB)
Collecting multidict<5.0,>=4.5
  Downloading multidict-4.7.6-cp37-cp37m-manylinux1_x86_64.whl (149 kB)
[K     |████████████████████████████████| 149 kB 10.9 MB/s eta 0:00:01
[?25hCollecting yarl<2.0,>=1.0
  Downloading yarl-1.4.2-cp37-cp37m-manylinux1_x86_64.whl (256 kB)
[K     |████████████████████████████████| 256 kB 8.5 MB/s eta 0:00:01
Installing collected packages: async-timeout, multidict, yarl, aiohttp
Successfully installed aiohttp-3.6.2 async-timeout-3.0.1 multidict-4.7.6 yarl-1.4.2


In [28]:
!python flags2_asyncio.py -s REMOTE -al 100 -m 100

REMOTE site: http://flupy.org/data/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
100%|████████████████████████████████████████| 100/100 [00:00<00:00, 137.92it/s]
--------------------
0 flags downloaded.
100 not found.
Elapsed time: 0.75s


### 17.5.1 flags2 예제에서의 에러 처리

In [None]:
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:  # http 응답 체크
        resp.raise_for_status()
    return resp.content


def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:  # 위의 예외가 발생하면?
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found  # 404 인 경우만 처리
            msg = 'not found'
        else:  # 그 외인 경우, 같은 HTTPError 예외 다시 발생
            raise
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # 디버깅 모드일 경우, 로그 출력
        print(cc, msg)

    return Result(status, cc)

In [None]:
def download_many(cc_list, base_url, verbose, max_req):
    counter = collections.Counter()  # HttpStatus 통계 구할때 사용
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, base_url, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status

        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}: {}'.format(cc, error_msg))

    return counter

### 17.5.2 futures.as_completed() 사용하기

In [None]:
import collections
from concurrent import futures

import requests
import tqdm

from flags2_common import main, HTTPStatus
from flags2_sequential import download_one

DEFAULT_CONCUR_REQ = 30
MAX_CONCUR_REQ = 1000


def download_many(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one,
                            cc, base_url, verbose)  # 첫번째 인수 콜러블, 나머지는 인수는 콜러블에 전달되는 인수
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)  # 완료되는 순서대로 반환
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))
        for future in done_iter:  # 완료되는 순서대로 반환
            try:
                res = future.result() # 블로킹 되지 않음.
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status

            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}: {}'.format(cc, error_msg))

    return counter


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

### 17.5.3 스레드 및 멀티프로세스의 대안
* 기존 thread 모듈도 있긴 하지만, futures.ThreadPoolExecutor 나 threading 모듈 사용
* 스레드간 데이터를 전송할 때에는 queue 모듈에서 제공하는 스레드 안전한 큐 사용
* 계산 위주 작업의 경우, GIL 때문에 스레드 대신 대안 필요
    * futures.ProcessPoolExecutor
    * multiprocessing -> 데이터 공유를 쉽게 해줌