## 並行処理いろいろ

In [29]:
"""
シングルスレッドで並行処理を実行する

イベントループ: 
- 全ての上位ループ
- この上でコルーチンが起動する

コルーチン:
- サブルーチンがエントリーからリターンまでを一つの処理単位とするのに対し、コルーチンはいったん処理を中断した後、続きから処理を再開できる。
- 要するに、「途中で中断可能な一連の処理の単位」である。

https://qiita.com/osorezugoing/items/d26921f0affd62b87858
"""

import asyncio

async def task(name):
    """Print a start message, pause asynchronously for one second, then print a finish message.

    ``await asyncio.sleep(1)`` suspends only this coroutine, so the event loop
    can run other scheduled tasks while this one is waiting.
    """
    label = f'{name}()'
    print(label + ' started')
    await asyncio.sleep(1)
    print(label + ' finished')

async def main():
    """Schedule three ``task`` coroutines concurrently and wait for all of them.

    ``asyncio.create_task`` schedules each coroutine on the running loop right
    away, so all three are started before the first ``await`` below yields.
    """
    scheduled = [asyncio.create_task(task(label)) for label in ("task1", "task2", "task3")]
    for job in scheduled:
        await job

# Normal script entry point: start an event loop and run the coroutine on it.
# asyncio.run(main()) # NOTE: a Jupyter notebook already runs on an asyncio event loop, so a new event loop cannot be started here.

# Run the coroutine by awaiting it directly on the notebook's running loop.
await main()

task1() started
task2() started
task3() started
task1() finished
task2() finished
task3() finished


In [5]:
"""
threading.Threadによるマルチスレッド処理
"""

import threading

def task(name):
    """Add up 0..999_999 in a pure-Python loop, reporting start/finish on stdout.

    The loop is CPU-bound Python bytecode. On a GIL build of CPython, threads
    running it take turns rather than executing in parallel.
    """
    print(f'{name} started')
    accumulated = 0
    for step in range(10 ** 6):
        accumulated += step
    print(f'{name} finished:', accumulated)

def main():
    """Run ``task`` in two threads concurrently and wait until both have finished."""
    workers = [
        threading.Thread(target=task, args=(label,))
        for label in ("task1", "task2")
    ]
    for worker in workers:
        worker.start()
    # join() blocks the caller until that thread has finished.
    for worker in workers:
        worker.join()
    

# Start both threads and block until they have finished.
main()

task1 started
task2 started
task1 finished: 499999500000
task2 finished: 499999500000


In [28]:
"""
run_in_executorにより、別スレッドで処理を実行する
通常の関数をawaitできるようにする仕組み
"""

import asyncio
import concurrent.futures
import functools
from logging import getLogger
import sys

logger = getLogger(__name__)
# Detected at runtime instead of being hard-coded to True. When it is True,
# main_with_process_pool() logs an error and returns without running.
MACOS = sys.platform == "darwin"

def task(name, n=10 ** 7):
    """Sum the integers ``0 .. n-1`` in Python, printing start/finish messages.

    Used as blocking work submitted via ``loop.run_in_executor``.

    Args:
        name: Label included in the progress messages.
        n: Number of integers to add up. The default ``10 ** 7`` is the
            original workload; pass a smaller value for quick checks.

    Returns:
        ``sum(range(n))``.
    """
    print(f"{name} started ")
    # A generator expression keeps the loop in Python bytecode, presumably so
    # the task stays noticeably slow for the demo.
    s = sum(i for i in range(n))
    print(f"{name} finished ")
    return s

async def main_with_thread_pool():
    """Run five ``task`` calls on a 5-worker thread pool via ``run_in_executor``.

    Each ``run_in_executor`` call returns an awaitable future immediately, so
    all five jobs are submitted before the first ``await``. The results are then
    collected in submission order and printed on one line.
    """
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        futures = [
            loop.run_in_executor(pool, functools.partial(task, name=f"task{k}"))
            for k in range(1, 6)
        ]
        results = [await fut for fut in futures]
        print('result:', *results)

async def main_with_process_pool():
    """Run two ``task`` calls on a process pool via ``run_in_executor``.

    If the module-level ``MACOS`` flag is truthy, an error is logged and the
    coroutine returns ``None`` without starting a pool. Per the note below, this
    cannot run from a Jupyter notebook on macOS.
    """
    # NOTE: on macOS this cannot be run from Jupyter.
    # Reference: https://stackoverflow.com/questions/61860800/running-a-processpoolexecutor-in-ipython
    if MACOS:  # test the flag's truth value (PEP 8) rather than `is True`
        logger.error("This Code Can't Work On Jupyter Note On Mac OS")
        return
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        task1 = loop.run_in_executor(pool, functools.partial(task, name="task1"))
        task2 = loop.run_in_executor(pool, functools.partial(task, name="task2"))
        result1 = await task1
        result2 = await task2
        print('result:', result1, result2)


# NOTE(review): the author marks this guard as required. Presumably it is for
# ProcessPoolExecutor with the "spawn" start method, whose workers re-import
# the main module — confirm.
if __name__ == '__main__':
    # Script entry point: start a new event loop and run the coroutine on it
    # (not usable inside Jupyter, where a loop is already running).
    # NOTE(review): this cell defines no ``main``; presumably one of the
    # main_with_* coroutines was meant.
    # asyncio.run(main())

    # Run the coroutine by awaiting it on the notebook's already-running loop.
    await main_with_thread_pool()
    # await main_with_process_pool()


task1 started 
task2 started 
task3 started 
task4 started 
task5 started 
task2 finished 
task3 finished 
task5 finished 
task1 finished 
task4 finished 
result: 49999995000000 49999995000000 49999995000000 49999995000000 49999995000000


## ベンチマークしてみる


In [39]:
import time
def aiomeasure(func):
    """Decorate a coroutine function so each call prints its wall-clock duration.

    The elapsed time in seconds (measured with ``time.perf_counter``) is printed
    after the awaited call returns. The awaited result is returned unchanged.
    ``functools.wraps`` preserves the wrapped function's ``__name__``,
    ``__doc__`` and other metadata.
    """
    import functools

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        response = await func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        print(elapsed_time)
        return response
    return wrapper

def measure(func):
    """Decorate a function so each call prints its wall-clock duration.

    The elapsed time in seconds (measured with ``time.perf_counter``) is printed
    after the call returns. The call's result is returned unchanged.
    ``functools.wraps`` preserves the wrapped function's ``__name__``,
    ``__doc__`` and other metadata.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        response = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        print(elapsed_time)
        return response
    return wrapper

In [41]:
# asyncio
import asyncio
async def calc(x, n=10**7): # <- defined as a coroutine, but it never awaits
    """CPU-heavy dummy work: add ``i % x`` for roughly 99% of ``i`` in ``range(n)``.

    Each term is kept only when ``random.random() < 0.99``, so the result is
    nondeterministic. The body contains no ``await``, so it never yields to the
    event loop. ``asyncio.gather`` over several of these therefore runs them one
    after another, not in parallel.

    Args:
        x: Modulus; must be non-zero once any term is kept.
        n: Number of loop iterations. The default ``10**7`` is the original workload.

    Returns:
        The accumulated sum.
    """
    # Imported here: this cell did not import ``random`` and only worked because
    # another cell had already imported it into the kernel.
    import random

    r = 0
    for i in range(n): # <- arbitrary heavy computation
        if random.random() < 0.99:
            r += i % x
    return r
@aiomeasure
async def main():
    """Await 15 ``calc`` coroutines through ``asyncio.gather`` and print their results.

    ``calc`` contains no ``await``, so despite ``gather`` the coroutines run one
    after another on the event loop rather than in parallel.
    """
    results = await asyncio.gather(*(calc(m) for m in range(1, 16)))
    print(results)

# Await on the notebook's running loop; @aiomeasure prints the elapsed seconds.
await main()

[0, 4949842, 9899853, 14850333, 19798688, 24749292, 29698347, 34650229, 39602254, 44548188, 49498952, 54449004, 59400835, 64348560, 69302384]
21.681085432000145


In [40]:
# ブロッキング
import asyncio
import time
import random
def calc(x, n=10**7):
    """Sequential baseline workload: add ``i % x`` for roughly 99% of ``i`` in ``range(n)``.

    Each term is kept only when ``random.random() < 0.99``, so the result is
    nondeterministic.

    Args:
        x: Modulus; must be non-zero once any term is kept.
        n: Number of loop iterations. The default ``10**7`` is the original workload.

    Returns:
        The accumulated sum.
    """
    r = 0
    for i in range(n):
        if random.random() < 0.99:
            r += i % x
    return r

@measure
def main():
    """Sequential baseline: call ``calc`` for x = 1..15 one after another and print the results."""
    results = list(map(calc, range(1, 16)))
    print(results)

# Run the sequential baseline; @measure prints the elapsed seconds.
main()

[0, 4950143, 9900174, 14850373, 19800223, 24750781, 29701750, 34648908, 39596854, 44549535, 49496051, 54451633, 59398020, 64347779, 69293910]
21.624882266999975


In [42]:
# マルチスレッド
import time
import profile
from concurrent.futures import ThreadPoolExecutor as TPE
import random
def calc(x, n=10**7):
    """Thread-pool workload: add ``i % x`` for roughly 99% of ``i`` in ``range(n)``.

    Each term is kept only when ``random.random() < 0.99``, so the result is
    nondeterministic.

    Args:
        x: Modulus; must be non-zero once any term is kept.
        n: Number of loop iterations. The default ``10**7`` is the original workload.

    Returns:
        The accumulated sum.
    """
    r = 0
    for i in range(n):
        if random.random() < 0.99:
            r += i % x
    return r

@measure
def main():
    """Run ``calc`` for x = 1..15 on a 16-worker thread pool and print the results.

    ``Executor.map`` yields results in input order. ``calc`` is pure-Python CPU
    work, so threads do not speed it up on a GIL build of CPython; the recorded
    timing is slower than the sequential version.
    """
    with TPE(max_workers=16) as exe:
        # map() accepts any iterable, so range() needs no list() copy.
        # list() drains the ordered result iterator.
        r = list(exe.map(calc, range(1, 16)))
    print(r)

# Run the thread-pool version; @measure prints the elapsed seconds.
main()


[0, 4949960, 9900207, 14849805, 19801069, 24750697, 29699973, 34648522, 39600089, 44552550, 49500535, 54453700, 59396486, 64349553, 69305112]
26.62063147100025
26.620727062225342


In [44]:
# マルチプロセス
# https://docs.python.org/ja/3/library/concurrent.futures.html

import time
import profile
from concurrent.futures import ProcessPoolExecutor as PPE
import random
def calc(x, n=10**7):
    """Process-pool workload: add ``i % x`` for roughly 99% of ``i`` in ``range(n)``.

    Each term is kept only when ``random.random() < 0.99``, so the result is
    nondeterministic.

    Args:
        x: Modulus; must be non-zero once any term is kept.
        n: Number of loop iterations. The default ``10**7`` is the original workload.

    Returns:
        The accumulated sum.
    """
    r = 0
    for i in range(n):
        if random.random() < 0.99:
            r += i % x
    return r

@measure
def main():
    """Run ``calc`` for x = 1..15 on a 16-process pool and print the results.

    ``Executor.map`` yields results in input order. Unlike threads, worker
    processes can execute the pure-Python ``calc`` loops in parallel.

    NOTE(review): the recorded run (from Jupyter, "SpawnProcess" workers) ended
    in BrokenProcessPool. Presumably spawned workers cannot import ``calc``
    because it is defined interactively in ``__main__``. Moving ``calc`` into
    an importable .py module, or running this as a script, should fix it —
    confirm.
    """
    with PPE(max_workers=16) as exe:
        # map() accepts any iterable, so range() needs no list() copy.
        # list() drains the ordered result iterator.
        r = list(exe.map(calc, range(1, 16)))
    print(r)

# Guard the entry point. With the "spawn" start method (the workers below are
# SpawnProcess), ProcessPoolExecutor workers re-import the main module, and
# unguarded top-level code would re-run there.
# NOTE(review): inside Jupyter this still failed with BrokenProcessPool,
# presumably because calc() lives in the interactive __main__ — confirm.
if __name__ == "__main__":
    main()


Process SpawnProcess-28:
Process SpawnProcess-35:
Process SpawnProcess-24:
Process SpawnProcess-33:
Process SpawnProcess-38:
Process SpawnProcess-36:
Process SpawnProcess-25:
Process SpawnProcess-29:
Process SpawnProcess-26:
Process SpawnProcess-27:
Process SpawnProcess-34:
Process SpawnProcess-37:
Process SpawnProcess-30:
Process SpawnProcess-31:
Process SpawnProcess-32:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/local/Cellar/pyt

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.