I/O-Bound: Threading, Multiprocessing, AsyncIO

In [19]:
import requests, time
import concurrent.futures
import threading


# 각 스레드에서 생성되는 객체, 이것도 하나의 coding pattern이다.

thread_local = threading.local()

# 전역에 같은 이름으로 선언했지만 각 스레드마다 독립적인 메모리를 할당 받을 수 있도록 한다.
# 독립된 namespace를 사용한다.

def get_session():
    print(thread_local.__dict__) 
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
        # dictionary 형태

    return thread_local.session
    # max_worker의 값대로 독립된 session을 할당 받는다.

# 실행함수1 다운로드
def request_site(url):

    # 세션 획득
    # 이전처럼 매개변수로 session을 받아오면 thread가 공유해서 문제가 생길 수 있다.
    session = get_session()

    with session.get(url) as response:
        print(f'[Read contents: {len(response.content)}, Status Code: {response.status_code}] from {url}')


# 실행함수2 요청
def request_all_sites(urls):

    # 멀티스레드 실행
    # 반스시 max_worker 개수 조절 후 session 객체 확인
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(request_site, urls)





# sync blocking IO, 순서대로 방문한다.
def main():
    # test URLs
    urls = [
        'https://www.jython.org',
        'https://olympus.realpython.org/dice',
        'https://realpython.com',
    ] *3

    # 실행시간 측정
    start_time = time.time()

    # 실행
    request_all_sites(urls)

    # 실행 시간 종료
    duration = time.time() - start_time

    # 결과 출력
    print()
    print(f'Downloaded {len(urls)} sites in {duration} seconds')

각 스레드별로 namespace가 다르게 나오기 때문에 {}로 표시된다. GIL 있지만 IO 작업이 무겁지만 않으면 효과적이다.

In [20]:
if __name__ == '__main__':
    main()

{}
{}
{}{}

[Read contents: 10782, Status Code: 200] from https://www.jython.org
{'session': <requests.sessions.Session object at 0x00000148CE4D62B0>}
[Read contents: 10782, Status Code: 200] from https://www.jython.org
{'session': <requests.sessions.Session object at 0x00000148CDB6BF40>}
{'session': <requests.sessions.Session object at 0x00000148CE4D6040>}
[Read contents: 10782, Status Code: 200] from https://www.jython.org
{'session': <requests.sessions.Session object at 0x00000148CE4D6040>}
{'session': <requests.sessions.Session object at 0x00000148CE4D62B0>}
[Read contents: 56718, Status Code: 200] from https://realpython.com
[Read contents: 56718, Status Code: 200] from https://realpython.com
[Read contents: 56716, Status Code: 200] from https://realpython.com

Downloaded 9 sites in 2.155684471130371 seconds


In [21]:
import multiprocessing

# 각 프로세스 메모리 영역에 생성되는 객체는 독럽적이다.
# 그래서 함수 실행마다 객체를 생성하지 않는 것이 좋다. 각 프로세스마다 할당한다.

session = None

def set_global_session():
    global session 
    # global이 아니면 그때그때 객체가 만들어진다.
    # 각 process마다 다른 객체가 생성된다.

    #print(session)
    if not session:
        session = requests.Session()


# 실행함수1 다운로드
def request_site(url):

    print(f'what is session? {session} \n')

    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f'[{name} -> Read contents: {len(response.content)}, Status Code: {response.status_code}] from {url}')


# 실행함수2 요청
def request_all_sites(urls):

    # 멀티프로세싱 실행
    # 반스시 processes 개수 조절 후 session 객체 확인
    with multiprocessing.Pool(initializer = set_global_session, processes=4) as pool:
        pool.map(request_site, urls)





# sync blocking IO, 순서대로 방문한다.
def main():
    # test URLs
    urls = [
        'https://www.jython.org',
        'https://olympus.realpython.org/dice',
        'https://realpython.com',
    ] *3

    # 실행시간 측정
    start_time = time.time()

    # 실행
    request_all_sites(urls)

    # 실행 시간 종료
    duration = time.time() - start_time

    # 결과 출력
    print()
    print(f'Downloaded {len(urls)} sites in {duration} seconds')

In [22]:
if __name__ == '__main__':
    #main()
    pass
# 주피터에서 멀티프로세싱은 안 되더라 방법을 찾자

AsyncIO: 동시 프로그래밍 paradigm으로 변화했다. <br>
싱글 코어 -> 속도 향상에 미미해졌다. -> 비동기 프로그래밍이 필요해짐 -> CPU연산, DB연동, API 호출 등 대기 시간이 늘어났다. <br>
그래서 non-blocking과 async가 필요하다. <br>
파이썬 3.4에서 asyncio가 표준 라이브러리로 등장했다.

In [23]:
import asyncio

# def만으로 함수를 선언하면 동기함수이다.
# async def function() 이렇게 선언하면 비동기 함수이며
# 비동기 함수 안에서 비동기함수를 실행하면 그 앞에 반드시 await를 써야 한다.
# 안 쓰면 동기처리가 되거나 예외가 발생한다.


# async가 붙은 함수는 coroutine이 반환된다. promise object가 반환된다는 것이다.

def exe_calculate_sync(name, n):
    
    for i in range(1, n+1):
        print(f'{name} -> {i} of {n} is calculating...')
        time.sleep(1)

    print(f'{name} - {n} works are done!')

def process_sync():
    
    start = time.time()
    
    exe_calculate_sync('One', 3)
    exe_calculate_sync('Two', 2)
    exe_calculate_sync('Three', 1)

    end = time.time()

    print(f'>>> total seconds: {end - start}')








In [24]:
if __name__ == '__main__':
    # sync 실행
    process_sync()

One -> 1 of 3 is calculating...
One -> 2 of 3 is calculating...
One -> 3 of 3 is calculating...
One - 3 works are done!
Two -> 1 of 2 is calculating...
Two -> 2 of 2 is calculating...
Two - 2 works are done!
Three -> 1 of 1 is calculating...
Three - 1 works are done!
>>> total seconds: 6.0451061725616455


In [25]:
import asyncio, time

async def exe_calculate_async(name, n):
    
    for i in range(1, n+1):
        print(f'{name} -> {i} of {n} is calculating...')
        #time.sleep(1)  #async가 없는 동기함수이다.
        
        await asyncio.sleep(1)

    print(f'{name} - {n} works are done!')


async def process_async():
    
    start = time.time()
    
    # 여러개일 때는 묶을 수 있고, 한 개라면 await function 이렇게 써도 된다.
    await asyncio.wait([
        exe_calculate_async('One', 3),
        exe_calculate_async('Two', 2),
        exe_calculate_async('Three', 1),
        ])

    # 만약 비동기 함수 내부에서 동기함수를 사용하면
    # RuntimeError: asyncio.run() cannot be called from a running event loop 예외 발생

    end = time.time()

    print(f'>>> total seconds: {end - start}')

In [26]:
# Async 실행
# 파이선 3.7 이상

if __name__ == '__main__':
    await process_async() # 주피터 노트북에서는 이렇게 실행해야 한다.
    #asyncio.run(process_async()) # 3.7 이상이며 py file 혹은 구버전 IPython에서는 이렇게 해야 한다.
    #asyncio.get_envet_loop().run_until_complete(process_sync) # 3.7 이하

# 실행시간이 6초에서 3초로 줄어들었다.


One -> 1 of 3 is calculating...
Three -> 1 of 1 is calculating...
Two -> 1 of 2 is calculating...
One -> 2 of 3 is calculating...
Three - 1 works are done!
Two -> 2 of 2 is calculating...
One -> 3 of 3 is calculating...
Two - 2 works are done!
One - 3 works are done!
>>> total seconds: 3.0309979915618896


In [27]:
!pip install aiohttp
# requests 대신에 비동기 패키지를 설치한다.





In [36]:
# asyncio가 threading보다 더 생각할 거리가 많다.
# 비동기함수인지 신경써야 하고 기존 패키지가 비동기를 지원하는지도 따져야 한다.

# request 패키지는 동기식으로 처리된다.
import aiohttp






# 실행함수1 다운로드
async def request_site(url, session):

    print(f'what is session? {session} \n')
    
    async with session.get(url) as response:
        print(f"Read Contents {response.content_length}, from {url}.")


# 실행함수2 요청
async def request_all_sites(urls):

    # 비동기에서 with문을 사용하면 async를 붙인다.
    async with aiohttp.ClientSession() as session:

        #작업목록
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(request_site(session, url))
            tasks.append(task)

        # tasks 확인
        #print(*task)
        #print(task)

        await asyncio.gather(*tasks, return_exceptions= True)
        # 모든 사이트에 접속한다는 보장이 없기 때문에 미리 예외 True를 했다.
   


async def main():
    # test URLs
    urls = [
        'https://www.jython.org',
        'https://olympus.realpython.org/dice',
        'https://realpython.com',
    ] *3

    # 실행시간 측정
    start_time = time.time()

    # run이 있는 영역은 비동기를 시작하는 곳이므로 main함수 앞에 async를 붙이든 말든 상관없다.
    #asyncio.run(request_all_sites(urls))
    await request_all_sites(urls)
    #asyncio.get_event_loop().run_until_complete(request_all_sites(urls))


    duration = time.time() - start_time

    # 결과 출력
    print()
    print(f'Downloaded {len(urls)} sites in {duration} seconds')

In [37]:
if __name__ == '__main__':
    await main()

what is session? https://www.jython.org 

what is session? https://olympus.realpython.org/dice 

what is session? https://realpython.com 

what is session? https://www.jython.org 

what is session? https://olympus.realpython.org/dice 

what is session? https://realpython.com 

what is session? https://www.jython.org 

what is session? https://olympus.realpython.org/dice 

what is session? https://realpython.com 


Downloaded 9 sites in 0.0030024051666259766 seconds
