bw61 에서 블로킹 I/O와 스레드를 사용하는 TCP 서버를 asyncio와 코루틴을 사용하는 코드로 포팅했음

스레드 기반 구현으로부터 asyncio와 코루틴 기반으로 변경하는 방법
- 큰 프로그램 전체를 변경하려면 코드 베이스를 점진적으로 마이그레이션하면서 필요에 따라 테스트를 함께 갱신하면서 모든 기능이 제대로 동작하는지 확인해야함


## 스레드 기반 구현으로부터 asyncio와 코루틴 기반으로 변경하는 방법
### 구체적 단계
1. 최상위 함수가 def대신 async def를 사용하게 변경
2. 최상위 함수가 I/O를 호출하는 모든 부분을 asyncio.run_in_executor로 감싸라 (I/O호출하는 부분은 이벤트 루프가 블록될 가능성이 있음)
3. run_in_executor 호출이 사용하는 자원이나 콜백이 제대로 동기화 됐는지 확인하라 ( Lock이나 asyncio.run_coroutine_threadsafe 함수를 사용)
4. 호출 계층의 앞쪽으로 가면서 중간에 있는 함수와 메서드를 코루틴으로 변환하며 get_event_loop와 run_in_executor 호출을 없애려고 시도하라.

In [None]:
# run_thread 함수에 1~3단계를 적용

from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()
        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()



import asyncio

async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks)


       

- run_in_executor 메서드는 이벤트 루프가 특정 ThreadPoolExecutor나 디폴트 실행기 인스턴스(첫번째 인자 None인 경우)를 사용해 주어진 함수((여기서는 tail_file))를 실행하게 만든다
- run_in_executor 함수를 그에 대응하는 await 식 없이 여러번 호출함으로써 run_tasks_mixed 코루틴은 각 입력 파일마다 파일을 한 줄씩 처리하는 작업을 팬아웃 시킨다.
- 그후  asyncio.gather 함수와 await 식을 사용해 tail_file이 모두 종료되도록 팬인 시킨다.
- 이 코드는 asyncio.run_coroutine_threadsafe 를 사용하기 때문에 Lock 인스턴스와 witer 도우미 함수를 사용할 필요가 없다.
- asyncio.run_coroutine_threadsafe 함수를 사용하면 일반적인 작업자 스레드가 코루틴(여기서는 write_async)을 호출해 주 스레드에서 실행되는 이벤트 루프를 통해 실해하도록 만든다
- 따라서 스레드 간 동기화가 이뤄지는 효과가 있고 출력 파일에 기록하는 작업이 모두 이벤트 루프에 의해 주 스레드에서 이뤄지도록 보장한다.
- asyncio.gather 대기가 끝나면 출력 파일에 대한 데이터 기록도 끝났다고 가정할 수 있으므로 경합 조건이 일어나는 것을 걱정할 필요 없이 with 문을 통해 출력 파일을 close 할 수 있다.
  


In [None]:

input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_tasks_mixed(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

In [None]:
import asyncio

async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks)


# 4단계 적용.   get_event_loop와 run_in_executor 호출을 없애려고 시도

async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)


# 예제 9
async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)


### run_tasks_mixed 함수에 4단계 적용
- 새로운 tail_async 구현을 사용하면 get_event_loop와 run_in_executor를 run_tasks_mixed 함수에서 완전히 제거해 호출 계측 한 단계 아래로 내려보낼 수 있다
- 그러면 깔끄하고 훨씬 쫓아가기 쉬운 코드만 남는다.

- 이런식으로 반복적인 리팩터링 패턴을 계속 진행하면서 readline 함수를 비동기 코루틴으로 변경할 수 있다.
- 하지만 이 함수는 너무 많은 블록킹 파일 I/O연산을 사용하므로 성능 저하와 코드 명황성 저하를 가져온다는 점에서 asyncio 로 포팅하기에 적합하지 않아 보인다. 

- 상향식 접근법도 하향식과 똑같은 형태로 끝남


In [None]:
### 결론
- asyncio 이벤트 루프의 run_in_executor를 메서드(await를 사용해 완료를 기다릴 수 있음)를 사용하면 코루틴이 ThreadPoolExecutor 스레드 풀을 사용해 동기적인 함수를 호출 할 수 있다. 이 기능을 사용하면 하향식으로  asyncio 마이그레이션 할 수 있다.

In [None]:
# 코드 전문

import random
random.seed(1234)

import logging
from pprint import pprint
from sys import stdout as STDOUT

# 모든 출력을 임시 디렉터리에 기록함
import atexit
import gc
import io
import os
import tempfile

TEST_DIR = tempfile.TemporaryDirectory()
atexit.register(TEST_DIR.cleanup)

# 윈도우에서 프로세스 깔끔하게 종료하기
OLD_CWD = os.getcwd()
atexit.register(lambda: os.chdir(OLD_CWD))
os.chdir(TEST_DIR.name)

def close_open_files():
    everything = gc.get_objects()
    for obj in everything:
        if isinstance(obj, io.IOBase):
            obj.close()

atexit.register(close_open_files)


# 예제 1
class NoNewData(Exception):
    pass

def readline(handle):
    offset = handle.tell()
    handle.seek(0, 2)
    length = handle.tell()

    if length == offset:
        raise NoNewData

    handle.seek(offset, 0)
    return handle.readline()


# 예제 2
import time

def tail_file(handle, interval, write_func):
    while not handle.closed:
        try:
            line = readline(handle)
        except NoNewData:
            time.sleep(interval)
        else:
            write_func(line)


# 예제 3
from threading import Lock, Thread

def run_threads(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        lock = Lock()
        def write(data):
            with lock:
                output.write(data)

        threads = []
        for handle in handles:
            args = (handle, interval, write)
            thread = Thread(target=tail_file, args=args)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()


# 예제 4
# 핸들에 쓰는 프로세스를 에뮬레이션하는 코드
import collections
import os
import random
import string
from tempfile import TemporaryDirectory

def write_random_data(path, write_count, interval):
    with open(path, 'wb') as f:
        for i in range(write_count):
            time.sleep(random.random() * interval)
            letters = random.choices(
                string.ascii_lowercase, k=10)
            data = f'{path}-{i:02}-{"".join(letters)}\n'
            f.write(data.encode())
            f.flush()

def start_write_threads(directory, file_count):
    paths = []
    for i in range(file_count):
        path = os.path.join(directory, str(i))
        with open(path, 'w'):
            # 읽기 스레드가 파일을 폴링하기 전에 경로상 파일이 존재하는지 확실히 하자
            pass
        paths.append(path)
        args = (path, 10, 0.1)
        thread = Thread(target=write_random_data, args=args)
        thread.start()
    return paths

def close_all(handles):
    time.sleep(1)
    for handle in handles:
        handle.close()

def setup():
    tmpdir = TemporaryDirectory()
    input_paths = start_write_threads(tmpdir.name, 5)

    handles = []
    for path in input_paths:
        handle = open(path, 'rb')
        handles.append(handle)

    Thread(target=close_all, args=(handles,)).start()

    output_path = os.path.join(tmpdir.name, 'merged')
    return tmpdir, input_paths, handles, output_path


# 예제 5
def confirm_merge(input_paths, output_path):
    found = collections.defaultdict(list)
    with open(output_path, 'rb') as f:
        for line in f:
            for path in input_paths:
                if line.find(path.encode()) == 0:
                    found[path].append(line)

    expected = collections.defaultdict(list)
    for path in input_paths:
        with open(path, 'rb') as f:
            expected[path].extend(f.readlines())

    for key, expected_lines in expected.items():
        found_lines = found[key]
        assert expected_lines == found_lines, \
            f'{expected_lines!r} == {found_lines!r}'

input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)

tmpdir.cleanup()


# 예제 6
import asyncio

# 윈도우에서는 ProactorEventLoop가 시그널 핸들러를 등록하려고 시도하기 때문에
# 스레드 안에서 ProactorEventLoop를 만들 수 없다.
#
# 대신 SelectEventLoop 정책을 사용하는 방식으로 우회한다.
# 참고: https://bugs.python.org/issue33792
policy = asyncio.get_event_loop_policy()
policy._loop_factory = asyncio.SelectorEventLoop

async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop)
            future.result()

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks)


# 예제 7
input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_tasks_mixed(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()


# 예제 8
async def tail_async(handle, interval, write_func):
    loop = asyncio.get_event_loop()

    while not handle.closed:
        try:
            line = await loop.run_in_executor(
                None, readline, handle)
        except NoNewData:
            await asyncio.sleep(interval)
        else:
            await write_func(line)


# 예제 9
async def run_tasks(handles, interval, output_path):
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        tasks = []
        for handle in handles:
            coro = tail_async(handle, interval, write_async)
            task = asyncio.create_task(coro)
            tasks.append(task)

        await asyncio.gather(*tasks)


# 예제 10
input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

asyncio.run(run_tasks(handles, 0.1, output_path))

confirm_merge(input_paths, output_path)

tmpdir.cleanup()


# 예제 11
def tail_file(handle, interval, write_func):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def write_async(data):
        write_func(data)

    # 맨 마지막에 한번 더 이벤트 루프를 실행해서
    # 다른 이벤트 루프가 stop()에 await하는 경우를 해결한다.
    coro = tail_async(handle, interval, write_async)
    loop.run_until_complete(coro)


# 예제 12
input_paths = ...
handles = ...
output_path = ...

tmpdir, input_paths, handles, output_path = setup()

run_threads(handles, 0.1, output_path)

confirm_merge(input_paths, output_path)

tmpdir.cleanup()