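# Concurrency and parallelism: running multiple tasks at the same time

## Running sequentially

## Running concurrently

### Multithreading and the GIL

## Running in parallel

## Python and concurrency

### How concurrency relates to asynchronous processing

# The concurrent.futures module: a high-level interface for concurrency

## The Future and Executor classes: encapsulating and running asynchronous work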

In [None]:
# ThreadPoolExecutor is a concrete subclass of Executor
from concurrent.futures import (
    ThreadPoolExecutor,
    Future
)

In [None]:
# The work to run asynchronously
def func():
    return 1

In [3]:
# Pass the work to run asynchronously to submit()
future = ThreadPoolExecutor().submit(func)
isinstance(future, Future)

True

In [4]:
# Get the return value of the asynchronously executed call
future.result()

1

In [5]:
# Check the current state
future.done()

True

In [6]:
future.running()

False

In [7]:
future.cancelled()

False
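
The cells above inspect the future only after the work has finished. As a minimal sketch of the intermediate states, the following holds a task mid-run with two `threading.Event` objects so that `running()` and `done()` can be observed deterministically. The names `blocked_task` and `observe_states` are hypothetical and introduced only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def blocked_task(started, release):
    started.set()    # signal that the worker thread is executing the task
    release.wait()   # stay in the running state until the caller allows it
    return 1

def observe_states():
    started = threading.Event()
    release = threading.Event()
    states = {}
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(blocked_task, started, release)
        started.wait()  # the task has been picked up by the worker
        states["while_running"] = (future.running(), future.done())
        release.set()   # let the task return
        states["result"] = future.result()  # blocks until finished
        states["after_result"] = (future.running(), future.done())
    return states

if __name__ == "__main__":
    print(observe_states())
```

While the task is held, `running()` is true and `done()` is false; once `result()` returns, the two values swap.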

## The ThreadPoolExecutor class: thread-based asynchronous execution

### When thread-based asynchronous execution is effective

## A multithreading example using ThreadPoolExecutor

In [8]:
# URLs of the target pages
urls = [
    'https://twitter.com',
    'https://facebook.com',
    'https://instagram.com',
]

In [9]:
from hashlib import md5
from pathlib import Path
from urllib import request

In [10]:
def download(url):
    req = request.Request(url)
    # Hash the URL so the file name contains no characters such as "/"
    name = md5(url.encode('utf-8')).hexdigest()
    file_path = './' + name
    with request.urlopen(req) as res:
        Path(file_path).write_bytes(res.read())
        return url, file_path

In [11]:
# Check that it works
download(urls[0])

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')

### Sequential implementation

In [12]:
import time
def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

In [13]:
@elapsed_time
def get_sequential():
    for url in urls:
        print(download(url))

In [14]:
get_sequential()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
get_sequential: 3.2928359508514404


### Multithreaded implementation

In [15]:
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

In [16]:
@elapsed_time
def get_multi_thread():
    # Default max_workers: min(32, os.cpu_count() + 4) since Python 3.8
    # (it was cores x 5 in Python 3.5 to 3.7)
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(download, url)
                   for url in urls]
        for future in as_completed(futures):
            # Results become available in completion order
            print(future.result())

In [17]:
get_multi_thread()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
get_multi_thread: 1.3061559200286865
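
`as_completed()` yields futures in completion order, so the printed order can vary between runs. When results are needed in input order, `Executor.map()` is an alternative. The sketch below replaces the real download with a hypothetical `fake_download` that only sleeps, so it runs without network access; the delays are arbitrary values chosen so that the last URL finishes first.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(item):
    # Hypothetical stand-in for download(): sleep instead of doing real I/O
    url, delay = item
    time.sleep(delay)
    return url

def fetch_in_order(items):
    # map() returns results in the order of the inputs,
    # regardless of which call finishes first
    with ThreadPoolExecutor(max_workers=3) as executor:
        return list(executor.map(fake_download, items))

items = [
    ("https://twitter.com", 0.03),
    ("https://facebook.com", 0.02),
    ("https://instagram.com", 0.01),
]

if __name__ == "__main__":
    print(fetch_in_order(items))
```

The trade-off is that a slow early item delays access to later results, whereas `as_completed()` hands over each result as soon as it is ready.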


### Caveats of multithreading

### An implementation that misbehaves under multithreading

In [18]:
from concurrent.futures import (
    ThreadPoolExecutor,
    wait
)

In [19]:
class Counter:
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count = self.count + 1

In [20]:
def count_up(counter):
    # Increment 1,000,000 times
    for _ in range(1000000):
        counter.increment()

In [21]:
counter = Counter()
threads = 2
with ThreadPoolExecutor() as e:
    # Prepare two threads and call count_up in each
    futures = [e.submit(count_up, counter)
               for _ in range(threads)]
    done, not_done = wait(futures)

In [22]:
# Display the number with comma separators
# The result is not 2,000,000
print(f'{counter.count=:,}')

counter.count=1,602,970


### Thread-safe implementation

In [23]:
import threading
class ThreadSafeCounter:
    # Prepare a lock (a class attribute, so it is shared by all instances)
    lock = threading.Lock()
    def __init__(self):
        self.count = 0
    def increment(self):
        with self.lock:
            # Put the statements that need mutual exclusion inside this block
            self.count = self.count + 1

In [24]:
counter = ThreadSafeCounter()
threads = 2
with ThreadPoolExecutor() as e:
    futures = [e.submit(count_up, counter)
               for _ in range(threads)]
    done, not_done = wait(futures)

In [25]:
# The value is as expected
print(f'{counter.count=:,}')

counter.count=2,000,000


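## The ProcessPoolExecutor class: process-based asynchronous execution

### When process-based asynchronous execution is effective

## A multiprocessing example using ProcessPoolExecutor

The sample code distributes only the final version of each file.

(Note: fib.py)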
import sys

def fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, b + a
    else:
        return a

def main():
    n = int(sys.argv[1])
    print(fibonacci(n))

if __name__ == '__main__':
    main()

# Adjust the value to suit your machine
!python3 fib.py 1000000

### Sequential implementation

(Note: fib.py)
import os
import time
import sys

def fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, b + a
    else:
        return a

def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential(nums):
    for num in nums:
        print(fibonacci(num))

def main():
    n = int(sys.argv[1])
    nums = [n] * os.cpu_count()
    get_sequential(nums)

if __name__ == '__main__':
    main()

!python3 fib.py 1000000

### Multiprocess implementation

(Note: fib.py)
import os
import time
import sys
from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)

def fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, b + a
    else:
        return a

def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential(nums):
    for num in nums:
        print(fibonacci(num))

@elapsed_time
def get_multi_process(nums):
    with ProcessPoolExecutor() as e:
        futures = [e.submit(fibonacci, num)
                   for num in nums]
        for future in as_completed(futures):
            print(future.result())

def main():
    n = int(sys.argv[1])
    nums = [n] * os.cpu_count()
    get_multi_process(nums)

if __name__ == '__main__':
    main()

!python3 fib.py 1000000

In [26]:
!cat fib.py

import os
import time
import sys
from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)

def fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, b + a
    else:
        return a

def elapsed_time(f):
    def wrapper(*args, **kwargs):
        st = time.time()
        v = f(*args, **kwargs)
        print(f"{f.__name__}: {time.time() - st}")
        return v
    return wrapper

@elapsed_time
def get_sequential(nums):
    for num in nums:
        print(fibonacci(num))

@elapsed_time
def get_multi_thread(nums):
    with ThreadPoolExecutor() as e:
        futures = [e.submit(fibonacci, num)
                   for num in nums]
        for future in as_completed(futures):
            print(future.result())

def main():
    n = int(sys.argv[1])
    nums = [n] * os.cpu_count()
    get_multi_thread(nums)

if __name__ == '__main__':
    main()

!python3 fib.py 1000000

### Caveats of multiprocessing

### Use picklable objects

In [27]:
!cat unpickle.py

from concurrent.futures import (
    ProcessPoolExecutor,
    wait
)

func = lambda: 1

def main():
    with ProcessPoolExecutor() as e:
        future = e.submit(func)
        done, _ = wait([future])
    print(future.result())

if __name__ == '__main__':
    main()

In [28]:
!python3 unpickle.py

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x10c615820>: attribute lookup <lambda> on __main__ failed
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "unpickle.py", line 15, in <module>
    main()
  File "unpickle.py", line 12, in main
    print(future.result())
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", lin
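
`ProcessPoolExecutor` sends the callable and its arguments to the worker processes with `pickle`, which is why submitting the lambda fails. As a rough pre-check, a helper such as the hypothetical `is_picklable` below calls `pickle.dumps()` directly. The set of exceptions it catches is an assumption chosen to cover the common failure modes, not an exhaustive list.

```python
import math
import pickle

def is_picklable(obj):
    # Try to serialize the object the same way it would be sent to a worker
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

if __name__ == "__main__":
    # A function importable by name is pickled by reference
    print(is_picklable(math.sqrt))
    # A lambda has no importable name, so pickling it fails
    print(is_picklable(lambda: 1))
```

Replacing the lambda in `unpickle.py` with a function defined by `def` at module level avoids the error, because such a function can be looked up by name in the worker process.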

### Handling random numbers

In [29]:
!cat rand.py

from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)
import numpy as np

def use_numpy_random():
    # Uncomment the next line to re-seed the random number generator
    # np.random.seed()
    return np.random.random()

def main():
    with ProcessPoolExecutor() as e:
        futures = [e.submit(use_numpy_random)
                   for _ in range(3)]
        for future in as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

In [30]:
!pip install numpy

You should consider upgrading via the 'pip install --upgrade pip' command.


In [31]:
# Run on macOS with Python 3.8
!python3 rand.py

0.008641189922594883
0.27818897134018383
0.947495469896293


In [32]:
!docker run -it --rm -v $(pwd):/usr/src/app -w /usr/src/app python:3.8.1 bash -c 'pip install numpy; python3 rand.py'

Collecting numpy
  Downloading https://files.pythonhosted.org/packages/d7/6a/3fed132c846d1e47963f30376cc041e9dd586d286d931055ad06ff65c6c7/numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl (20.5MB)
Installing collected packages: numpy
Successfully installed numpy-1.17.4
0.840262178496553
0.4398793934941102
0.8109793620522091


In [33]:
!cat standard_rand.py

from concurrent.futures import (
    ProcessPoolExecutor,
    as_completed
)
import random

def use_standard_random():
    return random.random()

def main():
    with ProcessPoolExecutor() as e:
        futures = [e.submit(use_standard_random)
                   for _ in range(3)]
        for future in as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

In [34]:
!docker run -it --rm -v $(pwd):/usr/src/app -w /usr/src/app python:3.8.1 python3 standard_rand.py

0.5355067978782904
0.08670975807252346
0.42519421341484154


# The asyncio module: concurrency with an event loop

## Coroutines: suspending and resuming in the middle of execution

### Implementing coroutines with async syntax

In [35]:
async def coro():
    return 1

In [36]:
# The return value is a coroutine object, not 1
coro()

<coroutine object coro at 0x10d5896c0>

In [37]:
import asyncio

In [38]:
await coro()

1

### Calling and suspending coroutines with await

In [39]:
import asyncio
import random

In [40]:
async def call_web_api(url):
    # A sleep stands in for the Web API call here
    print(f'send a request: {url}')
    await asyncio.sleep(random.random())
    print(f'got a response: {url}')
    return url

In [41]:
async def async_download(url):
    # Call the coroutine with await
    response = await call_web_api(url)
    return response

In [42]:
result = await async_download('https://twitter.com/')

send a request: https://twitter.com/
got a response: https://twitter.com/


In [43]:
result

'https://twitter.com/'

### Running coroutines concurrently

In [44]:
async def main():
    task = asyncio.gather(
        async_download('https://twitter.com/'),
        async_download('https://facebook.com'),
        async_download('https://instagram.com'),
    )
    return await task

In [45]:
result = await main()

send a request: https://twitter.com/
send a request: https://facebook.com
send a request: https://instagram.com
got a response: https://instagram.com
got a response: https://twitter.com/
got a response: https://facebook.com


In [46]:
result

['https://twitter.com/', 'https://facebook.com', 'https://instagram.com']
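
The cells above use top-level `await`, which works in IPython and Jupyter but not in an ordinary script. A minimal sketch of the script equivalent uses `asyncio.run()` as the entry point. The coroutine `fake_request` and its fixed delays are hypothetical stand-ins for `call_web_api`, chosen so that the last request finishes first.

```python
import asyncio

async def fake_request(url, delay):
    # Hypothetical stand-in for call_web_api(): sleeping yields to the event loop
    await asyncio.sleep(delay)
    return url

async def main():
    # gather() returns results in argument order, not completion order
    return await asyncio.gather(
        fake_request("https://twitter.com/", 0.03),
        fake_request("https://facebook.com", 0.02),
        fake_request("https://instagram.com", 0.01),
    )

if __name__ == "__main__":
    # asyncio.run() creates an event loop, runs main() to completion, and closes the loop
    print(asyncio.run(main()))
```

This matches the notebook result above: the "got a response" lines may appear in any order, but the list returned by `gather()` follows the order of its arguments.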

## Scheduling and running coroutines

### The event loop: the core mechanism of the asyncio module

In [47]:
import asyncio
async def main():
    loop = asyncio.get_running_loop()
    print(loop)

In [48]:
await main()

<_UnixSelectorEventLoop running=True closed=False debug=False>


### Tasks: encapsulating scheduled coroutines

In [49]:
async def coro(n):
    await asyncio.sleep(n)
    return n

In [50]:
async def main():
    task = asyncio.create_task(coro(1))
    print(task)
    return await task

In [51]:
# The task is still pending at the time of print()
await main()

<Task pending name='Task-9' coro=<coro() running at <ipython-input-49-adc0461ab5af>:1>>


1

In [52]:
# Create tasks and run them
# Completes in about 3 seconds
async def main():
    task1 = asyncio.create_task(coro(1))
    task2 = asyncio.create_task(coro(2))
    task3 = asyncio.create_task(coro(3))
    print(await task1)
    print(await task2)
    print(await task3)

In [53]:
await main()

1
2
3


In [54]:
# Await the coroutines directly
# This version takes about 6 seconds
async def main():
    print(await coro(1))
    print(await coro(2))
    print(await coro(3))

In [55]:
await main()

1
2
3
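
The difference between the two versions of `main()` above can be measured directly. The sketch below scales the sleeps down to fractions of a second and times both approaches with `time.perf_counter()`. The helper names `run_as_tasks`, `run_sequentially`, and `timed` are hypothetical.

```python
import asyncio
import time

DELAYS = (0.05, 0.1, 0.15)

async def coro(n):
    await asyncio.sleep(n)
    return n

async def run_as_tasks():
    # create_task() schedules every coroutine right away,
    # so the sleeps overlap
    tasks = [asyncio.create_task(coro(n)) for n in DELAYS]
    return [await task for task in tasks]

async def run_sequentially():
    # Each coroutine starts only after the previous one has finished
    return [await coro(n) for n in DELAYS]

def timed(coro_func):
    start = time.perf_counter()
    result = asyncio.run(coro_func())
    return result, time.perf_counter() - start

if __name__ == "__main__":
    for func in (run_as_tasks, run_sequentially):
        result, seconds = timed(func)
        print(f"{func.__name__}: {result} in {seconds:.2f}s")
```

With tasks, the total time is close to the longest delay; awaiting the coroutines one by one takes roughly the sum of the delays.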


### Asynchronous I/O: I/O suited to the event loop

### Turning code that uses synchronous I/O into tasks

In [56]:
async def main():
    loop = asyncio.get_running_loop()
    # Run download (synchronous I/O) in the default executor;
    # each call returns an awaitable future
    futures = [loop.run_in_executor(
        None, download, url) for url in urls]
    for result in await asyncio.gather(*futures):
        print(result)

In [57]:
await main()

('https://twitter.com', './be8b09f7f1f66235a9c91986952483f0')
('https://facebook.com', './a023cfbf5f1c39bdf8407f28b60cd134')
('https://instagram.com', './09f8b89478d7e1046fa93c7ee4afa99e')
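
On Python 3.9 and later, `asyncio.to_thread()` is a shorter way to run a blocking function in a worker thread; it is not available on the Python 3.8 used for the output in this chapter. The sketch below assumes a hypothetical `blocking_io` function standing in for `download`.

```python
import asyncio
import time

def blocking_io(url):
    # Hypothetical stand-in for download(): a synchronous call that
    # would block the event loop if called directly from a coroutine
    time.sleep(0.01)
    return url, len(url)

async def main(urls):
    # to_thread() runs each call in a separate thread and returns an awaitable
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_io, url) for url in urls)
    )

if __name__ == "__main__":
    print(asyncio.run(main(["https://twitter.com", "https://facebook.com"])))
```

Compared with `loop.run_in_executor(None, ...)`, this form needs no explicit reference to the running loop, at the cost of not letting the caller choose the executor.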


## The asyncio module and HTTP communication

### aiohttp: an HTTP client and server library built on asynchronous I/O

# Chapter summary