# Chapter.18 Asyncio를 이용한 동시성

동시성은 한 번에 많은 것을 다룬다.

병렬성은 한 번에 많은 것을 한다.

똑같지는 않지만, 연관성은 있다.

동시성은 구조, 병렬성은 실행에 관한 것이다.

동시성은 병렬화할 수 있는 문제를 해결하기위해

해결책을 구조화하는 방법을 제공한다.

**알아보자**
- 코루틴을 이용해서 동시성을 구현하는 asyncio 패키지에 대해서 설명
- 스레드와 비동기에 관계
- asyncio.Future와 concurrent.futures.Future의 차이점
- 국기 내려받기 예제의 비동기 버전 구현
- 비동기 프로그래밍이 네트워크 프로그램에서 높은 동시성을 관리하는 방법
- 콜백 개선 방법
- 블로킹 연산을 스레드풀에 덜어줌으로써 이벤트 루프를 블로킹하지 않는 방법
- asyncio 서버 작성, 높은 동시성 생각
- asyncio가 파이썬 생태계에 영향을 준 이유

## 18.1 스레드와 코루틴 비교

__파이썬 3.5에서 async와 await 키워드가 추가되었다.__

_yield from = await_

_@asyncio.coroutine = async_

In [1]:
import threading
import itertools
import time
import sys

class Signal: # 외부에서 제어하기 위해, 가변 객체 정의
    go = True

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):  # itertools.cycle은 주어진 시퀀스를 순환하면서 계속 실행된다. 이 for문은 사실상 무한루프.
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # 애니메이션을 위해 문자열만큼 백스페이스(x08)를 통해 값을 빼고 커서를 앞으로 이동시킨다.
        time.sleep(.1)
        if not signal.go: # False면 빠져나온다.
            break
    write(' ' * len(status) + '\x08' * len(status))  # 다시 처음 커서로 이동시키고 메세지 출력행을 청소한다.


def slow_function():  # 실행시간이 오래걸리는 함수라 생각하자.
    time.sleep(3) # GIL이 해체된다. 다른 스레드 실행.
    return 42


def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)  # 두번째 스레드 출력
    spinner.start()  # 두번째 스레드 실행
    result = slow_function()  # 메인 스레드 블로킹, 두번째 스레드가 애니메이션을 보여준다.
    signal.go = False
    spinner.join()  # spinner 스레드가 끝날때까지 기다린다. join(timeout=None)
    return result


def main():
    result = supervisor()
    print('Answer:', result)

__파이썬은 스레드를 정지하는 API가 존재하지 않는다. 스레드에 메세지를 보내 종료시켜야 한다. 여기서는 **signal.go**__

**만약 종료 API를 보내지 않았다면, 메인 쓰레드가 종료되어도 계속 동작한다.**

In [2]:
import asyncio
import itertools
import sys


@asyncio.coroutine  # asyncio에 사용할 coroutine은 @asyncio.coroutine과 같이 데커레이트해야 한다.
def spin(msg):  # 스레드에서 사용하던 signal 인수가 없다.
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1)  # 이벤트 루프를 블로킹 하지않고 잠자기 위해 yield from asyncio.sleep 사용 (time.sleep이 아니다.)
        except asyncio.CancelledError:  # CancelledError는 취소가 요청된 것이므로 루프를 종료한다.
            break
    write(' ' * len(status) + '\x08' * len(status))


@asyncio.coroutine
def slow_function():
    yield from asyncio.sleep(3)  # 메인 루프는 잠자고 난 후에 코루틴을 계속 실행한다.
    return 42


@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!'))  # 코루틴 실행을 스케쥴링하고 Task 객체 안에 넣어 Task 객체를 즉시 반환한다.
    print('spinner object:', spinner)  # Task 객체 출력
    result = yield from slow_function() 
    spinner.cancel()  # Task 객체를 취소한다. 그러면 yield from 에서 asyncio.CancelledError가 발생한다.
    return result


def main():
    loop = asyncio.get_event_loop()  # 이벤트 루프 참조
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)


> **주 스레드를 블로킹해서 이벤트 루프를 중단시키고 그래서 애플리케이션 전체를 멈추고 싶지 않으면 asyncio 코루틴 안에서 time.sleep()은 금지!! **

> **코루틴안에서는 yield from asyncio.sleep(초)를 사용해야한다. **

@asyncio.coroutine 데커레이터를 사용해야되는 이유.
- 일반함수와 다르게 보임.
- _코루틴이 yield from 되지 않고(일부작업이 완료되지 않았으므로, 버그가 발생할 가능성이 높다) 가비지 컬렉트되는 경우 경고 메세지 출력, 디버깅 도움_
- _@asyncio.coroutine 데커레이터된 제너레이터를 자동으로 기동하지 않음_

비교해보자

```python
def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!'))
    print('spinner object:', spinner) 
    result = yield from slow_function() 
    spinner.cancel()
    return result
```

- asyncio.Task = threading.Thread
- Task는 코루틴, Thread는 콜러블
- Task는 객체는 직접 생성하지 않고, asyncio.async()나 loop.create_task()에 전달해서 가져온다.
- Task 객체를 가져오면, 이미 asyncio.async()등에 의해 실행이 스케쥴링. Thread 객체는 start() 메서드를 호출해서 실행하라고 명령.
- asyncio에서 slow_function()은 yield from으로 구동되는 코루틴. Thread에서 slow_function()은 평범한 함수로, 스레드에 의해 직접 호출. 
- Task에는 CancelledError를 발생하는 Task.cancel() 객체 메서드가 있다. Thread는 외부에서 API를 이용해 중단시킬 수 없다(무결성 훼손 방지). 
- supervisor() 코루틴은 main() 함수 안에서 loop.run_until_complete()로 실행해야한다.

쓰레드 보다 코루틴이 좋은 이유
- 기본적으로 인터럽트로부터 보호.(명시적으로 yield를 실행해야 프로그램 다른 부분이 실행되므로)
- 동기화를 위해 락을 잠그는게 아니라, 언제든 실행되고 있는 코루틴 중 하나만 사용하면 된다. 그리고 제어권을 넘겨주고 싶을 땐, yield나 yield from을 이용해서 스케줄러에게 넘겨 줄 수 있다. 그래서 **코루틴은 안전하다.**
- 코루틴은 yield 지점에서 중단되었을 때만 취소할 수 있다. 그래서 CancelledError 예외를 처리해서 마무리하면 된다.

## 18.1.1 asyncio.Future: 논블로킹 설계

**asyncio.Future와 concurrent.futures.Future 클래스 인터페이스가 거의 같지만 바꿔 쓸 수 없다.**

asyncio에서 BaseEventLoop.create_task() 메서드는 코루틴을 받아서 실행하기 위해 스케줄링하고, asyncio.Task 객체를 반환한다.

Taks는 Future의 서브 클래스이므로, Task 객체는 Future 객체이기도 하다.

asyncio.Future의 메소드 done(), add_done_callback()는 concurrent.futures.Future와 동일하나, **result()**는 아주 다르다.

차이점
1. 시간초과를 지정할 수 없다.
2. result() 메서드를 호출하면, 실행이 완료되지 않았다면 블로킹하고 완료를 기다리지 않고 asyncio.InvalidStateError 예외를 발생시킨다.

asyncio.Future에 결과를 가져오기 위해선 일반적으로 **yield from**을 이용한다.

yield from을 호출하면, 이벤트 루프를 블로킹하지 않고 작업 완료를 기다리는 과정을 자동으로 처리해준다.

yield from은 이벤트 루프에 제어권을 넘겨주기 위해 사용. (스케쥴러를 동작 시킨다.)

yield from와 add_done_callback()는 비슷하다.

콜백을 호출하지 않고 지연된 작업이 완료되면, 이벤트 루프는 Future 객체의 결과를 설정하고 yield from 표현식은 지연된 코루틴 내부에서 반환된 값을 생성하고 실행을 계속 진행한다.

그래서 다음과 같은 메서드가 필요없다.
- 코루틴 안에서 my_future가 실행을 완료한 다음에 수행할 작업은 단순히 yield from my_future 뒤에 넣으면 되므로 my_future.add_done_callback()을 호출할 필요가 없다.
- my_future에 대한 yield from 표현식의 값이 result가 되므로 my_future.result()를 호출할 필요가 없다. (result = yield from my_future)

물론 done(), add_done_callback(), result() 메서드가 필요한 경우도 있지만, 일반적으로 asyncio는 yield from으로 구동된다.

## 18.1.2 Future, Task, 코루틴에서 생성하기

asyncio에서는 yield from을 이용해서 asyncio.Future 객체를 가져올 수 있으므로 Future와 코루틴의 관계는 밀접.

foo()가 코루틴 함수거나 Future나 Task 객체를 반환하는 일반 함수면

**res = yield from foo()** 

코드가 작동한다는 것을 의미한다.

그렇기 때문에 asyncio API에서 코루틴과 Future 객체를 바꿔가면서 쓸 수 있는 경우가 많다.

실행하려면 반드시 **코루틴을 스케쥴링해야한다. 그러면 코루틴이 asyncio.Task 객체 안에 래핑된다.**

Task 객체를 가져오기 위한 방법 2가지

- asyncio.async(coroutine_or_future, *, loop=None)
    - 코루틴과 Future 객체 통합
    - Future나 Task 객체면 그대로 반환.
    - 코루틴이면 async()가 loop.create_task()를 호출해서 Task 생성.
    - loop 키워드 인수에 이벤트 루프를 전달할 수 있다.
        - 생략하면 async()가 asyncio.get_event_loop()를 호출해서 루프 객체를 가져온다.
- BaseEventLoop.create_task(coroutine)
    - 코루틴을 실행
    - 스케줄링하고 asyncio.Task 객체 반환.
    - 외부라이브러리(Tornado 등)에서 제공하는 BaseEventLoop의 서브클래스에 호출하면, 외부라이브러리에서 제공하는 Task와 호환되는 클래스 객체가 반환**될 수도 있다.**

> BaseEventLoop.create_task()는 파이썬 3.4.2부터 사용.

asyncio.async()를 이용하여 코루틴을 자동으로 asyncio.Task 객체 안에 래핑하는 asyncio 함수들이 많다. 대표적인 것은 **BaseEventLoop.run_until_complete()**

파이썬 콘솔이나 간단한 테스트에서 Future 객체나 코루틴을 실험하고 싶다면 다음 코드를 사용하자.

``` python

import asyncio
def run_sync(coroutine_or_future):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coroutine_or_future)

a = run_sync(some_coroutine())

```

## 18.2 asyncio와 aiohttp로 내려받기

asyncio: TCP, UDP만 지원

aiohttp: HTTP 지원

***source: flags.py***

```python

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'

DEST_DIR = 'downloads/'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

    return len(cc_list)


def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many)
```

In [3]:
import asyncio

import aiohttp

from flags import BASE_URL, save_flag, show, main


@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    image = yield from resp.read()
    return image


@asyncio.coroutine
def download_one(cc):
    image = yield from get_flag(cc)  # 순차 버전과 차이점.
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    loop = asyncio.get_event_loop()  # 하위 이벤트 루프 구현에 대한 참조를 가져온다.
#     loop = asyncio.new_event_loop() # 추가
    to_do = [download_one(cc) for cc in sorted(cc_list)]  # 제너레이터 객체 리스트 생성
    wait_coroutine = asyncio.wait(to_do)  # 블로킹 함수가 아니다. 코루틴으로서 자신에게 전달된 코루틴들이 모두 완료되면 완료된다.
    res, _ = loop.run_until_complete(wait_coroutine)  # wait_coroutine가 완료될 때까지 이벤트 루프 실행. 이벤트 루프가 실행되는 동안 블로킹.
    loop.close()

    return len(res)

# if __name__ == '__main__':
#     main(download_many)

coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
- Future, 코루틴의 반복형을 받는다.
- wait()는 각 코루틴을 Task 안에 래핑.
- wait()가 관리하는 객체는 모두 Future 객체가 된다.
- return 코루틴/제너레이터 객체
- return된 제너레이터 객체를 구동하기 위해 loop.run_until_complete()에 전달.
- wait는 완료되지 않았더라도 반환하게 만드는 timeout과 return_when 키워드가 있다.

AbstractEventLoop.run_until_complete(future)
- wait와 비슷하게 코루틴을 Task 안에 래핑.
- 코루틴, Future, Task 모두 yield from으로 구동.
- wait_coroutine 실행이 완료되면 return (<실행이 완료된 Future 집합>, <실행이 완료되지 않음 Future 집합>)

asyncio에 대해 알아야 할 새로운 개념은 많지만 쉽게 접근하면 *yield from 키워드가 없는 것처럼 생각* 하면 된다.

``` python
@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    image = yield from resp.read()
    return image
```

블로킹되지 않는 점만 제외하면 다음 코드와 동작이 같다.

``` python
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = aiohttp.request('GET', url)
    image = resp.read()
    return image
```

16.7 'yield from 사용하기'에서 yield from의 사용법 두 가지 사실
- yield from으로 연결된 전체 코루틴 체인은 궁극적으로 가장 바깥쪽에 있는 대표 제너레이터의 next()나 send()를 명시적 혹은 암묵적으로 호출하는 비코루틴 호출자에 의해 구동된다.
- 이 체인 가장 안쪽에 있는 하위 제너레이터는 단지 yield를 단순 제너레이터이거나 반복형 객체여야 한다.

asyncio API도 유효하며 특징은 다음과 같다.
- 대표 제너레이터는 loop.run_until_complete() 등의 asyncio API에 전달함으로써 구동된다. (asyncio 이벤트 루프가 처리)
- 코루틴 체인은 yield from을 호출하면서 끝내야 한다. 즉, 가장 안쪽의 하위제너레이터는 우리가 만든 코드가 아니라, 실제로 입출력을 수행하는 라이브러리 함수여야 한다.

asyncio를 사용할 때 우리는 코루틴 체인을 만든다.

가장 바깥쪽 대표 제너레이터는 asyncio(run_until_complete) 자체에 의해 구동.

가장 안쪽에 있는 하위 제너레이터는 asyncio 라이브러리(aiohttp...)가 제공하는 코루틴에 위임.

## 18.3 블로킹 호출을 에둘러 실행하기

_단일 쓰레드인데 왜 asyncio가 빠르지..?_

블로킹 함수가 전체 애플리케이션을 종료하지 않는 방법
- 블로킹 연산을 각기 쓰레드에서 처리
- 모든 블로킹 연산을 논블로킹 비동기 연산으로 바꾼다.

이벤트 루프가 응답을 받으면, 우리 코드를 다시 호출 한다.

실수만 하지 않는다면 **스레드는 절대로 블로킹되지 않는다.**

그래서 빠르다.

그리고 **스레드보다 코루틴이 메모리 부하가 적다.**

## 18.4 asyncio 내려받기 스크립트 개선

## 18.4.1 asyncio.as_completed() 사용하기

*** source: flags2_common
```python
import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum


Result = namedtuple('Result', 'status data')

HTTPStatus = Enum('Status', 'ok not_found error')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL':  'http://localhost:8001/flags',
    'DELAY':  'http://localhost:8002/flags',
    'ERROR':  'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def initial_report(cc_list, actual_req, server_label):
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))


def final_report(cc_list, counter, start_time):
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded.'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else ''
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}s'.format(elapsed))


def expand_cc_args(every_cc, all_cc, cc_args, limit):
    codes = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc+c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                msg = 'each CC argument must be A to Z or AA to ZZ.'
                raise ValueError('*** Usage error: '+msg)
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
                description='Download flags for country codes. '
                'Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*',
                help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                default=default_concur_req,
                help='maximum concurrent requests (default={})'
                      .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                default=DEFAULT_SERVER,
                help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of',
              server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)

    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    args, cc_list = process_args(default_concur_req)
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)
```

In [7]:
import asyncio
import collections
import contextlib

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag

# 503 방지를 위해 기본값을 낮게 설정.
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):  # 예외 클래스 생성
    def __init__(self, country_code):
        self.country_code = country_code


@asyncio.coroutine
def get_flag(base_url, cc): # return byte, 에러 404, HttpProcessingError
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 200:
            image = yield from resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)


@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):  # semaphore는 동시 요청 수 제한을 위한 동기화 장치
    try:
        with (yield from semaphore):  # semaphore 최대치 초과시 코루틴이 브로킹이 된다. (콘텍스트 관리자)
            image = yield from get_flag(base_url, cc)  # with를 빠져나올 때, semaphore가 +1이 되고 다른 코루틴이 동작함.
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc  # 생성한 예외 클래스 적용
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)


@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):  # 코루틴 함수라 main에서 바로 호출이 불가능하다.
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)  # 세마포어를 설정한다. 제한.
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]  # 코루틴을 호출하는 객체 리스트 생성

    to_do_iter = asyncio.as_completed(to_do)  # 실행이 완료된 Future 객체 반복자를 가져온다.
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # tqdm 설정
    for future in to_do_iter:
        try:
            res = yield from future  # asyncio의 result()는 yield from
        except FetchError as exc:
            country_code = exc.country_code  # 에러 발생 코드
            try:
                error_msg = exc.__cause__.args[0]  # 원래 예외에서 오류 메세지 가져오기
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__  # 메세지가 없으면 예외 클래스명 가져오기
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1

    return counter


def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro) # 이벤트 루프 전달
    loop.close()

    return counts


#if __name__ == '__main__':
#    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)


asyncio에서는 Semaphore를 이용해서 제한한다.

**asyncio.Semaphore(concur_req) = futures.ThreadPoolExecutor(max_workers)**

**concur_req = max_workers**

세마포어(Semaphore)는 내부 카운터를 가지고 있는 객체로 
- acquire() 코루틴 메소드를 호출할 때 마다 감소
- release() 코루틴 메소드 호출 마다 증가
- 초깃값은 객체 생성시 설정 ---> asyncio.Semaphore(concur_req)

카운터가 0보다 클때는 acquire()가 호출되도 블로킹 되지 않으나, 0일 때는 acquire()가 호출되면 블로킹된다. 다른곳에서 release()를 호출해서 카운터를 증가시킬 때 블로킹이 풀린다.

``` python
with (yield from semaphore):  
    image = yield from get_flag(base_url, cc)
```

다음은 concur_req보다 작거나 같은 수의 get_flag 코루틴이 실행되도록 보장한다.

**_에러가 발생한 경우_ asyncio.as_completed() Future 객체는 호출할 때 전달한 Future 객체와 동일하다고 장담할 수 없다.**

그래서 17장의 code mapping 방법이 아닌 위의 FetchError와 같이 구현했다.

그러나 에러가 없는 경우 국가 코드는 가져올 수 있다.

- 에러 발생: 객체가 다를 수 있다.
- 에러 없음: 객체는 동일하다.

## 18.4.2 Executor를 이용해서 이벤트 루프 블로킹 피하기

save_flag()가 블로킹 함수.

내부적으로는 블로킹하는 입출력 함수들은 GIL을 해제하므로, 다른 스레드가 진행할 수 있다.
그러나, flags2_asyncio.py에서 save_flag()는 asyncio 이벤트 루프와 공유하는 유일한 스레드를 블로킹하므로, 파일을 저장하는 동안 애플리케이션이 멈춘다.

이 문제의 해결책은 이벤트 루프 객체의 **run_in_executor()** 메서드다.

asycnio 이벤트 루프는 내부에 스레드 풀 실행자를 가지고 있다.

In [5]:
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None,  # 실행자 객체. 이벤트 루프의 기본 스레드 풀 실행자를 사용할 때는 None을 지정.
                save_flag, image, cc.lower() + '.gif')  # 콜러블 및 콜러블 인수.
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)

> 파일 평균 값이 클수록 효과는 잘 드러난다.

## 18.5 콜백에서 Future와 코루틴으로

어떻게 코루틴이 콜백 함수를 향상시키지?

콜백 지옥: 콜백안에 콜백 함수가 있는경우. (어떤 연산이 어떤 연산에 종속적일 때)

```python
def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)
    
def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)
    
def stage3(response3):
    step3(response3)

    
api_call1(request1, stage1)
```

결과가 나오면 코루틴의 send() 메서드를 이용해서 활성화 시키면 된다.

```python
@asyncio.coroutine
def three_stages(request1):
    response1 = yield from api_call1(request1)
    
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    
    request3 = step2(response2)
    response3 = yield from api_call3(request3)
    
    step3(response3)
    
loop.create_task(three_stages(request1))
```



장점
- 에러상황에 대처하기 쉽다. (콜백 기반인 경우, 에러상황과 성공한 상황 둘다 처리해야한다. 에러 처리시 콜백 지옥에 가까워 진다...)
- 한눈에 보여 이해하기 쉽다.

단점
- 코루틴에 친해져야 한다.
- 일반함수는 사용할 수 없다. 반드시 코루틴이어야 한다.

## 18.5.1 한번 내려받을 때 여러 가지 요청하기

In [10]:
@asyncio.coroutine
def http_get(url):
    res = yield from aiohttp.request('GET', url)
    if res.status == 200:
        ctype = res.headers.get('Content-type', '').lower()
        if 'json' in ctype or url.endswith('json'):
            data = yield from res.json()  # json이라면 json
        else:
            data = yield from res.read()  # json이 아니라면 byte
        return data

    elif res.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.errors.HttpProcessingError(
            code=res.status, message=res.reason,
            headers=res.headers)


@asyncio.coroutine
def get_country(base_url, cc):
    url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
    metadata = yield from http_get(url)  # json으로 만들어진 metadata를 받는다.
    return metadata['country']


@asyncio.coroutine
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    return (yield from http_get(url)) # 키워드가 나란히 오면 에러가 난다. 그래서 () 처리


@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    try:
        with (yield from semaphore): # 가능한 빨리 가져오기 위해 2개의 with 블록에서 따로 호출한다.
            image = yield from get_flag(base_url, cc)
        with (yield from semaphore):
            country = yield from get_country(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        country = country.replace(' ', '_')
        filename = '{}-{}.gif'.format(country, cc)
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, filename)
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)

## 18.7 요약

- 비동기식 시스템은 사용자가 스레드 관리 필요성을 없앰으로 멀티스레드 시스템보다 더 많은 동시성 연결을 관리할 수 있다.
- 코루틴을 이용하여 콜백이 가진 에러 처리를 간단히 할 수 있다.
- yield from을 빼고 보면 일반적인 순차 코드처럼 보인다.