<a href="https://colab.research.google.com/github/suwatoh/Python-learning/blob/main/128_%E3%83%97%E3%83%AD%E3%82%BB%E3%82%B9%E3%81%A8%E3%82%B9%E3%83%AC%E3%83%83%E3%83%89.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

プロセスとスレッド
==================

並行処理と並列処理
------------------

現代の CPU の内部には、独立して処理作業を行う回路のブロックが複数組み込まれており、それぞれが単体の処理装置のように振る舞う。これらを **CPU コア**と呼ぶ。CPU が複数の CPU コアを持つことを**マルチコア**（multi-core）と表現する。

CPU コアに処理させる対象が複数あって、処理を「同時に」行わせたい場合、CPU コアの処理形態には次の 2 種類がある。

  * **並行処理**（concurrent processing）:  
*1 個の CPU コア* が複数の処理を切り替えながら進めること。「複数の処理を切り替え」るとは、開始した処理を中断し、別の処理を再開することをいう。
  * **並列処理**（parallel processing）:  
*複数の CPU コア* が割り振られた処理を同時に進めること。

並行処理では、どの時間でも 1 個の CPU コアが 1 つの処理作業を行っているにすぎないが、処理の切り替えを素早く行うことによって「同時に」行っているように見せかける。これは、ワンオペレーションの飲食店で全ての作業が切り盛りされる様子に似ている（画像は[いらすとや](https://www.irasutoya.com/2018/06/blog-post_850.html)より）。

<img src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh5Dn4bGYN7AWxje4ecO8Ezh6Qqdn82HjlXE2iJtVj8Sl51a6DKoUTr5YrLloo8wRlf13YuzBPRAJ_mKW2TtYhOSIrAOiNg1WTBEpbMdf5enBQuuWnu6zeoxd2qhptHd-YYtUPwB01dKIA0/s800/job_one_operation_man.png" width="400">

並列処理では、複数の CPU コアを使って真の意味で「同時に」処理が進められるので、並行処理より確実に処理時間が短くなる。しかしながら、並列処理を行うには同時に行う処理の数に対して CPU コアの数が足りていることが必要であり、この条件がいつも満たされるとは限らない。

プロセスとスレッド
------------------

### プロセス ###

**プロセス**（process）は、プログラムを実行する実体（インスタンス）である。ハードディスク上に存在するプログラムがメモリ上に展開され、CPU で実行されている状態を指す。OS は、各プロセスを識別するために**プロセス ID**（process ID; PID）と呼ばれる一意な識別子を割り当てる。Linux では `ps` コマンド、Windows では `tasklist` コマンドで現在実行中のプロセスが一覧表示される。プロセスが終了すると、OS は割り当てたメモリを解放する。

In [None]:
!ps

    PID TTY          TIME CMD
      1 ?        00:00:00 docker-init
      7 ?        00:00:00 node
     11 ?        00:00:00 oom_monitor.sh
     13 ?        00:00:00 run.sh
     14 ?        00:00:00 kernel_manager_
     37 ?        00:00:00 tail
     63 ?        00:00:06 python3 <defunct>
     64 ?        00:00:00 colab-fileshim.
     82 ?        00:00:03 jupyter-noteboo
     83 ?        00:00:00 dap_multiplexer
    599 ?        00:00:03 python3
    626 ?        00:00:00 python3
    654 ?        00:00:00 language_servic
    662 ?        00:00:18 node
    794 ?        00:00:00 sleep
    795 ?        00:00:00 ps


実行中のプロセスに関して、以下の Python 関数が利用できる。

| 関数 | 機能 | 戻り値 |
|:---|:---|:---|
| `os.getpid()` | 実行中のプロセス ID を返す | `int` |
| `os.getppid()` | 親プロセス（実行中のプロセスを生成したプロセス）のプロセス ID を返す | `int` |

In [None]:
import os
os.getpid(), os.getppid()

(599, 82)

Unix 系 OS では、実行中のプロセスに関する情報を取得・設定する以下の Python 関数が利用できる。

| 関数 | 機能 | 戻り値 |
|:---|:---|:---|
| `os.getuid()` | 実行中のプロセスの実ユーザー ID を返す | `int` |
| `os.geteuid()` | 実行中のプロセスの実効ユーザー ID を返す | `int` |
| `os.getgid()` | 実行中のプロセスの実グループ ID を返す | `int` |
| `os.getegid()` | 実行中のプロセスの実効グループ ID を返す | `int` |
| `os.getgroups()` | 実行中のプロセスに関連付けられた従属グループ ID のリストを返す | `list` |
| `os.getpriority(which, who)` | プログラムのスケジューリング優先度を取得する。`which` は `PRIO_PROCESS`、`PRIO_PGRP`、あるいは `PRIO_USER` のいずれかとする。`who` <br />は `which` に応じて解釈される（`PRIO_PROCESS` であればプロセス ID、`PRIO_PGRP` であればプロセスグループ ID、そして `PRIO_USER` であ<br />ればユーザー ID)。`who` が `0` の場合、実行中のプロセス、そのグループと実ユーザー ID を意味する | `list` |
| `os.setuid(uid, /)` | 実行中のプロセスのユーザー id を設定する | `int` |
| `os.seteuid(euid, /)` | 実行中のプロセスに実効ユーザー ID をセットする | `None` |
| `os.setgid(gid, /)` | 実行中のプロセスにグループ ID をセットする | `None` |
| `os.setegid(egid, /)` | 実行中のプロセスに実効グループ ID をセットする | `None` |
| `os.setgroups(groups, /)` | 実行中のグループに関連付けられた従属グループ ID のリストを `groups` に設定する。`groups` はグループを特定する整数のリストとす<br />る。通常、この操作はスーパユーザーしか利用できない | `None` |
| `os.setpriority(which, who, priority)` | プログラムのスケジューリング優先度を設定する。`which` と `who` は `os.getpriority()` と同じ。`priority` は `-20` から `19` の整数値で、<br />デフォルトの優先度は `0`。小さい数値ほど優先されるスケジューリングとなる | `None` |

これらの関数は Windows では定義されない（呼び出すと `AttributeError` 例外が発生する）。

In [None]:
import os
os.getuid(), os.geteuid(), os.getgid(), os.getegid(), os.getgroups(), os.getpriority(os.PRIO_PROCESS, 0)

(0, 0, 0, 0, [0], 0)

メモリ上でプロセスは次のような構成となる。

  * **テキストセグメント**: プログラムそのものが格納される読み出し専用の領域
  * **データセグメント**:
      * **静的領域**: 定数やグローバル変数を格納する領域
      * **ヒープ領域**: オブジェクトのように、実行するまで容量が未確定で一時的に必要とするデータを格納する領域
  * **スタックセグメント**: 関数とその中に定義された変数を下から順に格納し、上から実行され取り出されていく領域。**スタック領域**ともいう。

``` text
             メモリマップ
0xffffffff┏━━━━━━━━┓
          ┃                ┃
          ┃  スタック領域  ┃ スタックセグメント
      SP→┃                ┃
          ┣━━━━━━━━┫
                   ↓ 自動拡張

          ┏━━━━━━━━┓
          ┃共有ライブラリ１┃
          ┗━━━━━━━━┛
          ┏━━━━━━━━┓
          ┃共有ライブラリ２┃
          ┗━━━━━━━━┛

                   ↑ 自動拡張
          ┣━━━━━━━━┫
          ┃                ┃
          ┃   ヒープ領域   ┃
          ┃                ┃ データセグメント
          ┠────────┨
          ┃    静的領域    ┃
          ┣━━━━━━━━┫
          ┃                ┃
      PC→┃                ┃ テキストセグメント
          ┃                ┃ （読み出し専用）
         0┗━━━━━━━━┛

```

ヒープ領域が不足すると、Python は `MemoryError` 例外を送出する。スタック領域が不足すると、Python はエラー（stack overflow）を出してインタープリターの実行を中断する。

共有ライブラリは、ヒープ領域とスタック領域の間にある。どの番地に割り当てられるかは、システムに依存する。

また、メモリとは別に、 CPU 内にはレジスタと呼ばれる記憶装置があり、そこにはスタック領域のどこを見ているかを指す**スタックポインタ**（SP）と、プログラムのどこを実行しているかを指す**プログラムカウンタ**（PC）が置かれている。プロセスは、メモリ上の構成と SP、PC を最小セットする。このデータの最小セットを**コンテキスト**（context）と呼ぶ。

### マルチプロセス ###

同時に実行するプロセスが複数存在することを**マルチプロセス**（multiprocessing）という。マルチプロセスでも各プロセスにコンテキストが作られることに変わりはないから、各プロセスのメモリ空間は独立していてメモリ上のデータを共有することはない。

1 つの CPU コアは 1 つのプロセスだけが利用できる。CPU コアが足りていれば、マルチプロセスは並列処理される。CPU コアが不足すれば、マルチプロセスは並行処理される。1 つの CPU コアで実行されるプロセスが切り替わることは**コンテキストスイッチ**（context switching）と呼ばれる。具体的には、CPU コアがあるプロセスを実行している最中に処理を中断して現在のプロセスのコンテキストを特定のメモリ領域などに保存し、別のプロセスのコンテキストを読み込んで処理を再開するということが行われる。コンテキストスイッチによるプロセスの切り替えは、人間には認識できないほどに高速に行われるため、複数のプロセスが同時に実行されているように感じられる。

### スレッド ###

1 つのアプリケーション内で複数のプロセスを起動し並列処理を行うことも可能である。しかしながら、各プロセスは独立のメモリ空間が割り当てられるので、起動時にそれなりのオーバーヘッドが発生する。また、プロセス間でデータを交換するには特別な仕組みが必要となる。

そこで、1 つのプロセス内で並列処理を行う仕組みが作られた。このとき各 CPU コアを占有する一連の処理の流れのことを**スレッド**（thread）と呼ぶ。つまり、スレッドはプロセスより細かい CPU コア利用の最小単位とされるものである。プロセス起動時には必ず 1 個のスレッドが生成されるので、1 個のプロセスには最小限 1 個のスレッドが存在する。プロセス起動時に生成されるスレッドを**メインスレッド**と呼ぶ。メインスレッドから枝分かれするスレッドが存在することを**マルチスレッド**（multithread）といい、マルチスレッドでないものは**シングルスレッド**（singlethread）という。枝分かれしたスレッドからさらに枝分かれすることもできる。

``` text
            
  ┏━━━━━━━━━━┓     ┏━━━━━━━━━━━━━━┓
  ┃（シングルスレッド）┃     ┃     （マルチスレッド）     ┃
  ┃    ┌────┐    ┃     ┃        ┌────┐        ┃
  ┃    │ 処理Ａ │    ┃     ┃        │ 処理Ａ │        ┃
  ┃    └────┘    ┃     ┃        └────┘        ┃
  ┃         ↓         ┃     ┃         ↙        ↘         ┃
  ┃    ┌────┐    ┃     ┃ ┌────┐  ┌────┐ ┃
  ┃    │ 処理Ｂ │    ┃     ┃ │ 処理Ｂ │  │ 処理Ｄ │ ┃
  ┃    └────┘    ┃     ┃ └────┘  └────┘ ┃
  ┃         ↓         ┃     ┃      ↓            ↓      ┃
  ┃    ┌────┐    ┃     ┃ ┌────┐  ┌────┐ ┃
  ┃    │ 処理Ｃ │    ┃     ┃ │ 処理Ｃ │  │ 処理Ｅ │ ┃
  ┃    └────┘    ┃     ┃ └────┘  └────┘ ┃
  ┗━━━━━━━━━━┛     ┗━━━━━━━━━━━━━━┛
```

スレッドは、メモリのスタック領域とスタックポインタとプログラムカウンタから構成される。ヒープ領域と静的領域は同じものをスレッド間で共有する。つまり、処理を行う関数などの流れを作成して、変数などのデータは共有するという仕組みである。スレッドの構成はプロセスの構成より小さいため、スレッド生成時に発生するオーバーヘッドはプロセス生成時のものより少ない。また、共有するデータについてはスレッド間でデータ交換を行う必要がない。

スレッドは CPU コアを占有するから、並列処理が可能なスレッドの数は物理的に存在するコア数に制限されるはずである。しかしながら、この制限を緩和する技術として**同時マルチスレッディング**（Simultaneous Multi-Threading; **SMT**）が存在する。ハイパースレッディング・テクノロジー（Hyper-Threading Technology）は、インテルの SMT 実装に対する同社の登録商標である。

SMT の概要は以下のとおり。物理的に存在する 1 個の CPU コアは複数の回路から構成され、それらは I/O 制御、整数演算、浮動小数点数演算などの役割を持つ。個々のスレッドは、CPU コア内の全ての回路を使うわけではないことが多い。たとえば、フィボナッチ数を調べるプログラムは整数演算ばかり行うスレッドを生成し、そのスレッドの処理中は浮動小数点数演算を行う回路が使われない。そこで、使用されない回路を他のスレッドに割り当て、2 つのスレッドが同時に実行するようにしたものが SMT である。各スレッドからは SMT は隠蔽され、2 つの CPU コアが動作しているかのように見える。このようにソフトウェア側から見える疑似的な CPU コアを**論理コア**と呼ぶ。SMT が有効な場合、1 スレッド - 1 論理コアという対応になる。

`os.cpu_count()` 関数を使うと、論理コア数を取得することができる。この関数は、論理コア数を取得できなかった時 `None` を返す。標準ライブラリの `multiprocessing` パッケージが提供する同名も論理コア数を返すが、論理コア数を取得できなかった時 に` NotImplementedError` 例外を送出する。

In [None]:
import os, multiprocessing
os.cpu_count(), multiprocessing.cpu_count()

(2, 2)

### タスク ###

1 つのまとまった処理または命令を**タスク**（task）と呼ぶ。どの大きさでまとめるとタスクと言えるという定義はない。タスクは、文脈によって、スレッド内の命令の集合を指していたり、スレッドそのものを指していたり、プロセスを指していたり、プロセスの集合を指していたりする。

マルチプロセスは、**マルチタスク**（multitasking）とも呼ばれる。なお、マルチタスクは日本において NEC の登録商標である。

threading
---------

プロセスとスレッドは OS によって管理される。OS のスレッド周りの API に対する標準の Python インターフェースとしては、`_thread` モジュールと `threading` モジュールがある。`_thread` は OS の API に近い低レベルのインターフェースを提供していて、`threading` は `_thread` の上に構築された高レベルなインターフェースを提供している。よって、通常は `threading` を使う。

### インスタンス化 ###

`threading` モジュールは、スレッドを `threading.Thread` オブジェクトとして扱う。コンストラクタは次のとおり:

``` python
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
```

コンストラクタは常にキーワード引数を使って呼び出さなければならない。各引数は以下のとおり:

| 引数 | 意味 |
|:---|:---|
| `group` | `None` にする必要がある。この引数は将来 `ThreadGroup` クラスが実装されるときの拡張用に予約されている |
| `target` | インスタンスの `run()` メソッドによって起動される呼び出し可能オブジェクトを指定する。デフォルトでは何も呼び出さないことを示す `None` になっている |
| `name` | スレッドの名前を指定する。値はインスタンスの `name` 属性で参照できる。デフォルトでは、`Thread-N`（`N`は小さな 10 進数）、`target` 引数が指定された場合 `Thread-N (target)` という<br />形式でユニークな名前が構成される |
| `args` | `target` を呼び出すときの引数のリストかタプルを指定する。デフォルトは `()` |
| `kwargs` | `target` を呼び出すときのキーワード引数を辞書で指定する。デフォルトは `{}` |
| `daemon` | このスレッドがデーモン（メインメモリ上に常駐してバックグラウンドで動作するプログラム）であるか否かを示すブール値を設定する。値はインスタンスの `daemon` 属性で参照できる。<br />`None`（デフォルト）の場合、`daemon` 属性は現在のスレッドから継承される |

`threading.Thread` オブジェクトの識別には、`name` 属性（読み書き可能なプロパティ）または `ident` 属性（読み取り専用プロパティ）が使える。`ident` 属性はスレッドが開始されていなければ `None` で、開始されていれば非ゼロの整数が格納されている。

`threading.Thread` オブジェクトのメソッドは次のとおり:

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `start()` | スレッドを開始し、`run()` メソッドを実行する。このメソッドはオブジェクトあたり一度だけ呼び出すことができる | `None` |
| `run()` | このスレッド内で `target` を呼び出す | `None` |
| `join(timeout=None)` | スレッドが終了するまで待機する。`timeout` に浮動小数点数を指定した場合は、このメソッドは `timeout` 秒でタイムアウトする（次のコードに進む）。<br />現在のスレッドに対してこのメソッドを呼び出そうとすると `RuntimeError` 例外を送出する | `None` |
| `is_alive()` | スレッドが動作中の場合に `True` を返し、そうでない場合に `False` を返す。`join()` がタイムアウトしたかどうかはこのメソッドで確認できる | `bool` |

`threading` モジュールは、ロードされると暗黙のうちにメインスレッド（Python インタープリターが起動したスレッド）に対応するオブジェクトを作成する。このオブジェクトは、`threading.Thread` クラスを継承する `threading._MainThread` クラスのインスタンスであり、`name` 属性が `'MainThread'` になっている。

``` python
threading.current_thread()
```

この関数は、関数を呼び出している処理のスレッドに対応する `threading.Thread` オブジェクトを返す。関数を呼び出している処理のスレッドが `threading` モジュールで生成したものでない場合、限定的な機能しかもたないダミースレッドオブジェクトを返す。

``` python
threading.enumerate()
```

この関数は、現在、アクティブな `threading.Thread` オブジェクト全てのリストを返す。リストには、終了したスレッドとまだ開始していないスレッドは入らない。しかし、メインスレッドは、たとえ終了しても、常に結果に含まれる。

次のコードは、1 秒ごとに 3 回まで `print()` 関数で出力する `run()` 関数を 2 つのスレッドで実行する。

In [None]:
import threading
import time


def run(base):
    for i in range(3):
        time.sleep(1.0)
        print("{}: {}".format(threading.current_thread().name, base + i))


def main(*args):
    print(f"started at {time.strftime('%X')}")
    for t in args:
        t.start()
    for t in args:
        t.join()
    print(f"finished at {time.strftime('%X')}")


if __name__ == "__main__":
    t1 = threading.Thread(target=run, name="t1", args=(10,))
    t2 = threading.Thread(target=run, name="t2", args=(20,))
    main(t1, t2)

started at 02:39:45
t1: 10
t2: 20
t1: 11
t2: 21
t1: 12
t2: 22
finished at 02:39:48


`t1` と `t2` が交替で実行されていることが確認できる。2 つのスレッドを開始してから終了するまでの時間が 3 秒なので、並列処理が行われているように見える。

次のコードは、リストを 20 万回 `append()` する `run()` 関数を 2 個のスレッドで実行し、その後に同関数を 1 個のスレッドで実行する。

In [None]:
import threading
import time


def run():
    a = []
    for i in range(20000000):
        a.append(0)


def main(*args):
    print(f"started at {time.strftime('%X')}")
    for t in args:
        t.start()
    for t in args:
        t.join()
    print(f"finished at {time.strftime('%X')}")


if __name__ == "__main__":
    t1 = threading.Thread(target=run)
    t2 = threading.Thread(target=run)
    t3 = threading.Thread(target=run)
    print("2スレッド-----------")
    main(t1, t2)
    print("1スレッド-----------")
    main(t3)

2スレッド-----------
started at 02:39:48
finished at 02:39:54
1スレッド-----------
started at 02:39:54
finished at 02:39:57


1 スレッドの処理時間が 2 スレッドの処理時間の半分となっているから、2 スレッドは並列処理されていないことがわかる。2 つのコードの実行結果の違いはどのような理由によるものであろうか。

### GIL ###

実は、 CPython では、マルチスレッドがあっても CPU コアを占有できるのはロックを保持する単一のスレッドに限られる。このロックを**グローバルインタープリターロック**（Global Interpreter Lock）、略して GIL（ギル）と呼ぶ。GIL の獲得と解放により、動作するスレッドが切り替わる。つまり、**CPython のマルチスレッドは完全に並行処理となる**。ただし、GIL は Python で書かれた処理についての制限であり、モジュールが利用する C などで書かれた処理は GIL の制限を受けず、並列処理が可能である。

CPython に GIL が存在する理由は、C におけるデータ競合を回避するためである。C では、1 つのプロセス内の 2 つ以上のスレッドがメモリ上の同じ場所に同時にアクセスすることによってプログラムが C の規格で未定義の振る舞いをする現象があり、この現象を**データ競合**（data race）と呼ぶ。データ競合はソフトウェアセキュリティの脆弱性につながる可能性もある。データ競合を回避するには、メモリへのアクセスを制御（ロック）する必要がある。ところが、Python インタープリターの実装では、ロックが必須なメモリアクセスが多すぎて、パフォーマンスが大きく低下することがわかっている。そこで、個別のメモリアクセスを制御するのではなく、スレッドの動作全体を制御する GIL が導入された。

データ競合は C に限らない現象であるが、その定義や扱いはプログラミング言語ごとに異なる。このため、GIL の存在は Python 言語の仕様として要求されていない。実際、Java による Python 実装（Jython）には GIL は存在しない。しかしながら、最も広く使用されている Python 実装は CPython なので、事実上マルチスレッド Python プログラムは並行処理と考えなければならない。

シングルスレッドでは、データ競合は発生しないので、GIL の解放は起こらないようになっている。つまり、シングルスレッドの場合、スレッドは何事もなく最後まで実行される。

マルチスレッドの場合、GIL の獲得と解放がおおむね以下のように行われる。

  * あるスレッドが GIL を獲得すると、それだけが CPU コアを占有する。他のスレッドは（実行中であっても）休止する。
  * GIL を保持するスレッドは、I/O 操作が発生したら、自発的に全てのスレッドに通知を行う。これを受けて、他のスレッドが GIL を獲得する。
  * 休止中のスレッドは、タイムアウト（デフォルトで 5 ミリ秒後）になったら C のグローバル変数 `gil_drop_request` （デフォルト値は 0）を 1 に設定する。`gil_drop_request` が 1 の場合、GIL を保持するスレッドは全てのスレッドに通知を行う。これを受けて他のスレッドが GIL を獲得すると、GIL を解放したスレッドに通知し、`gil_drop_request` は 0 に戻される。
  * スレッドの切り替え時にはコンテキストスイッチが行われるので、スレッドが GIL を解放した後、再び GIL を獲得して実行する場合、前回の続きから再開する。

タイムアウト時間は Python コードで `sys.getswitchinterval()` 関数を使うと参照できる。また、`sys.setswitchinterval(interval)` によってタイムアウト時間を変更することもできる。

In [None]:
import sys
sys.getswitchinterval()

0.005

GIL の解放には I/O 操作発生によるものとタイムアウトによるものがあるわけであるが、この違いがスレッドの処理内容によってパフォーマンスに異なる影響を与えることになる。処理内容を、I/O 操作を行うものとそれ以外のもの（論理演算や整数演算、浮動小数点数演算など）に分けて、前者を **I/O バウンド**（I/O bound）、後者を **CPU バウンド**（CPU-bound）と呼ぶ。システム時刻の問い合わせ、標準入出力やファイルの読み書き、ネットワーク通信はすべて I/O バウンドな処理であり、Python オブジェクトの操作は CPU バウンドな処理である。2 種類の処理で GIL の影響の違いは次のように整理できる。

  * CPU バウンドな処理:  
計算の途中でもタイムアウトによりスレッドが切り替わるため、余計に計算コストがかかり遅くなる。
  * I/O バウンドな処理:  
I/O 操作が行われるたびにスレッドが切り替わる。遅いネットワーク通信などを待たずに他の処理に移ることができるため効率がよい。

先に上げた 2 つのコードのうち、1 番目のものについては、`run()` 関数内での `time.sleep()` 関数の実行と `print()` 関数の実行はともに I/O バウンドであり、I/O 操作が発生して下図のようにスレッドが切り替わるため、2 つのスレッドが交替で実行され、スリープ時間で処理時間が 2 倍になることはなかった。

``` text
           ┌─────────────────────────────────────────────────────┐
           │      ┌───────┐                                            ┌─────┐                        │
t1 thread: │run() │ time.sleep() │I/O 操作 ………………[スリープ]……………… │ print()  │I/O 操作   （以下省略） │
           │      └───────┘              タイムアウト  タイムアウト    └─────┘                        │
           └─────────────────────────────────────────────────────┘
                                    ↓      ↑           ↓    ↑      ↓     ↑                ↓      ↑
           ┌─────────────────────────────────────────────────────┐
           │              ┌───────┐          タイムアウト  タイムアウト   ┌─────┐                     │
t2 thread: │run()         │ time.sleep() │I/O 操作 …………[スリープ]………………│ print()  │I/O 操作 （以下省略）│
           │              └───────┘                                       └─────┘                     │
           └─────────────────────────────────────────────────────┘
```

2 番目のコードについては、`run()` 関数を実行するスレッドは CPU バウンドであり、2 スレッドの場合はタイムアウトでスレッドが切り替わる並行処理が行わる。このため、1 スレッドだけで計算する場合より処理時間が 2 倍以上長くなったのである。

では、CPU バウンドなスレッドと、I/O バウンドなスレッドが同時に走っている場合はどうなるのであろうか。この場合、先に I/O バウンドなスレッドが GIL を獲得すれば、I/O 操作発生で CPU バウンドなスレッドに切り替わり、I/O の待機中に処理が終わって効率が良い。しかし、スレッドは優先度を持たないため、CPU バウンドなスレッドが先に GIL を獲得することがあり、そうなるとタイムアウトになってから I/O バウンドなスレッドに切り替わるので I/O 待ちの待機時間の分だけ実行時間が長引くことになる。このように非効率な形で GIL が獲得される現象を **Convoy Effect** という。Convoy Effect が発生することを防ぐ方法はない。

なお、GIL によってマルチスレッドが完全に並行処理とされることは、スレッド数を CPU コア数以上にスケールしやすいということでもある。常に 1 個の CPU コアしか利用されないからである。とくに、I/O バウンドなスレッドの複数同時実行は、1 個の CPU コアしかなくても上記のようにあたかも並列処理が行われるようにみえる。

### デーモンスレッド ###

コンストラクタの `daemon` 引数またはインスタンスの `daemon` プロパティを `True` に設定したスレッドはデーモンとして稼働する。デーモンスレッドは、メインスレッドを含めてデーモンでないスレッドがすべて終了すると停止する。

次のコードでは、`t2` スレッドで実行される `run2()` 関数はグローバル変数 `request`に値を代入して終了する。`t1` スレッドはデーモンとして稼働し、そこで実行される `run1()` 関数は無限ループを実行する。その無限ループの中で、`request` 変数に値が代入されたらその値を出力する。

``` python
import time
import threading

request = None


def run1():
    global request
    while True:
        if request is None:
            time.sleep(0.2)
            print("{}: running...".format(threading.current_thread().name))
        else:
            print("{}: {} is received".format(threading.current_thread().name, request))
            request = None


def run2():
    global request
    time.sleep(0.5)
    request = "hoge"
    time.sleep(0.2)
    print("{}: end".format(threading.current_thread().name))


def main():
    t1 = threading.Thread(target=run1, name="t1", daemon=True)
    t2 = threading.Thread(target=run2, name="t2")
    t1.start()
    t2.start()
    t2.join()


if __name__ == "__main__":
    main()
    print("{}: end".format(threading.current_thread().name))
```

このコードの実行結果は以下のようになる。

``` shell
t1: running...
t1: running...
t1: running...
t1: hoge is received
t2: end
MainThread: end
```

`run1()` 関数の無限ループは、break 文がないにもかかわらずメインスレッドの終了後に停止するため、プログラムはちゃんと終了することがわかる。なお、`main()` 関数の最後に `t1.join()` を加えると、`t1.join()` は `run1()` 関数の終了を待つのでプログラムはいつまでたっても終了しなくなることに注意する。

### 競合状態 ###

マルチスレッドの実行順序やタイミングによって異なる結果が生まれる状態を**競合状態**（race condition）という。競合状態とデータ競合は、似て非なる概念である。GIL の存在によって Python ではデータ競合は発生しないが、競合状態は起こり得る。

たとえば、次のコードでは、`counter()` 関数を動かすスレッドを 3 つ作成している。`counter()` 関数では、引数に渡された辞書 `data` に対して `data['n']` の値（初期値 0）を 1 増加する処理を行うが、3 つのスレッドが終了した時点で `data['n']` の値は 3 になるとは限らない。

In [None]:
import threading
import time


def counter(data):
    n = data['n']
    time.sleep(0.2)
    data['n'] = n + 1
    print(f"{threading.current_thread().name}: {data['n']=}")


def main():
    data = {'n': 0}
    threads = [threading.Thread(target=counter, name=f"t{i}", args=(data,)) for i in range(3)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"{threading.current_thread().name}: {data['n']=}")


if __name__ == "__main__":
    main()

t0: data['n']=1
t2: data['n']=1t1: data['n']=1

MainThread: data['n']=1


最初に GIL を獲得したスレッドで `counter()` 関数内のローカル変数 `n` に `data['n']` の初期値 0 を代入し、直後に `time.sleep()` を呼び出す。この時点でスレッドが切り替わり、別のスレッドでもローカル変数 `n` に `data['n']` の初期値 0 が代入される。もし `time.sleep()` の呼び出しまでに 3 つのスレッドが 1 回ずつ実行されていれば、各スレッドでローカル変数 `n` の値は 0 になっている。この場合、その後のコンテキストスイッチにより、今度は各スレッドで `data['n']` に `0 + 1` の計算結果値が代入されることになるため、3 つのスレッドが終了した時点で `data['n']` の値は 3 になるとは限らないわけである（1 になるとも 2 になるとも限らない）。

競合状態での結果は、「プログラマーがそう書いたからそう動いている」だけであり、プログラミング言語で未定義の動作が起こっているわけではない。ただ、その結果がプログラマーの意図に反するものであればバグと言える。

### スレッドの同期 ###

複数の処理について、処理の進行を待ち合わせることを**同期**（synchronous）という。同期でないものは**非同期**（asynchronous）という。マルチスレッドまたはマルチプロセスのプログラミングにおいて、異なるスレッドやプロセス間で同期を取るための制御機構を**同期プリミティブ**（synchronization primitive）という。

マルチスレッドの競合状態を、複数人でオールを漕いでボートが進む状態に例えるなら、漕ぎ手がスレッド、ボートがプログラムであり、漕ぎ手のタイミングを合わせないとボートが正しい方向に進まないように、スレッド間で同期を取らないとプログラムが正しく動作しないのである（画像は[いらすとや](https://www.irasutoya.com/2012/07/blog-post_3963.html)より）。同期プリミティブは、漕ぎ手のタイミング合わせに使われる掛け声や笛に相当する。

<img src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhkr3KmVJffiUV0pebpKjpZLEVi7KVbmRbH7y4BBFZHkee5Yq8TtT7kDQxdB8VnrJ6wMFOsuwB1Lm9ER2GilJ0XIb28mHwxAz3sOSJLz48r9uGwd5BjoQcUoLf3FC1sTxSQ7-uA8Knx9D8/s800/olympic21_boat_4.png" width="400">

`threading` モジュールは、以下の同期プリミティブをサポートする。

  * ミューテックス（ロック）
  * セマフォ
  * イベント（Event）
  * コンディション（Condition）── 一般には条件変数（Condition Variables）と呼ばれる
  * バリア（Barrier）

### 直列化とミューテックス ###

スレッドを列に並ばせるように待機させ、一つずつ順番に実行させるような同期の取り方を**逐次化**あるいは**直列化**（serialize）と呼ぶ。共有データを扱う場面での競合状態では、直列化を行わないとバグが発生する可能性がある。

一方、あるスレッドが共有データなどの資源を利用している間は、他のスレッドが同じものにアクセスすることを制限もしくは禁止する仕組みのことを**排他制御**という。とくに、1 つのスレッドが資源を占有し、他のスレッドからのアクセスを禁止するという排他制御の方式は**ロック**（lock）と呼ばれる。GIL もロックの一種である。ロック方式は、代表的な排他制御なので、排他制御を意味する英語 mutual exclusion を略した形で**ミューテックス**（mutex）とも呼ばれる。ミューテックスは、直列化の手法として利用される。

``` python
threading.Lock()
```

これはロックを返す。ただし、実際には `threading.Lock` というクラスが定義されているわけではなく、これは `_thread.get_ident()` 関数の別名であることに注意すること。

ロックのメソッドは次のとおり。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `acquire(blocking=True, timeout=-1)` | ロックの獲得を試み、実際に獲得すると `True` を、獲得できなかったとき `False` を返す。他のスレッドが既にロックを獲得している場合、<br />`blocking` と `timeout` の指定によってメソッドの挙動が異なる。`blocking` が `True`（デフォルト）なら、ロックが解放されるまで待機する。<br />`blocking` が `False` なら、直ちに `False` を返し待機しない。`timeout` に正の浮動小数点が設定された場合、`timeout` 秒だけ待機する。<br />`blocking` が `False` の場合に `timeout` を指定することは禁止される | `bool` |
| `release()` | ロックを解放する。これはロックを獲得したスレッドだけでなく、任意のスレッドから呼ぶことができる | `None` |

ロックはコンテキストマネージャーとして使用できる。`acquire()` メソッドは with ブロックに入るときに呼び出され、`release()` メソッドはブロックを終了するときに呼び出される。したがって、次のコード片

``` python
with some_lock:
    # do something...
```

これは、以下と同じ。

``` python
some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()
```

次のコードは、競合状態のコード例にロックを追加したものである。

In [None]:
import threading
import time


def counter(data, lock):
    with lock:  # ロックの獲得と解放
        n = data['n']
        time.sleep(0.2)
        data['n'] = n + 1
        print(f"{threading.current_thread().name}: {data['n']=}")


def main():
    data = {'n': 0}
    lock = threading.Lock()  # ロックを作成
    threads = [threading.Thread(target=counter, name=f"t{i}", args=(data, lock)) for i in range(3)]  # ロックを渡す
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"{threading.current_thread().name}: {data['n']=}")


if __name__ == "__main__":
    main()

t0: data['n']=1
t1: data['n']=2
t2: data['n']=3
MainThread: data['n']=3


`data['n']` の値は、各スレッドが動かす `counter()` 関数により確実に 1 増加され、3 つのスレッドが終了した時点で 3 になる。この実行結果から、`counter()` 関数内の with 文でロックを獲得したスレッドが with 文の本体を処理している間、スレッド切り替えが行われず、このため `data['n']` に対する操作が直列化されていることがわかる。

一方、ロックの解放を待つスレッドは、待機中に別の処理ができれば効率的である。この場合、ロックの `acquire()` メソッドを引数 `blocking=False` として呼び出すと、ロックを獲得できなければ待機せず直ちに `False` を返すことを利用する。

次のコードは、前のコードに以下の変更を加えたものである。

  * `counter()` 関数は、無限の while ループを使って、ロックを獲得できれば `data['n']` の値を 1 増加してロック解放後にループを終了するが、ロックを獲得できない間は別の作業（時刻の表示）をする。

In [None]:
import threading
import time


def counter(data, lock):
    while True:
        if lock.acquire(blocking=False):
            print(f"{threading.current_thread().name}: ロックを獲得した")
            n = data['n']
            time.sleep(0.2)
            data['n'] = n + 1
            print(f"{threading.current_thread().name}: {data['n']=}")
            lock.release()
            break
        else:
            print(f"{threading.current_thread().name}: {time.strftime('%X')}")
            time.sleep(0.2)


def main():
    data = {'n': 0}
    lock = threading.Lock()  # ロックを作成
    threads = [threading.Thread(target=counter, name=f"t{i}", args=(data, lock)) for i in range(3)]  # ロックを渡す
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"{threading.current_thread().name}: {data['n']=}")


if __name__ == "__main__":
    main()

t0: ロックを獲得した
t1: 02:39:57t2: 02:39:57

t0: data['n']=1
t2: ロックを獲得した
t1: 02:39:58
t2: data['n']=2
t1: ロックを獲得した
t1: data['n']=3
MainThread: data['n']=3


実行結果から、あるスレッドがロックを保持している間も、他のスレッドは待機せずに時刻表示処理を行っていることがわかる。このように、ロックの獲得で待機しないことを**ノンブロッキング**（non-blocking）であるという。

### デッドロック ###

**デッドロック**（deadlock）とは、2 つ以上のスレッド（あるいはプロセス）が互いに待機しあってしまい、結果としてどの処理も先に進めなくなってしまうことをいう。

以下のコードは、デッドロックを発生させる簡単な例である。

``` python
import time
import threading


def worker1(data, lock1, lock2):
    with lock1:
        n = data['n']
        time.sleep(0.2)
        with lock2:
            m = data['m']
            data["n"] = n - m
            print(f"{threading.current_thread().name}: {data['n']=}")


def worker2(data, lock1, lock2):
    with lock2:
        m = data['m']
        time.sleep(0.2)
        with lock1:
            n = data['n']
            data['m'] = m + n
            print(f"{threading.current_thread().name}: {data['m']=}")


def main():
    data = {'n': 0, 'm': 0}
    lock1 = threading.Lock()
    lock2 = threading.Lock()
    threads = [
        threading.Thread(target=worker1, name="t1", args=(data, lock1, lock2)),
        threading.Thread(target=worker2, name="t2", args=(data, lock1, lock2)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"{threading.current_thread().name}: {data['n']=}")


if __name__ == "__main__":
    main()
```

このコードでは、2 つの共有データの要素 `data['n']` と `data['m']` に対するアクセスを別々に制御するため、2 つのロック `lock1` と `lock2` を作成する。`worker1()` 関数は `lock1` を獲得した後、`lock2` を獲得しようとする。同様に、`worker2()` 関数は `lock2` を獲得した後、`lock1` を獲得しようとする。ロックの獲得が循環的になっているため、いつまでも条件を満たせず、デッドロックが発生する。

この問題は、**食事する哲学者の問題**として知られる。円卓に麺と箸 1 本が交互に並べられた状況で、席に着いた哲学者はまず左側の箸を取ってから次に右側の箸を取ってよいという制約があると仮定する。全員が同時に左側の箸を取ってしまうと、右側の箸が使えず、いつまでたっても食事ができないことになる（画像は https://docs.oracle.com/cd/E19205-01/821-2500/gepdy/index.html より引用）。

![](https://docs.oracle.com/cd/E19205-01/821-2500/images/figure2.gif)

デッドロックを検出するには、タイムアウトを設定する。つまり、ロックの `acquire()` メソッド呼び出し時に `timeout` を設定し、メソッドの戻り値が `False` なら例外を送出する。

``` python
try:
    acquired = lock1.acquire(timeout=3)
    if not acquired:
        raise TimeoutError("{}: Deadlock detected".format(threading.current_thread().name))
    # do something
finally:
    lock1.release()
```

ただし、ロック獲得にタイムアウトを設定する場合、同時にノンブロッキングには設定できないことに注意する。

### 再入可能ロック ###

デッドロックは、同じスレッドで同じロックを複数回獲得しようとする場合にも発生する。

たとえば、次のコードでは `worker()` 関数が再帰関数であり、呼び出すと引数 `times` を増やしながら再帰呼び出しが行われ、`times` が 3 以上になったら終了することを意図している。

``` python
import time
import threading


def worker(data, times, lock):
    print(f"{times=}")
    if times >= 3:
        return None
    else:
        with lock:
            n = data['n']
            time.sleep(0.2)
            data['n'] = n + 1
            worker(data, times + 1, lock)


def main():
    data = {'n': 0}
    lock = threading.Lock()
    thread = threading.Thread(target=worker, args=(data, 0, lock))
    thread.start()
    thread.join()
    print(f"{data['n']=}")


if __name__ == "__main__":
    main()
```

ところが、このコードを実行すると、次のように出力されたまま先に進まない:

``` text
times=0
times=1
```

1 回目の再帰呼び出しから先に進めないというデッドロックが起こる。これは、`worker()` の with 文の中でロックが解放されないまま再帰呼び出しがあり、そこでロックを獲得できず待機するからである。

この場合は、次の関数により得られる**再入可能ロック**（reentrant lock）を使用すればデッドロックを回避できる:

``` python
threading.RLock()
```

通常のロックと再入可能ロックの違いは、以下の点だけである。

  * いったんスレッドが再入可能ロックを獲得すると、同じスレッドはブロックされずにもう一度それを獲得できる。
  * そのスレッドは獲得した回数だけ再入可能ロックを解放しなければならない。

次のコードは、上記のコードを、再入可能ロックを使用するように変更したものである。

In [None]:
import threading
import time


def worker(data, times, lock):
    print(f"{times=}")
    if times >= 3:
        return None
    else:
        with lock:
            n = data['n']
            time.sleep(0.2)
            data['n'] = n + 1
            worker(data, times + 1, lock)


def main():
    data = {'n': 0}
    lock = threading.RLock()
    thread = threading.Thread(target=worker, args=(data, 0, lock))
    thread.start()
    thread.join()
    print(f"{data['n']=}")


if __name__ == "__main__":
    main()

times=0
times=1
times=2
times=3
data['n']=3


### セマフォ ###

ミューテックスより緩く、最大〇個のスレッドが共有資源にアクセスして良いとする排他制御の方式を**セマフォ**（semaphore）という。セマフォは、内部に空き数を示すカウンターを持つ。

セマフォでは直列化を実現できない。セマフォは、メモリ不足の回避やサーバーの負荷分散などの理由で、同時実行数に制限が必要な場合に使用される。

`threading` は、2 種類のセマフォをサポートする。1 つは通常のセマフォであり、もう 1 つは**有限セマフォ**（bounded semaphore）である。これらはそれぞれ `threading.Semaphore` クラスと `threading.BoundedSemaphore` クラスで実装される。インスタンス化は次のとおり:

``` python
threading.Semaphore(value=1)
threading.BoundedSemaphore(value=1)
```

`value` 引数は内部カウンターの初期値である。`value` を指定しない場合、デフォルトの値は 1 になる。セマフォは、ロックと同様に `acquire()` メソッドと `release()` メソッドを持ち、コンテキストマネージャーとして使用できる。

`acquire()` メソッドを呼び出すとカンターが 1 つ減少し、`release()` メソッドを呼び出すとカウンタが 1 つ増加する。カウンターが 0 の場合に `acquire()` メソッドを呼び出すと、カウンターが 1 以上になるまで待機する。この動作は、`acquire()` メソッドの引数 `blocking` と `timeout` で変更できる。

`release()` メソッドは引数 `n` として整数値を渡すことができ、内部カウンターを `n` だけ増加することもできる（`n` のデフォルトの値が 1 とされる）。通常のセマフォの `release()` メソッドの場合、何度呼び出してもカウンターは増加し続け初期値を上回ることができる。これに対して、有限セマフォの `release()` メソッドの場合、カウンターが初期値を上回ると `ValueError` 例外を送出する。

`acquire()` メソッドと `release()` メソッドを直接使う代わりに with 文でセマフォを使う限り、セマフォの内部カウンターが初期値を上回ることは考えにくいのであるが、カウンターの上限を確実に初期値に制限したい場合は、有限セマフォを選択するとよい。

次のコードは、Web サーバーから複数のファイルをダウンロードする処理をマルチスレッド化する例である。最大同時接続数を 2 とするために有限セマフォを利用している。

In [None]:
import random
import threading
import time


def download(url, semaphore):
    with semaphore:
        print(f"{url} からのダウンロード開始")
        time.sleep(random.uniform(0.2, 1.0))  # ダウンロードの処理時間をシミュレート
        print(f"{url} からのダウンロード終了")


def main():
    semaphore = threading.BoundedSemaphore(2)
    threads = [threading.Thread(target=download, args=(f"url{i}", semaphore)) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()

url0 からのダウンロード開始
url1 からのダウンロード開始
url1 からのダウンロード終了
url2 からのダウンロード開始
url2 からのダウンロード終了
url3 からのダウンロード開始
url0 からのダウンロード終了
url4 からのダウンロード開始
url4 からのダウンロード終了
url3 からのダウンロード終了


### イベントとスレッド間通信 ###

**イベント**は、特定の条件が満たされたことをスレッド間で**通知する**（notify）ために使用される。たとえば、あるタスクが完了したことを他のタスクに知らせたい場合に使用される。このようにスレッド間で通知が行われることを**スレッド間通信**と呼ぶ。

イベントは内部にフラグを持ち、このフラグがセットされたりクリアされたりする。同期を取りたいスレッド間でイベントを共有し、あるスレッドにより内部フラグがセットされることで他のスレッドは通知を受ける。このようにスレッド間通信と言っても、その仕組みは単純で、各スレッドからアクセスできる変数（グローバル変数や引数など）を介して値を操作したり確認したりしているだけである。

``` python
threading.Event()
```

これは、内部フラグの値が `False` であるイベントオブジェクトを返す。イベントオブジェクトは、次のメソッドを持つ:

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `set()` | 内部フラグの値を `True` にセットする | `None` |
| `clear()` | 内部フラグの値を `False` にリセットする | `None` |
| `wait(timeout=None)` | 内部フラグがセットされない間は待機する。内部フラグがセットされると待機を解除し、プログラムの実行を続ける。`timeout` に浮動小数点数を指定した場合は、<br />このメソッドは `timeout` 秒でタイムアウトする（プログラムの実行を続ける）。戻り値は、内部フラグがセットされたために返された場合は `True`、`timeout` が指定<br />されてタイムアウトした場合は `False` | `bool` |
| `is_set()` | 内部フラグが `True` にセットされているとき `True` を返す | `bool` |

`set()` でいったん内部フラグが `True` になると、スレッドが `wait()` を呼び出しても待機しなくなる。`clear()` で内部フラグをリセットすれば、`wait()` を呼び出したスレッドは待機するようになる。

次のコードは、イベントの使用例である。`worker()` 関数が動くスレッドたちは、`wait()` を呼び出すと待機しイベントを監視する。`event_trigger()` 関数が動くスレッドが `set()` で内部フラグをセットすると、`worker()` 関数が動くスレッドたちは同時に開始される。

In [None]:
import threading
import time


def worker(event):
    event.wait()
    print(f"{threading.current_thread().name}: start")
    time.sleep(0.2)
    print(f"{threading.current_thread().name}: end")


def event_trigger(event):
    print(f"{threading.current_thread().name}: start")
    time.sleep(0.5)
    print(f"{threading.current_thread().name}: end")
    event.set()


def main():
    event = threading.Event()
    threads = [
        threading.Thread(target=worker, name="t1", args=(event,)),
        threading.Thread(target=worker, name="t2", args=(event,)),
        threading.Thread(target=event_trigger, name="t3", args=(event,)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()

t3: start
t3: end
t2: start
t1: start
t2: end
t1: end


イベントは、あるスレッドの処理を完了させてから、その結果を利用して別のスレッドで処理を行いたい場合に便利である。ミューテックスではスレッドの順番を指定できないことに注意する。

### コンディション ###

**コンディション**も、イベントと同様に特定の条件が満たされたときにスレッドの実行を再開するための同期プリミティブである。イベントと同様に `wait()` を呼び出したスレッドたちが待機し、通知を受けると起こされる。イベントと違って、コンディションは内部にロックを持つ。スレッドの実行再開は、ロック方式の排他制御が行われて直列化される。

``` python
threading.Condition(lock=None)
```

これはコンディションオブジェクトを返す。デフォルトでは、内部ロックとして再入可能ロックが新たに自動的に作成される。既存のロックをコンストラクタの引数として渡すこともでき、これにより複数のコンディションで同じロックを共有できる。

コンディションオブジェクトは、次のメソッドを持つ:

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `acquire(*args)` | 内部ロックの `acquire()` メソッドを呼び出し、その戻り値を返す | `bool` |
| `release()` | 内部ロックの `release()` メソッドを呼び出す | `None` |
| `wait(timeout=None)` | 通知を受けるか、タイムアウトするまで待機する。このメソッドはいったんロックを解放し、一度スレッドが起こされると、再度ロックを獲得して処理を戻す。<br />与えられた `timeout` が過ぎていなければ返り値は `True` となる。タイムアウトした場合には `False` を返す | `bool` |
| `notify(n=1)` | 待機中スレッドのうち `n` 個のスレッドを起こす。`n` のデフォルト値は `1` | `None` |
| `notify_all()` | 全ての待機中スレッドを起こす | `None` |

コンディションは、ロックと同様にコンテキストマネージャーとして使用できる。

`acquire()` と `release()` 以外のメソッドは、呼び出し側のスレッドがロックを獲得していないときに呼び出すと `RuntimeError` が送出される。

次のコードは、イベントの使用例をコンディションで書き直した例である。

In [None]:
import threading
import time


def worker(condition):
    with condition:
        condition.wait()
        print(f"{threading.current_thread().name}: start")
        time.sleep(0.2)
        print(f"{threading.current_thread().name}: end")


def event_trigger(condition):
    with condition:
        print(f"{threading.current_thread().name}: start")
        time.sleep(0.5)
        print(f"{threading.current_thread().name}: end")
        condition.notify_all()


def main():
    condition = threading.Condition()
    threads = [
        threading.Thread(target=worker, name="t1", args=(condition,)),
        threading.Thread(target=worker, name="t2", args=(condition,)),
        threading.Thread(target=event_trigger, name="t3", args=(condition,)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()

t3: start
t3: end
t2: start
t2: end
t1: start
t1: end


`t1` スレッドは、`worker()` 関数内の `with condition` でロックを獲得して `wait()` を呼び出すと、いったんロックを解放して待機する。`t2` スレッドも同様である。`t3` スレッドは、`event_trigger()` 関数内の `with condition` でロックを獲得すると、ロックを保持したまま処理を完了し、最後に `notify_all()` を呼び出す。これにより `t1` スレッドが起こされると、再度ロックを獲得して続きの処理を行う。`t2` スレッドも同様である。`t1` スレッドと `t2` スレッドがロックを伴って動作するため、直列化される。

コンディションは、共有データを扱う場面での競合状態で、スレッドの順序付けと直列化を併用したい場合に便利である。

### バリア ###

``` python
threading.Barrier(parties, action=None, timeout=None)
```

これはバリアオブジェクトを返す。**バリア**は、指定された数のスレッドが到達するまで待つような地点を設定する機能を提供する。

具体的には、コンストラクタに渡される `parties` がスレッドの必要数であり、バリアオブジェクトの `wait()` メソッドでバリアの地点が設定される。つまり、`wait()` を呼んだ地点でスレッドが待機し、`parties` 個のスレッドが `wait()` を呼ぶと、それらは同時にすべて解放される。呼び出し可能オブジェクトが `action` としてコンストラクタに渡されていれば、スレッドが解放される時にそのうちの 1 つによって呼ばれる。

実のところ、バリアは内部にコンディションを持ち、スレッドの待ち合わせと解放がコンディションを使ったスレッド間通信により実現されている。このため、必要な数のスレッドがバリアの地点に到達しないと、デッドロックが発生する。デッドロックを回避するには、タイムアウトを使用する必要がある。`wait()` は `timeout` 引数を取り、それに浮動小数点数を与えると `timeout` 秒後にタイムアウトし、バリアが破壊される。スレッドが待っている間にバリアが破壊された場合、`wait()` メソッドは `threading.BrokenBarrierError` 例外を送出する。コンストラクタの `timeout` 引数でデフォルトのタイムアウト時間を指定できる。

次のコードはバリアの使用例である。

In [None]:
import random
import threading
import time


def report():
    print("{}: passed through the barrier".format(threading.current_thread().name))


def worker(barrier):
    time.sleep(random.uniform(0.2, 1.0))
    print(f"{threading.current_thread().name}: start")
    try:
        barrier.wait(timeout=1.0)
    except threading.BrokenBarrierError:
        print(f"{threading.current_thread().name}: TIMEOUT")
    else:
        print(f"{threading.current_thread().name}: end")


def main():
    barrier = threading.Barrier(2, action=report)
    threads = [threading.Thread(target=worker, name=f"t{i}", args=(barrier,)) for i in range(5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()

t4: start
t2: start
t2: passed through the barrier
t2: end
t4: end
t1: start
t3: start
t3: passed through the barrier
t3: end
t1: end
t0: start
t0: TIMEOUT


### スレッドローカルデータ ###

`threading.local` は、**スレッドローカルデータ**と呼ばれ、スレッド間で共有するオブジェクトでありながら、属性値は共有されないという特殊なオブジェクトの型である。`threading.local` の使用では、競合状態が起こらない。

スレッドローカルデータの例としては、各スレッドの動作を記録するログが挙げられる。ログには次のようなデータが記録される。

  * スレッド名
  * セッション ID
  * リクエスト ID
  * エラー情報

これらはスレッドに固有であり、スレッドの存続期間に渡って使用されるデータであるが、スレッドの実行に必要ではないため、スレッドで実行する関数の引数として渡したり、戻り値や例外メッセージとして呼び出し元に返すのは面倒である。この場合の一般的な解決策は、スレッドローカルデータを使用することである。

次のコードでは、`threading.local` を使用する例である。`local` は `threading.local` のインスタンスで、`worke` スレッドで `x` 属性が変更されるにもかかわらず、メインスレッドでは `x` 属性の変更が共有されない。

In [None]:
import threading

local = threading.local()


def worker():
    local.x = 1
    print(f"{threading.current_thread().name}: {local.x=}")


def main():
    local.x = 0

    thread = threading.Thread(target=worker, name="worker")
    thread.start()
    thread.join()

    print(f"{threading.current_thread().name}: {local.x=}")


if __name__ == "__main__":
    main()

worker: local.x=1
MainThread: local.x=0


### スレッドのカスタマイズ ###

`threading.Thread` クラスを継承して、`run()` メソッドをオーバーライドしたサブクラスを利用することができる。もとの `run()` メソッドは、インスタンス化の際に `target` が渡されていれば、それを実行するだけである。`run()` メソッドをオーバーライドすることで、スレッドの動作をカスタマイズできる。

クラス継承を使用したカスタマイズの良い例が、`threading.Timer` である。`threading.Timer` は、`interval` 秒後に実行するスレッドである。次のコードは、その定義の要約である。

``` python
class Timer(Thread):
    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
```

`threading.Timer` は、内部にイベントオブジェクト `finished` を持ち、`run()` メソッドが `wait()` でタイムアウトすると `function` を実行するようにオーバーライドされている。`interval` 秒経過する前であれば中止できる `cancel()` メソッドも定義されている。

たとえば、次のコードを実行すると、30 秒後に `'hello, world'` が出力される。

``` python
def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()
```

queue
-----

標準ライブラリの `queue` モジュールは、データ構造とコンディションを組み合わせて、スレッド間で共有するデータ構造から次々とアイテムを取り出し、アイテムに対する処理をロックを伴って行うことをサポートする。これは、複数のスレッドの間でデータを安全に交換しなければならないときのマルチスレッドプログラミングで特に有益である。

内部に持つデータ構造の種類により、以下のクラスが提供される。

  * `queue.Queue`: FIFO のデータ構造、すなわちキュー（`collections.deque` で実装される）
  * `queue.LifoQueue`: LIFO のデータ構造、すなわちスタック（`collections.deque` で実装される）
  * `queue.PriorityQueue`: 優先度付きキュー（`heapq` の関数で実装される）

下 2 つのクラスは `queue.Queue` の派生クラスであり、データ構造に関する内部メソッドをオーバライドしている。3 つのクラスのコンストラクタ引数とメソッドは共通している。

``` python
queue.Queue(maxsize=0)
```

コンストラクタの引数 `maxsize` は、キューに入れられるアイテム数の上限を設定する整数。`maxsize` が 0 以下の場合は、キューの大きさは無限となる。

`queue.Queue` は、キューに入れられたアイテムで処理が完了していないものを内部でカウントする。このカウントを「未完了タスクのカウント」と呼ぶ。アイテムをキューに入れるとき、「未完了タスクのカウント」はインクリメントされる。アイテムの処理が完了したら、「未完了タスクのカウント」をデクリメントできる。

主なメソッドは次のとおり。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `put(item, block=True,`<br />` timeout=None)` | `item` をキューに挿入し、「未完了タスクのカウント」をインクリメントする。キューの上限が設定されていて上限に達していた場合、`block` が `True`（デフォ<br />ルト）なら挿入処理はキューのアイテムが取り出されて空きが出るまでブロックされる。ただし、`timeout` に浮動小数点数を指定していれば、空きが出な<br />いまま `timeout` 秒経過した時に `queue.Full` 例外を送出する。`block` が `False` なら直ちに `queue.Full` 例外を送出する（この場合 `timeout` は無視<br />される） | `None` |
| `put_nowait(item)` | `put(item, block=False)` と等価 | `None` |
| `get(block=True,`<br />` timeout=None)` | キューからロックを伴ってアイテムを取り出す。通知を受けるか、タイムアウトするまで待機する。キューが空の場合は、`block` が `True`（デフォルト）ならア<br />イテムが入るまでブロックされ、`block` が `False` なら直ちに `queue.Empty` 例外を送出する（この場合 `timeout` は無視される） | アイテム |
| `get_nowait()` | `get(block=False)` と等価 | アイテム |
| `empty()` | キューが空の場合は `True` を返し、そうでなければ `False` を返す | `bool` |
| `full()` | キューが一杯の場合は `True` を返し、そうでなければ `False` を返す | `bool` |
| `task_done()` | 内部コンディションの `notify_all()` を呼び出し、`get()` で待機中の全てのスレッドを起こす。また、「未完了タスクのカウント」をデクリメントする。この結<br />果 0 未満になった場合（つまりキューにある要素より多く呼び出された場合） `ValueError` 例外が発生する | `None` |
| `join()` | 「未完了タスクのカウント」が 0 になるまで待機する | `None` |

`join()` はキューにあるすべてのアイテムが取り出されて処理されるまで待機したい場合に使用する。この場合、アイテムの取り出しに使われた `get()` の後に必ず `task_done()` を呼び出すようにしないと、いつまでたっても「未完了タスクのカウント」が 0 にならず、`join()` で待機し続けることになるので注意。

次のコードは、5 つのアイテム（`1` から `5` までの整数）の処理を 2 つのスレッドで同時に行うためにキューを使用する例である。各スレッドが動かす `worker()` 関数は、while ループにより、キューからアイテムを取り出し処理が終わっても終了せず、キューが空になるまでアイテムの処理を繰り返す。このように、ある一定数のスレッドをあらかじめ作成し、必要に応じてタスクを割り振ったり、使い回したりする仕組みを**スレッドプール**（thread pool）と呼ぶ。

In [None]:
import queue
import threading
import time


def worker(q):
    while not q.empty():
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        else:
            print(f"{threading.current_thread().name}: Working on {item}")
            time.sleep(item * 0.1)
            print(f"{threading.current_thread().name}: Finished {item}")
            q.task_done()


def main():
    # Queue を設定
    q = queue.Queue()
    for i in range(1, 6):
        q.put(i)

    # スレッドプールを設定（2 つのスレッドからなる）
    threads = []
    for i in range(2):
        thread = threading.Thread(target=worker, name=f"t{i}", args=(q,))
        threads.append(thread)

    # スレッドの開始
    for thread in threads:
        thread.start()

    # キューが空になるまで待機
    q.join()
    print("全てのタスクが完了した")

    # 終了を待機
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()

t0: Working on 1
t1: Working on 2
t0: Finished 1
t0: Working on 3
t1: Finished 2
t1: Working on 4
t0: Finished 3
t0: Working on 5
t1: Finished 4
t0: Finished 5
全てのタスクが完了した


`worker()` 関数では while の条件でキューが空でないことを確認しているが、`get_nowait()` でキューが空の場合の例外処理をしている。確認後にキューの状態が他のスレッドにより変更される可能性があるからである。`get_nowait()` はキューが空の場合に直ちに `queue.Empty` 例外を送出する。`queue.Empty` 例外を捕捉したら while ループを停止して関数を終了する（else 節は実行されない）。

実行結果から、各タスクが処理の完了したスレッドに次々と割り振られ、各スレッドではタスク処理が直列化されることがわかる。

スレッドセーフ
--------------

マルチスレッドでデータ競合による未定義動作を引き起こしたり、競合状態による意図しない動作を引き起こしたりせず、デッドロックなどの問題も起きないプログラムは**スレッドセーフ**（thread-safe）であるという。

以下は、Python でのマルチスレッドとスレッドセーフなコーディングのためのベストプラクティスである。

  * **マルチスレッドを使わない**  
マルチスレッドはプロセス内の制御フローが難解になる。メモリ共有のため、1 つのスレッドの問題がプロセス全体に影響を与える可能性がある。同期やロックを忘れたり、デッドロックが起こるというミスを防げない。再現性の低いバグを見つける作業は難しい。CPython 特有の問題として、CPU バウンドな処理ではかえって遅くなる。マルチプロセスを使うべきである。
  * **共有リソースを最小化する**  
あえてマルチスレッドを選択する場合（マルチプロセスでのオーバーヘッドが許容できないなど）、共有データを使用せず、同期やロックを考えなくて済むようにすること。スレッドローカルデータを活用すること。
  * **不変オブジェクトを利用する**  
値オブジェクトは上記の例外である。値オブジェクトは不変だから、自然とスレッドセーフである。
  * **ロックの範囲を最小化する**  
どうしても共有データを使わなければならない場合、ロックを獲得している間は、できるだけ少ない操作を行い、他のスレッドが待たされる時間を短縮すること。
  * **タイムアウトを使用する**  
`acquire()` と `wait()` にタイムアウトを設定することで、デッドロックが発生した場合でも、スレッドが適切に解放されるようになる。

multiprocessing
---------------

`multiprocessing` は、`threading` モジュールに似た API を使用してプロセスの生成をサポートするパッケージである。マルチプロセスでは GIL の影響を受けないので並列処理が行われ、CPU 資源を最大限に活用できる。

### プロセス開始方式 ###

現在のプロセスから新たにプロセスを開始するとき、現在のプロセスを**親プロセス**、新たなプロセスを**子プロセス**という。プロセスが新しいタスクを引き受けた時、子プロセスを作成して処理させることができる。

`multiprocessing` はプロセスを開始するために以下の方法をサポートしている。

  * **fork**: `fork()` というシステムコールを呼び出すことで現在のプロセスをコピーする。プロセスの開始が速いが、プロセスのメモリ空間をそのままコピーするのでメモリ消費量が多い。Unix 系 OS でのみ利用可能。
  * **spawn**: 新たに Python インタープリターが動くプロセスを開始する。現在のモジュールは再読み込みされて変数も新たに作り直される。プロセスの開始に時間がかかるが、プログラムが動くのに必要なリソースのみ継承されるのでメモリ消費量を抑えられる。多くのプラットフォームで利用可能。

デフォルトの開始方式は、Unix 系 では fork、Windows では spawn。将来的には spawn に統一される予定。現在の開始方式名は `multiprocessing.get_start_method()` 関数で確認できる。開始方式を `method` に設定するには、`multiprocessing.set_start_method(method)` を実行する。これは一度しか呼び出すことができず、その場所もメインモジュールの `if __name__ == '__main__'` 節内で保護された状態でなければならない。

In [None]:
import multiprocessing
multiprocessing.get_start_method()

'fork'

### インスタンス化 ###

プロセスを使って子プロセスを作成するには、`multiprocessing.Process` クラスをインスタンス化する。

``` python
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
```

`multiprocessing.Process` クラスは、`threading.Thread` クラスと同様に使うことができる。ただし、以下のメソッドが加わっている:

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `terminate()` | プロセスを強制終了する。finally 句などは実行されないことに注意 | `None` |
| `close()` | `Process` オブジェクトを閉じ、関連付けられていたすべてのリソースを開放する。中のプロセスが実行中であった場合、`ValueError` を送出する | `None` |

次は、`threading.Thread` のインスタンス化のコード例を `multiprocessing.Process` クラスに書き換えただけである。

In [None]:
import multiprocessing
import time


def run():
    a = []
    for i in range(20000000):
        a.append(0)


def main(*args):
    print(f"started at {time.strftime('%X')}")
    for t in args:
        t.start()
    for t in args:
        t.join()
    print(f"finished at {time.strftime('%X')}")


if __name__ == "__main__":
    t1 = multiprocessing.Process(target=run)
    t2 = multiprocessing.Process(target=run)
    t3 = multiprocessing.Process(target=run)
    print("2プロセス-----------")
    main(t1, t2)
    print("1プロセス-----------")
    main(t3)

2プロセス-----------
started at 02:40:04
finished at 02:40:08
1プロセス-----------
started at 02:40:08
finished at 02:40:10


このコードでは、各プロセスが動かす `run()` 関数は CPU バウンドな処理を行う。手元のマシンでこのコードを実行すると、2 個の子プロセスを同時に開始する場合と 1 個の子プロセスを開始する場合で処理にかかる時間がほとんど同じであった。これにより、CPU バウンドな処理でも並列処理されていること、つまり GIL の影響を受けないことがわかる。無料版 Colab では利用できる CPU コア数（論理コア数）が 2 個であるため、Python インタープリターを起動したプロセス（**メインプロセス**という）と子プロセス 1 個しか並列処理できず、2 個の子プロセスは並行処理されることに注意する。

``` python
multiprocessing.freeze_support()
```

この関数は、`multiprocessing` を使用しているプログラムをフリーズして Windows の実行可能形式を生成するためのサポートを追加する。サードパーティ製の PyInstaller などで実行可能形式を生成する場合に必要。この関数は、メインモジュールの `if __name__ == '__main__'` の直後に呼び出す必要がある。以下に例を示す:

``` python
from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()
```

`freeze_support()` の呼び出しは、Unix 系 OS では効果がない。また、Windows の通常の Python インタープリターによって実行されているならば、`freeze_support()` は効果がない。

``` python
multiprocessing.current_process()
```

この関数は、現在のプロセスに対応する `Process` オブジェクトを返す。

``` python
multiprocessing.active_children()
```

この関数は、現在のすべてのアクティブな子プロセスのリストを返す。

``` python
multiprocessing.parent_process()
```

この関数は、現在のプロセスの親プロセスに対応する `Process` オブジェクトを返す。現在のプロセスがメインプロセスの場合、`None` を返す。

In [None]:
from multiprocessing import Process, current_process
import time

def counter(data):
    data['n'] += 1
    print(f"{current_process().name}: {data['n']=}")
    time.sleep(0.2)

def main():
    data = {'n': 0}
    p = Process(target=counter, name='ChildProcess', args=(data,))
    p.start()
    p.join()
    print(f"{current_process().name}: {data['n']=}")

if __name__ == "__main__":
    main()

ChildProcess: data['n']=1
MainProcess: data['n']=0


このコードでは、子プロセスにおいて辞書 `data` の要素 `data['n']` をインクリメントしているが、その結果がメインプロセスに共有されず、メインプロセスでは `data['n']` は初期値 `0` のままである。fork では子プロセスに `data` のコピーが渡される（spawn では `data` が新たに作り直される）からである。

このように、マルチプロセスでは各プロセスが独立したメモリ空間を持つため、他のプロセスの影響を受けずに動作する。これにより、子プロセスの 1 つがクラッシュしても他の子プロセスやメインプロセスは影響を受けず、プログラム全体がダウンするリスクを減らすことができる。

### プロセスの同期 ###

一般的にマルチプロセスプログラムでは、メモリが共有されないため競合状態が起こらず、マルチスレッドプログラムほどには同期プリミティブを必要としないが、`multiprocessing` は `threading` モジュールと等価な同期プリミティブを備えている。

  * `multiprocessing.Lock`
  * `multiprocessing.RLock`
  * `multiprocessing.Semaphore`
  * `multiprocessing.BoundedSemaphore`
  * `multiprocessing.Event`
  * `multiprocessing.Condition`
  * `multiprocessing.Barrier`

本来互いに独立しているプロセス間でこうした同期プリミティブが使えるのは、OS の機能を介して特定の一時ファイルにアクセスすることで実現されている。このようにプロセス間でデータのやり取りをする仕組みは**プロセス間通信**（Inter-Process Communication）、略して IPC と呼ばれる。IPC は、共有メモリを利用するだけのスレッド間通信と比べると重い処理となる。

次の例では、ロックを使用して、一度に 1 つのプロセスしか標準出力に書き込まないようにしている:

In [None]:
from multiprocessing import Process, Lock, current_process
import time

def worker(lock, i):
    with lock:
        print(f"{current_process().name}: start")
        time.sleep(0.1)
        print(f"{current_process().name}: end")

if __name__ == "__main__":
    lock = Lock()
    for num in range(5):
        Process(target=worker, name=f"p{num}", args=(lock, num)).start()

p0: start
p0: end
p1: start
p1: end
p2: start
p2: end
p3: start
p3: end


ロックを使用しないで標準出力に書き込んだ場合は、各プロセスからの出力がごちゃまぜになってしまう。ただし、ロックの範囲ではマルチプロセスの性能を発揮できなくなる。

### パイプとキュー ###

**パイプ**（pipe）は、OS が提供する IPC の方式の 1 つであり、2 つのプロセスの入出力をつなぐ。`multiprocessing` は、パイプを利用するための関数 `Pipe` を提供している。

``` python
multiprocessing.Pipe(duplex=True)
```

パイプの両端を表すコネクションオブジェクトのペア `(conn1, conn2)` を返す。`conn1` と `conn2` は「接続」した状態となる。`duplex` が `True`（デフォルト）の場合、パイプは双方向性となる。`duplex` が `False` の場合、パイプは一方向性で、`conn1` はデータの受信専用、`conn2` はデータの送信専用になる。

コネクションオブジェクトは、以下のメソッドを持つ。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `send(obj)` | 接続している相手にオブジェクトを送る。オブジェクトは `pickle` でシリアライズ可能でなければならない。極端に大きすぎるオブジェクトでは `ValueError`<br /> 例外が送出されることがある。受信専用で `send()` を呼び出すと `OSError` 例外が発生する | `None` |
| `recv()` | 接続している相手側から送られたオブジェクトを返す。何か受け取るまで待機する。何も受け取らずに接続が相手側で閉じられた場合 `EOFError` 例外が<br />発生する。送信専用で `recv()` を呼び出すと `OSError` 例外が発生する | Unknown |
| `send_bytes(buf,`<br />` offset=0, size=None)` | 接続している相手にバイトデータとして `buf` を送る。`offset` が指定されると `buf` のその位置からデータが読み込まれる。`size` が指定されると `buf` か<br />らその量のデータが読み込まれる。極端に大きすぎるバイトデータでは `ValueError` 例外が送出されることがある。受信専用で `send_bytes()` を呼び出す<br />と `OSError` 例外が発生する | `None` |
| `recv_bytes([maxlength])` | 接続している相手側から送られたデータをバイト列として返す。何も受け取らずに接続が相手側で閉じられた場合 `EOFError` 例外が発生する。送信専用<br />で `recv_bytes()` を呼び出すと `OSError` 例外が発生する。`maxlength` を指定していて、かつデータが `maxlength` より長い場合、`OSError` 例外が発生する | `bytes` |
| `close()` | 接続を閉じる | `None` |

OS による本来のパイプは一方向性であり、双方向性は Python 側の拡張である。2 つのプロセスが同時に同じパイプにデータを入れたり受け取ったりすると、データが破損する可能性がある。このような危険を回避したい場合は、`Pipe(duplex=False)` として通信方向を制限するとよい。

`recv()` の処理ではデシリアル化が行われる。それはデータを送ったプロセスが信頼できる場合を除いてセキュリティリスクになることに注意する。これが問題となるなら、バイト列限定となるが `send_bytes()` と `recv_bytes()` を使うとよい。

`recv()` や `recv_bytes()` でデータを受け取る順番は、データを送った順番と同じ、つまり先入れ先出し（FIFO）となる。

In [None]:
from multiprocessing import Process, Pipe

def worker(conn):
    conn.send([42, None, 'hello'])
    conn.send_bytes(b'thank you')
    conn.close()

def  main():
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    assert parent_conn.recv() == [42, None, 'hello']
    assert parent_conn.recv_bytes() == b'thank you'
    p.join()

if __name__ == "__main__":
    main()

また、 `multiprocessing` は、パイプや 2～3 個のロック/セマフォを使用して実装されたプロセス共有キュー `Queue` も提供している。この `Queue` クラスは `queue.Queue` と同様に使用できるが、`task_done()` と `join()` メソッドがないことに注意。`put()` での例外に `queue.Full`、`get()` での例外に `queue.Empty` が使用されるが、それらは `multiprocessing` の名前空間では利用できないため、これらの例外を捕捉する場合は `queue` からインポートする必要がある。

In [None]:
from multiprocessing import Process, Queue
import time

def writer(q):
    data = []
    data.append([42, None, 'hello'])
    data.append(b'thank you')
    for msg in data:
        q.put(msg)
        time.sleep(0.5)

def reader(q):
    while True:
        print(q.get())

def main():
    q = Queue()
    pw = Process(target=writer, args=(q,))
    pr = Process(target=reader, args=(q,))
    pw.start()
    pr.start()
    # pwが完了するのを待つ（q.join()はないことに注意）
    pw.join()
    # prを終了（readerが無限ループなので）
    pr.terminate()

if __name__ == "__main__":
    main()

[42, None, 'hello']
b'thank you'


### 共有オブジェクト ###

実は、Unix 系 OS と Windows では、メモリ上の特定の領域を複数のプロセスからアクセス可能とする共有メモリをサポートしている。共有メモリを利用すると、データのシリアル化/デシリアル化を必要とするパイプに比べて、パフォーマンスが大幅に向上する。ただし、以下の点に注意する。

  * 共有メモリを使用すると、競合状態がマルチプロセスでも発生することになる。しかも、マルチプロセスでは並列処理が行われるので、共有メモリに対する操作が不可分であること（**アトミック**ともいう）に注意がより必要である。ここにアトミックとは、「操作に途中の状態がなく、一部のみが失敗するということがない」ことである。たとえば、代入演算 `=` はアトミックであるが、`+=` のような演算は読み込みと書き込みを含むためアトミックではない。共有データに対するアトミックでない操作がロックを伴わないと、他のプロセスによる操作が割り込んで、意図しない結果になる可能性がある。
  * 共有メモリは OS の機能なので、C のデータ型やデータ構造が利用される。

`multiprocessing` は、共有メモリを使用するオブジェクト（**共有オブジェクト**と呼ぶ）をサポートする。次の 2 つの関数は、それぞれ共有メモリに割り当てられた数値と数値の配列を扱う共有オブジェクトを返す。

``` python
multiprocessing.Value(typecode_or_type, *args, lock=True)
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
```

`typecode_or_type` に数値の C データ型を指定する。`ctypes` モジュールで使用できる型か、`array` モジュールで使用されるような 1 文字の型コードを指定できる。

キーワード専用引数 `lock` が `True`（デフォルト）なら、値へ同期アクセスするために新たに `RLock` オブジェクトが作成される。

`Value()` 関数が返すオブジェクトは、 `value` 属性で値を参照できる。`Array()` 関数が返すオブジェクトは、 `[]` によるインデックス参照で値を参照でき、スライスを使うこともできる。どちらのオブジェクトも、`get_lock()` メソッドで内部ロックオブジェクトにアクセスできる。

`Value` と `Array` の使用例:

In [None]:
from multiprocessing import Process, Value, Array


def worker(n, a):
    # アトミックではない操作はロックを必要とする
    with n.get_lock():
        n.value += 1

        for i in range(len(a)):
            a[i] = -a[i]


def main():
    num = Value("i", 0)
    arr = Array("d", [0.1, 0.2, 0.3])

    p = Process(target=worker, args=(num, arr))
    p.start()
    p.join()

    print(f"{num.value = }")
    print(f"{arr[:] = }")


if __name__ == "__main__":
    main()

num.value = 1
arr[:] = [-0.1, -0.2, -0.3]


### マネージャー ###

**マネージャー**（manager）は、共有オブジェクトの高レベルな使い方をサポートする。マネージャーでは、共有オブジェクトを管理する子プロセスが使用される。このプロセスがサーバープロセスとなって、他のプロセスはサーバープロセスを通して共有オブジェクトにアクセスすることになる。これにより高レベルな使い方ができる反面、サーバープロセスを使うことによるオーバーヘッドが発生すること、IPC にパイプが使われるためデータのシリアル化/デシリアル化によるオーバーヘッドも発生することに注意する。

マネージャークラスは、`multiprocessing.managers.BaseManager` クラスの派生クラスとして定義される。`BaseManager` クラスで定義されている主なメソッドは次のとおり。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `start(initializer=None, initargs=())` | サーバープロセスを開始する。`initializer` が `None` でなければ、サーバープロセスは開始時に `initializer(*initargs)` を呼び出す | `None` |
| `shutdown()` | サーバープロセスを停止する | `None` |
| `get_server()` | マネージャーの制御下にある実際のサーバーを表す `Server` オブジェクトを返す。`Server` オブジェクトは `serve_forever()` メソッドを<br />サポートする | `Server` |
| `connect()` | ローカルからリモートのマネージャーオブジェクトへ接続する | `None` |
| `register(typeid, callable=None,`<br />` proxytype=None, exposed=None,`<br />` method_to_typeid=None,`<br />` create_method=True)` | クラスメソッド。マネージャークラスで呼び出し可能オブジェクトや型を登録するために使用される | `None` |

クラスメソッド `register()` は、「プロキシを返すメソッド」をマネージャークラスに登録するために使用される。**プロキシ**は、共有オブジェクトを参照するオブジェクトであり、自身は組み込み型オブジェクトなどの Python オブジェクトと同様に操作される。プロキシを使うと Python らしい書き方で共有オブジェクトを操作することができる。

既に多くの「プロキシを返すメソッド」が登録済みであるマネージャークラス `multiprocessing.managers.SyncManager` が用意されている。これを使えば、プログラマーが `register()` を使ってマネージャークラスを定義する必要がない。ただし、直接インスタンス化するのではなく、次の関数の戻り値としてインスタンスを得ること。

``` python
multiprocessing.Manager()
```

「プロキシを返すメソッド」を使用する前に、`start()` メソッドでサーバープロセスを開始しておく必要がある。プロキシを使用する必要がなくなったときは、`shutdown()` でサーバープロセスを停止する。全てのマネージャーは、コンテキストマネージャーとして使用できる。`__enter__()` は `start()` メソッドを呼び出してからマネージャーオブジェクトを返す。また `__exit__()` は `shutdown()` を呼び出す。

次のコード片

``` python
with Manager() as manager:
    # do something...
```

これは、以下と同じ。

``` python
manager = Manager()
manager.start()
try:
    # do something...
finally:
    manager.shutdown()
```

`SyncManager` は、 `dict` や `list`、`multiprocessing.managers.Namespace` などに対応するプロキシを返すメソッドが登録されている。メソッドの名前はそれぞれの型の名前と同じになっている。 `multiprocessing.managers.Namespace` は、ドット演算子 `.` を使用する属性参照で共有オブジェクトを参照するために使用される。

`Manager()` の使用例:

In [None]:
from multiprocessing import Process, Manager


def f(dct, lst, ns):
    dct[1] = "1"
    dct["2"] = 2
    dct[0.25] = None
    lst.reverse()
    ns.x = 10
    ns.y = "hello"


def main():
    with Manager() as manager:
        dct = manager.dict()
        lst = manager.list(range(10))
        ns = manager.Namespace()

        p = Process(target=f, args=(dct, lst, ns))
        p.start()
        p.join()

        print(f"{dct=!s}")
        print(f"{lst=!s}")
        print(f"{ns.x=}, {ns.y=}")


if __name__ == "__main__":
    main()

dct={1: '1', '2': 2, 0.25: None}
lst=[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
ns.x=10, ns.y='hello'


`BaseManager` を継承して独自のマネージャーを作成し、それをネットワーク経由で他のコンピューター上のプロセスによって共有することもできる。以下のコード例では、ローカルマシンで実行可能とするため、アドレスを `'localhost'` としている。

server.py（共有キューのためにサーバーを作成）:

``` python
from multiprocessing.managers import BaseManager
from queue import Queue

queue = Queue()

class QueueManager(BaseManager):
  pass

# キューに対応するプロキシを返すメソッド get_queue を登録
QueueManager.register('get_queue', callable=lambda: queue)

# マネージャーを取得
manager = QueueManager(address=('localhost', 50000), authkey=b'abracadabra')
# サーバーを取得
server = manager.get_server()
# サーバーを起動する
server.serve_forever()
```

client1.py（データをキューに登録）:

``` python
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

# メソッド get_queue を登録
QueueManager.register('get_queue')

# マネージャーを取得
manager = QueueManager(address=('localhost', 50000), authkey=b'abracadabra')
# サーバーへ接続
manager.connect()
queue = manager.get_queue()
queue.put('hello')
```

client2.py（データをキューから取り出す）:

``` python
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

# メソッド get_queue を登録
QueueManager.register('get_queue')

# マネージャーを取得
manager = QueueManager(address=('localhost', 50000), authkey=b'abracadabra')
# サーバーへ接続
manager.connect()
queue = manager.get_queue()
print(queue.get())
```

ターミナルを 3 つ起動し、まず 1 つのターミナルで `python server.py` を実行してサーバーを立ち上げ、残りのターミナルでそれぞれ `python client1.py` と `python client2.py` を実行する。`client2.py` のプロセスはキューに値がない場合は待ち状態になり、`client1.py` のプロセスで `'hello'` が put されたら、それを出力する。

### 共有メモリ管理 ###

共有オブジェクトを介して共有メモリを利用できるのであるが、直接に共有メモリそのものを管理することもできる。共有メモリを直接管理することによって、マネージャーを利用しなくても柔軟に共有メモリを利用することができる。

`multiprocessing.shared_memory` モジュールは、共有メモリそのものを管理するための `SharedMemory` クラスを提供する。

``` python
multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)
```

| 引数 | 意味 |
|:---|:---|
| `name` | 共有メモリの一意の名前を文字列で指定する。`None`（デフォルト）の場合、新しい名前が構成される |
| `create` | `True` の場合、新しい共有メモリの領域を確保して、それに結び付いたインスタンスを作成する。`False`（デフォルト）の場合、既存の共有メモリに結び付いたインスタンスを作成する |
| `size` | 新しい共有メモリの領域を確保するときに要求されるバイト数。実際に確保されるサイズはこれより大きくなることがある。既存の共有メモリを使用する場合は、`size` は無視される |

コンストラクタの `name` と `create` 引数を使うことで、あるプロセスが特定の名前で共有メモリを作成し、別のプロセスが同じ名前を使用して同じ共有メモリを参照することができる。

| 属性 | 意味 |
|:---|:---|
| `buf` | 共有メモリの内容 |
| `name` | 読み取り専用。共有メモリの名前（文字列） |
| `size` | 読み取り専用。実際に割り当てられた共有メモリのサイズ |

`buf` のデータ構造は C の配列であり、オブジェクトの参照ではなくオブジェクトの値そのものが格納される。インデックス参照が可能で、スライスも使える。たとえば、`buf[:4]` は先頭から 4 バイトのデータを参照する。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `close()` | このインスタンスから共有メモリへのアクセスを閉じる。`close()` を呼び出しても共有メモリ領域自体は破棄されない | `None` |
| `unlink()` | OS に共有メモリの破棄を要求する。このメソッドは全てのインスタンスのうちいずれかが 1 回だけ呼び出すこと。`unlink()` と `close()` はどちらの順序でも呼び出す<br />ことができる | `None` |

共有メモリがどのプロセスでも必要なくなった場合は、適切なクリーンアップが実行されるために `unlink()` メソッドを呼び出す必要がある。OS は、共有メモリがどのプロセスからもアクセスされる可能性がないことを確認してから共有メモリを破棄する。このため、全てのインスタンスは共有メモリが不要になったら `close()` を呼び出す必要がある。

`SharedMemory` の使用例:

In [None]:
import array
from multiprocessing import Process, current_process
from multiprocessing.shared_memory import SharedMemory


def worker(name: str):
    # 共有メモリを構成（既存の共有メモリを使用）
    shm = SharedMemory(name=name)
    print("{}: {}, {}, {}, {}".format(current_process().name, *shm.buf[:4]))
    shm.buf[:6] = b"Python"

    #  共有メモリが不要になったら閉じる
    shm.close()


def main():
    # 共有メモリを構成
    shm = SharedMemory(create=True, size=10)
    print(f"Size of SharedMemor: {shm.size}")
    shm.buf[:3] = array.array("B", [11, 22, 33])
    shm.buf[3] = 44
    assert (shm.buf[0], shm.buf[1], shm.buf[2], shm.buf[3]) == (11, 22, 33, 44)

    p = Process(target=worker, name="ChildProcess", args=(shm.name,))
    p.start()
    p.join()
    assert (shm.buf[0], shm.buf[1], shm.buf[2], shm.buf[3]) == (80, 121, 116, 104)
    print("{}: {}".format(current_process().name, bytes(shm.buf[:6])))

    #  共有メモリが不要になったら閉じる
    shm.close()

    # 共有メモリを破棄
    shm.unlink()


if __name__ == "__main__":
    main()

Size of SharedMemor: 10
ChildProcess: 11, 22, 33, 44
MainProcess: b'Python'


バイト単位で共有メモリを構成する場合、データサイズが 2 バイト以上のデータ型（例: `float`）を扱うことが難しい。そこで、`multiprocessing.shared_memory` モジュールは、Python の組み込みデータ型からなる固定長リストの形で共有メモリを構成するための `ShareableList` クラスを提供する。`ShareableList` は `SharedMemory` のラッパークラスになっている。

``` python
multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)
```

`sequence` で与えた順番で値が格納された共有メモリ（`name` で一意の名前を付けられる）に結び付けられたオブジェクトを作成する。格納可能な値は次の組み込みデータ型に制限される。

  * `int` （ただし符号付き 64 ビット整数）
  * `float`
  * `bool`
  * `str` （ただし UTF-8 エンコードしたとき 10 MB 未満のもの）
  * `bytes` （ただし 10 MB 未満のもの）
  * `None`

既存の `ShareableList` に結び付ける場合は、`sequence` を `None` に設定したまま、`name` で共有メモリの一意の名前を指定する。

| 属性 | 意味 |
|:---|:---|
| `shm` | 内部で作成された `SharedMemory` インスタンス。`close()` や `unlink()` を呼び出すために使われる |

`ShareableList` オブジェクトは変更可能なリストのように使える（イテラブルであり、`[]` を使ったインデックス参照も可能）。しかし、全体の長さを変更することはできない（つまり、`append()`、`insert()` などを使用できない）。また、スライスによる新しい `ShareableList` インスタンスの動的な作成をサポートしていない。

`str` や `bytes` の値を参照する場合、末尾の連続した `\x00`（ヌル文字またはヌルバイト）は削除される。`str` や `bytes` の値を変更する場合、もとの値より短い値に変更するのであれば末尾が `\x00` で埋められるが、もとの値より長い値に変更しようとすれば `ValueError` が発生する。あらかじめ必要な文字数（バイト数）を確保するために末尾に連続した `\x00` を入れるとよい。ただし、現在、`ShareableList` のバグにより、`sequence` の末尾以外の位置に `\x00` で埋めた `str` や `bytes` の値を指定した場合に、その値をインデックス参照すると、`\x00` と一緒に後ろに格納された値も削除されてしまう。このため、`\x00` で埋めた `str` や `bytes` の値は `sequence` の末尾に置くとよい。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `count(value)` | `value` の出現回数を返す | `int` |
| `index(value)` | `value` の最初のインデックス位置を返す。値が存在しない場合は `ValueError` を発生させる | `int` |

`ShareableList` の使用例:

In [None]:
from multiprocessing import Process, current_process
from multiprocessing.shared_memory import ShareableList


def worker(name: str):
    # 共有メモリを構成（既存の共有メモリを使用）
    a = ShareableList(name=name)
    print("{}: {}, {}, {}, {}, {}".format(current_process().name, a[0], a[1], a[2], a[3], a[4]))

    # データを変更
    a[0] = 0
    a[1] = 3.141592
    a[2] = False
    a[3] = b"aaa"
    a[4] = "Hello World"

    #  共有メモリが不要になったら閉じる
    a.shm.close()


def main():
    # 共有メモリを構成
    a = ShareableList([100, -273.154, True, b"aaabbb", "Hello\x00\x00\x00\x00\x00\x00"])
    assert a.count(100) == 1
    assert a.index(True) == 2

    p = Process(target=worker, name="ChildProcess", args=(a.shm.name,))
    p.start()
    p.join()
    print("{}: {}, {}, {}, {}, {}".format(current_process().name, a[0], a[1], a[2], a[3], a[4]))

    #  共有メモリが不要になったら閉じる
    a.shm.close()

    # 共有メモリを破棄
    a.shm.unlink()


if __name__ == "__main__":
    main()

ChildProcess: 100, -273.154, True, b'aaabbb', Hello
MainProcess: 0, 3.141592, False, b'aaa', Hello World


`unlink()` 忘れを回避するために、マネージャーを使うことができる。`multiprocessing.managers.SharedMemoryManager` はマネージャーで、`start()` メソッドでサーバープロセスを開始してから以下のメソッドで `SharedMemory` や `ShareableList` のインスタンスを作成することができ、また、`shutdown()` メソッドで `unlink()` を呼び出す。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `SharedMemory(size)` | `size` バイトの大きさを要求する新しい `SharedMemory` インスタンスを作成して返す | `SharedMemory` |
| `ShareableList(sequence)` | `sequence` の値で初期化された新しい `ShareableList` インスタンスを作成して返す | `ShareableList` |

`SharedMemoryManager` をコンテキストマネージャーとして with 文で使用する場合、`start()` と `shutdown()` が暗黙的に実行される。

次のコードは、`ShareableList` の使用例を `SharedMemoryManager` を利用する形で書き換える例である。

In [None]:
from multiprocessing import Process, current_process
from multiprocessing.managers import SharedMemoryManager


def worker(sl):
    print("{}: {}, {}, {}, {}, {}".format(current_process().name, sl[0], sl[1], sl[2], sl[3], sl[4]))

    # データを変更
    sl[0] = 0
    sl[1] = 3.141592
    sl[2] = False
    sl[3] = b"aaa"
    sl[4] = "Hello World"


def main():
    # 共有メモリ管理プロセスを開始
    with SharedMemoryManager() as smm:
        sl = smm.ShareableList([100, -273.154, True, b"aaabbb", "Hello\x00\x00\x00\x00\x00\x00"])
        assert sl.count(100) == 1
        assert sl.index(True) == 2

        p = Process(target=worker, name="ChildProcess", args=(sl,))
        p.start()
        p.join()
        print("{}: {}, {}, {}, {}, {}".format(current_process().name, sl[0], sl[1], sl[2], sl[3], sl[4]))


if __name__ == "__main__":
    main()

ChildProcess: 100, -273.154, True, b'aaabbb', Hello
MainProcess: 0, 3.141592, False, b'aaa', Hello World


### プロセスプール ###

子プロセスの作成は重い処理となるため、少ない子プロセスを使い回す仕組み、いわれる**プロセスプール**を利用すべきである。`multiprocessing` パッケージはプロセスプールを作成するためのクラス `Pool` を提供する。

``` python
multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
```

| 引数 | 意味 |
|:---|:---|
| `processes` | プールされる子プロセスの数。省略した場合、`os.cpu_count()` によって返される数が使用される |
| `initializer`,<br /> `initargs` | `initializer` が `None` ではない場合、各子プロセスは開始時に `initializer(*initargs)` を呼び出す |
| `maxtasksperchild` | タスクが完了したら終了してもよい子プロセスの数。省略した場合、プロセスがプールと同じ期間だけ生き続ける |

`Pool` インスタンスのメソッドは次のとおり。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `apply(func, args=(), kwds={})` | プール内の 1 つの子プロセスを使って、引数 `args` とキーワード引数 `kwds` を伴って `func` を呼ぶ。このメソッドは、<br />終了するまで後続の処理をブロックする | `func` の戻り値 |
| `apply_async(func, args=(), kwds={},`<br />` callback=None, error_callback=None)` | `apply()` の非同期版で、`multiprocessing.pool.ApplyResult` クラスのインスタンス（結果オブジェクト）を返す。`callback`<br /> と `error_callback` には 1 個の引数を受け取る呼び出し可能オブジェクトを指定できる。結果を返せるようになったと<br />きに `callback` が結果オブジェクトに対して適用される。ただし呼び出しが失敗した場合は、例外インスタンスを伴って<br /> `error_callback` が適用される | `ApplyResult` |
| `map(func, iterable, chunksize=None)` | 組み込み関数 `map()` の並列版。ただし、`iterable` 引数は 1 つだけサポートされる（`func` の引数は 1 つだけサポート<br />されるということである）。`iterable` から `chunksize `の長さ分だけタスクが切り出されて各プロセスに割り振られる。<br />デフォルトでは、プールされるプロセス数の 4 倍で切り出される | `func` の戻り値<br />のリスト |
| `map_async(func, iterable, chunksize=None,`<br />` callback=None, error_callback=None`) | `map()` メソッドの非同期版。`multiprocessing.pool.MapResult` クラスのインスタンス（結果オブジェクト）を返す。<br />`callback` と `error_callback` については `apply_async()` と同様 | `MapResult` |
| `imap(func, iterable, chunksize=1)` | `map()` の遅延評価版。`func` の戻り値を yield するイテレーターを返す。`chunksize` のデフォルト値は 1 とされる | イテレーター |
| `imap_unordered(func, iterable,`<br />` chunksize=1)` | イテレーターが返す結果の順番が任意の順番でよいと見なされることを除けば `imap()` と同じ | イテレーター |
| `starmap(self, func, iterable,`<br />` chunksize=None)` | `iterable` の要素が引数としてアンパックされるイテレート可能オブジェクトであると期待される以外は `map()` メソッ<br />ドと同様。そのため、`iterable` が `[(1,2), (3, 4)]` なら、結果は `[func(1,2), func(3,4)]` になる | `func` の戻り値<br />のリスト |
| `starmap_async(self, func, iterable,`<br />` chunksize=None, callback=None,`<br />` error_callback=None)` | `starmap()` メソッドの非同期版。引数と戻り値は `map_async()` と同様 | `MapResult` |
| `close()` | これ以上プールでタスクが実行されないようにする。すべてのタスクが完了した後で子プロセスが終了する | `None` |
| `terminate()` | 実行中の処理を完了させずに子プロセスをすぐに停止する | `None` |
| `join()` | 子プロセスが終了するのを待つ。`join()` を使用する前に `close()` か `terminate()` を呼び出す必要がある | `None` |

`Pool` オブジェクトはコンテキストマネージャーとして使用できる。`__enter__()` は `Pool` オブジェクトを返す。また `__exit__()` は `terminate()` を呼び出す。

次のコードは、`apply()` メソッドの使用例である:

In [None]:
from multiprocessing import Pool, current_process
from random import random
import time

def worker(x):
    print("{}(x={}) working...".format(current_process().name, x))
    time.sleep(random())
    return x * x

def main():
    # 同時に最大2個の子プロセス
    with Pool(processes=2) as pool:
        results = []
        for i in range(10):
            res = pool.apply(func=worker, args=(i,))
            results.append(res)
        print(results)
    pool.join()

if __name__ == "__main__":
    main()

ForkPoolWorker-20(x=0) working...
ForkPoolWorker-21(x=1) working...
ForkPoolWorker-20(x=2) working...
ForkPoolWorker-21(x=3) working...
ForkPoolWorker-20(x=4) working...
ForkPoolWorker-21(x=5) working...
ForkPoolWorker-20(x=6) working...
ForkPoolWorker-21(x=7) working...
ForkPoolWorker-20(x=8) working...
ForkPoolWorker-21(x=9) working...
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


`apply()` で指定した `worker()` 関数は `random()` 関数で処理時間が異なるようにしている。それにもかかわらず、出力結果から、2 つの子プロセスが交互に結果を返していること、つまり `apply()` が同期処理を行うことがわかる。マルチプロセスの並列処理には `apply_async()` のほうが適している。`apply()` は、処理の順序付けをしたい場合に使う。

`apply_async()` で返される `ApplyResult` オブジェクトは、以下のメソッドを持つ。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `get([timeout])` | 結果を返すが、結果を受け取るまで待ち、結果を受け取ったときに返す。引数に `timeout` を指定すると、結果が `timeout` 秒以内に受け取れない場合に<br /> `multiprocessing.TimeoutError` が発生する | 呼び出しの結果 |
| `wait([timeout])` | その結果が有効になるか timeout 秒経つまで待つ | `None` |
| `ready()` | その呼び出しが完了しているかどうかを返す | `bool` |
| `successful()` | その呼び出しが例外を発生させることなく完了したかどうかを返す。その結果が返せる状態でない場合 `ValueError` が発生する | `bool` |

次のコードは、`apply_async()` メソッドの使用例である:

In [None]:
from multiprocessing import Pool, current_process
from random import random
import time

def worker(x):
    print("{}(x={}) working...".format(current_process().name, x))
    time.sleep(random())
    return x * x

def main():
    # 同時に最大2個の子プロセス
    with Pool(processes=2) as pool:
        results = []
        for i in range(10):
            res = pool.apply_async(func=worker, args=(i,))
            results.append(res)
        # 結果の取得
        reply = []
        for res in results:
            try:
                reply.append(res.get(timeout=2.0))
            except TimeoutError:
                print("タイムアウトしました")
    pool.join()
    print(reply)

if __name__ == "__main__":
    main()

ForkPoolWorker-22(x=0) working...ForkPoolWorker-23(x=1) working...

ForkPoolWorker-22(x=2) working...
ForkPoolWorker-23(x=3) working...
ForkPoolWorker-23(x=4) working...
ForkPoolWorker-22(x=5) working...
ForkPoolWorker-22(x=6) working...
ForkPoolWorker-22(x=7) working...
ForkPoolWorker-23(x=8) working...
ForkPoolWorker-22(x=9) working...
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


`apply_async()` で指定したタスクは、先にタスクを完了した子プロセスに次々に割り振られていく様子がわかる。また、`ApplyResult` オブジェクトの `get()` メソッドが結果を受け取るまで待つので、順番通りに結果を取得できることもわかる。

`apply()` メソッドの使用例と `apply_async()` メソッドの使用例では、子プロセスで動かす関数が 1 引数なので、for 文で繰り返している部分を、それぞれ `map()` メソッドと `map_async()` メソッドで書き換えることができる。`map_async()` で返される `MapResult` オブジェクトは、 `ApplyResult` のサブクラスで、 `get()` メソッドが結果のリストを返すようにオーバーライドされる。

次のコードは、`map_async()` メソッドの使用例である:

In [None]:
from multiprocessing import Pool, current_process
from random import random
import time

def worker(x):
    print("{}(x={}) working...".format(current_process().name, x))
    time.sleep(random())
    return x * x

def main():
    # 同時に最大2個の子プロセス
    with Pool(processes=2) as pool:
        results = pool.map_async(func=worker, iterable=range(10))
        # 結果の取得
        try:
            reply = results.get(timeout=5.0)
        except TimeoutError:
            print("タイムアウトしました")
    pool.join()
    print(reply)

if __name__ == "__main__":
    main()

ForkPoolWorker-24(x=0) working...
ForkPoolWorker-25(x=2) working...
ForkPoolWorker-24(x=1) working...
ForkPoolWorker-25(x=3) working...
ForkPoolWorker-24(x=4) working...
ForkPoolWorker-24(x=5) working...
ForkPoolWorker-25(x=6) working...
ForkPoolWorker-25(x=7) working...
ForkPoolWorker-24(x=8) working...
ForkPoolWorker-24(x=9) working...
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


### multiprocessing.dummy ###

`multiprocessing.dummy` サブパッケージを利用すると、 `multiprocessing` の API を利用してマルチスレッドを実現できる。

実は、`multiprocessing.Pool` のサブクラスとして、スレッドプールを使用する `multiprocessing.pool.ThreadPool` クラスが定義されており、`multiprocessing.dummy.Pool()` は、この `ThreadPool` のインスタンスを返す。つまり、以下の 2 つのコードは等価である。

``` python
from multiprocessing.dummy import Pool
pool = Pool(processes, initializer, initargs)
```

``` python
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes, initializer, initargs)
```

`queue.Queue` を利用して作成したスレッドプールと同じような動作を `Pool()` で実現できる。

In [None]:
from multiprocessing.dummy import Pool
import threading
import time


def worker(item):
    print(f"{threading.current_thread().name}: Working on {item}")
    time.sleep(item * 0.1)
    print(f"{threading.current_thread().name}: Finished {item}")


def main():
    # スレッドプールを設定（2 つのスレッドからなる）
    with Pool(processes=2) as pool:
        results = pool.map_async(func=worker, iterable=range(1, 6))
        # スレッドの開始
        results.get()
    # 終了を待機
    pool.join()
    print("全てのタスクが完了した")


if __name__ == "__main__":
    main()

Thread-29 (worker): Working on 1
Thread-30 (worker): Working on 2
Thread-29 (worker): Finished 1
Thread-29 (worker): Working on 3
Thread-30 (worker): Finished 2
Thread-30 (worker): Working on 4
Thread-29 (worker): Finished 3
Thread-29 (worker): Working on 5
Thread-30 (worker): Finished 4
Thread-29 (worker): Finished 5
全てのタスクが完了した


フューチャーパターン
--------------------

`Pool` の `apply_async()` や `map_async()` で作成される結果オブジェクト（`ApplyResult` インスタンスや `MapResult` インスタンス）は、作成時にはタスクの結果を保持していなくても未来にはその結果を受け取ることができるという不思議なオブジェクトである。このようなオブジェクトは、しばしば future と呼ばれ、以下のようなデザインパターンの下でそのクラスが設計される。

**フューチャーパターン**（future pattern）は、プロキシパターンの一種で、並列処理のためのデザインパターンである。値を取得できる既存クラスに対して、そのラッパーであるクラス（Future クラスと呼ぶ）を使って、次のような機能を追加する。

  1. 既存クラスのインスタンスから値を取得できる場合は、その値を保持する。
  2. 既存クラスのインスタンスから値を取得できない場合は、値を取得できる状態になるまで待つ。

このように、Future クラスは待機状態を持つ。待機状態により値の取得を後回しにすることができ、これにより並列処理でのやり取りをスムーズに行うことができる。

たとえば、ある計算を行うクラス X があって、その計算を要求する処理（receiver）と実際の計算を行う処理（sender）の間で以下のようなやり取りをする。

  * （receiver） X クラスをラップする Future クラスをインスタンス化し、それを何らかの手段を用いて sender に渡す。
  * （sender） X オブジェクトの計算を行い、Future オブジェクトの待機状態を解除する。
  * （receiver） Future オブジェクトを確認し、待機状態が解除されていれば Future オブジェクトから計算結果を取得できるが、待機状態である間はブロックされる。

X で行う計算が時間のかかるものである場合、Future オブジェクトを使うことで並列処理が効率的に行われる。

Future の機能はよく引換券に例えられる（future は現物に対する先物という意味もある）。食券を発行する食堂で考えると、料理を要求する客と、料理を提供する食堂スタッフがいて、食堂ではすぐには料理が渡されないので、客は券売機で食券を購入して待つ。配膳口に食券の番号が表示されたら、客は配膳口で料理を受け取ることができる。食券システムによって客と食堂スタッフのやり取りがスムーズになり、客は料理ができるまでのスキマ時間を有効に活用することもできる。

多くのプログラミング言語では、フューチャーパターンの実装が言語の機能あるいはライブラリとして取り込まれている。

concurrent.futures
------------------

標準ライブラリの `concurrent.futures` モジュールは、`multiprocessing.Pool` や `multiprocessing.dummy.Pool` の機能限定版となるインターフェースを提供する。スレッド間通信やプロセス間通信を使って細かい制御を行う必要がない、あるいは、そのような制御を使わないという制限を課す場合には、`concurrent.futures` モジュールが適している。

### Future ###

`concurrent.futures` モジュールが提供するインターフェースを使用すると、`concurrent.futures.Future` オブジェクトが作成される。これは、フューチャーパターンの Python 実装であり、タスクの未来の値を表す。`multiprocessing.pool.ApplyResult` より高機能である。モジュールの利用者がこのオブジェクトを直接作成する必要はなく、また操作するメソッドは次のメソッドに限られる。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `cancel()` | 呼び出しのキャンセルを試みる。呼び出しが現在実行中または実行が終了していてキャンセルできない場合、このメソッドは `False` を返し、そうでな<br />い場合、呼び出しはキャンセルされ、このメソッドは `True` を返す | `bool` |
| `cancelled()` | 呼び出しが正常にキャンセルされた場合 `True` を返す | `bool` |
| `running()` | 現在呼び出しが実行中でキャンセルできない場合 `True` を返す | `bool` |
| `done()` | 呼び出しが正常にキャンセルされたか終了した場合 `True` を返す | `bool` |
| `result(timeout=None)` | 呼び出しによって返された値を返す。もし呼び出しがまだ完了していなければ、このメソッドは待機する。`timeout` に浮動小数点数を指定した場合、<br />タイムアウトすると `TimeoutError` が送出される。`Future` オブジェクトが完了する前にキャンセルされた場合、`CancelledError` が送出される。呼び出<br />しが例外を送出した場合、このメソッドは同じ例外を送出する | Unknown |
| `exception(timeout=None)` | 呼び出しによって送出された例外を返す。もし呼び出しがまだ完了されていなければ、このメソッドは待機する。`timeout` に浮動小数点数を指定し<br />た場合および `Future` オブジェクトが完了する前にキャンセルされた場合は `result()` と同様。呼び出しが例外を送出することなく完了した場合、<br />`None` を返す | `Exception`<br /> &#124; `None` |
| `add_done_callback(fn)` | 完了時コールバックとして呼び出し可能なオブジェクト `fn` を追加する。`Future` オブジェクトがキャンセルされたか、完了した際に、`Future` オブジェ<br />クトをそのただ 1 つの引数として `fn` が呼び出される。追加された完了時コールバックは、追加された順番で、追加を行ったプロセスに属するスレッ<br />ド中で呼び出される。もし `Future` オブジェクトが既に完了しているか、キャンセル済みであれば、`fn` は即座に実行される | `None` |

### Executor ###

`concurrent.futures.Executor` は、スレッドまたはプロセスのプールを使用して非同期に呼び出しを行うインターフェースを提供するクラスである。以下のメソッドを持つ。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `submit(fn, /, *args, **kwargs)` | 呼び出し可能オブジェクト `fn` を `fn(*args, **kwargs)` として実行するようにスケジュールし、`concurrent.futures.Future` オブジェクトを返す | `Future` |
| `map(fn, *iterables, timeout=None,`<br />` chunksize=1)` | 組み込み関数 `map()` の非同期版。`fn` の実行結果（`Future` オブジェクトの `result()` メソッドの戻り値）を返すジェネレーターを返す。返す順<br />番は `iterables` で渡すタスクの順番となる（実行が完了した順番ではない）。`fn` 呼び出しで例外が発生した場合、その値がジェネレーターか<br />ら取得されるときにその例外が発生する。`chunksize` はプロセスプールでのみ有効で、`Pool.map()` と同じ | ｼﾞｪﾈﾚｰﾀｰ |
| `shutdown(wait=True,`<br />` *, cancel_futures=False)` | シャットダウンする。以後 `submit()` と `map()` を呼び出すと `RuntimeError` が発生する。`cancel_futures` が `False`（デフォルト）の場合、実行を<br />開始せず保留中の `Future` オブジェクトがキャンセルされず、それらの全てが完了してリソースが解放される。`wait` が `True`（デフォルト）の場<br />合、リソース解放までメソッドは返らない | `None` |

`Executor` は、コンテキストマネージャーとして使用できる。`shutdown()` は with ブロックを終了するときに呼び出される。

`Executor` クラスを直接使ってはならず、サブクラスである `ThreadPoolExecutor`（マルチスレッド）か `ProcessPoolExecutor` （マルチプロセス）を介して使うこと。

``` python
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
```

| 引数 | 意味 |
|:---|:---|
| `max_workers` | プールされるスレッドの数を指定する。`None` か指定を省略する場合のデフォルト値 `min(32, os.cpu_count() + 4)` |
| `thread_name_prefix` | スレッド名の接頭辞を指定する |
| `initializer` | スレッド開始時に実行する関数を指定する |
| `initargs` | `initializer` の引数をタプルで指定する |

``` python
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
```

| 引数 | 意味 |
|:---|:---|
| `max_workers` | プールされるプロセスの数を指定する。`None` か指定を省略する場合のデフォルト値はマシン上の CPU コア数になる。Windows では 61 以下に制約される |
| `mp_context` | プロセス生成時の開始方式を指定する |
| `initializer` | プロセス開始時に実行する関数を指定する |
| `initargs` | `initializer` の引数をタプルで指定する |
| `max_tasks_per_child` | Python 3.11 で追加。1 つのプロセスが実行できるタスクの最大数を指定する。この数を超えるとプロセスは終了する。`None`（デフォルト）の場合、プロセスは<br />プールと同じ期間存続する |

`ProcessPoolExecutor` については、次の 3 点に注意する。

  * `ProcessPoolExecutor` は、Python インタプリターの対話モードでは動作しない。
  * `ProcessPoolExecutor` は、`multiprocessing.Queue` を利用しているため、pickle 化できるオブジェクトを利用する必要がある。pickle 化できないファイルオブジェクトやラムダ式などは、`ProcessPoolExecutor` で実行する呼び出し可能オブジェクト、その引数と戻り値として利用できない。
  * `ProcessPoolExecutor` に渡された呼び出し可能オブジェクトから `Executor` や `Future` のメソッドを呼ぶとデッドロックに陥る。

次のコードは、あえて効率の悪い素数判定アルゴリズムを用いた `is_prime()` 関数を、比較のため、シングルプロセスとマルチプロセスで動かす。マルチプロセスでは、さらに `submit()` メソッドと `map()` メソッドで動かしている。

In [None]:
from concurrent.futures import ProcessPoolExecutor
import time

PRIMES = [
    10897409,
    20292113,
    31518271,
    40003027,
]

def is_prime(n: int) -> bool:
    """効率の悪い素数判定"""
    if n < 2 or not isinstance(n, int):
        raise ValueError("nは2以上の整数を指定してください")
    i = 2
    while i < n:
        if n % i == 0:
            return False
        i += 1
    return True

def main():
    # シングルプロセスで実行
    print(f"single process started at {time.strftime('%X')}")
    for n in PRIMES:
        if is_prime(n):
            print(f"{n} is prime")
    print(f"single process finished at {time.strftime('%X')}")

    # マルチプロセス（submit メソッド）で実行
    print(f"multi process (submit) started at {time.strftime('%X')}")
    with ProcessPoolExecutor() as executor:
        n_futures = []
        for n in PRIMES:
            n_futures.append((n, executor.submit(is_prime, n)))
        for n, future in n_futures:
            if future.result():
                print(f"{n} is prime")
    print(f"multi process (submit) finished at {time.strftime('%X')}")

    # マルチプロセス（map メソッド）で実行
    print(f"multi process (map) started at {time.strftime('%X')}")
    with ProcessPoolExecutor() as executor:
        for n, result in zip(PRIMES, executor.map(is_prime, PRIMES)):
            if result:
                print(f"{n} is prime")
    print(f"multi process (map) finished at {time.strftime('%X')}")

if __name__ == "__main__":
    main()

single process started at 02:40:22
10897409 is prime
20292113 is prime
31518271 is prime
40003027 is prime
single process finished at 02:40:38
multi process (submit) started at 02:40:38
10897409 is prime
20292113 is prime
31518271 is prime
40003027 is prime
multi process (submit) finished at 02:40:51
multi process (map) started at 02:40:51
10897409 is prime
20292113 is prime
31518271 is prime
40003027 is prime
multi process (map) finished at 02:41:05


`submit()` メソッドは、1 個のタスクしか実行できないので、複数のタスクを同時に実行するには for 文を使うなどの工夫が必要である。

`map()` メソッドは、イテラブルが返すタスクを実行できる。`multiprocessing.Pool` の `map_async()` に似ているが、複数の iterable 引数をサポートする（実行する `fn` 関数が複数の引数をとることができる）。`map()` メソッドはジェネレーターを返すが、そのジェネレーターは `Future` オブジェクトを返すのではなく、`Future` オブジェクトの `result()` メソッドの戻り値、つまり呼び出しの結果を返すことに注意する。上記のコードでは、結果表示のために、与えたタスクとその結果の組が欲しかったので、組み込み関数 `zip()` を使っている。`zip()` 関数が問題なく使えるのは、`map()` が返すジェネレーターでは完了した順番ではなく渡したタスクの順番で結果を受け取るからである。

Colab の制約により、上の実行結果にはマルチプロセスの効果が表れなかった。手元のマシンでは、上記のコードを実行した結果にマルチプロセスの効果が表れたことを確認している。

上記のコードは、`ProcessPoolExecutor` を `ThreadPoolExecutor` に単純に置換するだけで、マルチスレッドで動作する。ただし、`is_prime()` 関数は CPU バウンドな処理を行うので、GIL によってマルチスレッドの効果は表れない。

さて、上記のコードの `is_prime()` 関数をタプル `tuple[int, bool]` を返すように変更する。この場合、`submit()` メソッドでは、完了時コールバックを追加し、完了時コールバックの中で結果を取得して上記コードと同様の結果表示を行える。また、`map()` メソッドでは、`zip()` を使う必要はない。

In [None]:
from concurrent.futures import Future, ProcessPoolExecutor
import time

PRIMES = [
    10897409,
    20292113,
    31518271,
    40003027,
]

def is_prime(n: int) -> tuple[int, bool]:
    """効率の悪い素数判定"""
    if n < 2 or not isinstance(n, int):
        raise ValueError("nは2以上の整数を指定してください")
    i = 2
    while i < n:
        if n % i == 0:
            return n, False
        i += 1
    return n, True

def my_callback_function(future: Future):
    n, result = future.result()
    if result:
        print(f"{n} is prime")

def main():
    # マルチプロセス（submit メソッド）で実行
    print(f"multi process (submit) started at {time.strftime('%X')}")
    with ProcessPoolExecutor() as executor:
        for n in PRIMES:
            future = executor.submit(is_prime, n)
            future.add_done_callback(my_callback_function)
    print(f"multi process (submit) finished at {time.strftime('%X')}")

    # マルチプロセス（map メソッド）で実行
    print(f"multi process (map) started at {time.strftime('%X')}")
    with ProcessPoolExecutor() as executor:
        for n, result in executor.map(is_prime, PRIMES):
            if result:
                print(f"{n} is prime")
    print(f"multi process (map) finished at {time.strftime('%X')}")

if __name__ == "__main__":
    main()

multi process (submit) started at 14:07:12
10897409 is prime
20292113 is prime
31518271 is prime
40003027 is prime
multi process (submit) finished at 14:07:25
multi process (map) started at 14:07:25
10897409 is prime
20292113 is prime
31518271 is prime
40003027 is prime
multi process (map) finished at 14:07:38


### as_completed ###

複数のタスクを同時に実行し、完了した `Future` オブジェクトから先に効率よく結果を受け取るということは、単純な `submit()` メソッドの繰り返しや `map()` メソッドを使う限りは実現できない。この場合、次のモジュール関数を使うとよい。

``` python
concurrent.futures.as_completed(fs, timeout=None)
```

この関数は、`Future` オブジェクトのイテレーター `fs` を受け取り、新たに完了順に `Future` オブジェクトを返すジェネレーターを返す。このジェネレーターから受け取る `Future` オブジェクトでは `result()` メソッドの呼び出しから直ちに結果を得る（または例外が発生する）。なお、`fs` は、異なる `Executor` インスタンスによって作成された `Future` オブジェクトを返すものであってもよい。

`timeout` 引数を指定した場合、`timeout` 秒経過してもジェネレーターが `Future` オブジェクトを返さないとき、`concurrent.futures.TimeoutError` 例外を発生させる。

以下は、HTTP リクエストを伴う大量のタスクのバッチ処理を想定したコード例である。処理が I/O バウンドなので、`ThreadPoolExecutor` で並列処理を行っている。例外処理も行うようにしている。ここでは、 [httpstat.us](https://httpstat.us/) にアクセスし HTTP Status Code のレスポンスを得ているだけである。 httpstat.us は、URL に欲しいステータスコードの番号を付けてアクセスするだけで、そのステータスコードのレスポンスを返してくれるサービスを提供している。`https://httpstat.us/200` なら `200 OK` のステータスコードである。`sleep=<millisecond>` の形でクエリ文字列を追加すると、`<millisecond>` ミリ秒経過までレスポンスを遅らせることができる。

In [None]:
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError, as_completed
import urllib.request

def my_task(millisecond: int) -> tuple[str, int]:
    if millisecond < 0:
        raise ValueError(f"millisecond is {millisecond}, must be greater than or equal to 0")
    with urllib.request.urlopen(f"https://httpstat.us/200?sleep={millisecond}") as response:
        body = response.read().decode("utf-8")
        return body, millisecond

def my_add_done_callback(future: Future):
    try:
        result = future.result()
    except ValueError as err:
        print(f"{type(err).__name__}: {err}")
    else:
        print(f"Task completed: {result}")

def main():
    # タスクのリスト（遅延する時間（ミリ秒）を指定する）
    tasks = [3000, 1000, 2000, 4000, 6000, -1000]
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(my_task, millisecond=task) for task in tasks]
        try:
            # タイムアウトを 6 秒に設定するので、おそらく 6000 のタスクはタイムアウトする
            for future in as_completed(futures, timeout=6.0):
                future.add_done_callback(my_add_done_callback)
        except TimeoutError as err:
            print(f"{type(err).__name__}: {err}")

if __name__ == "__main__":
    main()

ValueError: millisecond is -1000, must be greater than or equal to 0
Task completed: ('200 OK', 1000)
Task completed: ('200 OK', 2000)
Task completed: ('200 OK', 3000)
Task completed: ('200 OK', 4000)
TimeoutError: 1 (of 6) futures unfinished


呼び出しで発生した例外は、`Future` オブジェクトの `result()` メソッドで結果を取得する際に処理する。`as_completed()` 関数で発生した `concurrent.futures.TimeoutError` 例外は、関数の呼び出しを try-except 文で書いて処理する。

### 排他制御 ###

マルチスレッドでは、競合状態が発生する場合、ロックを使用する必要がある。

In [None]:
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, current_thread
import time

def counter(data: dict, lock: Lock):
    with lock:  # ロックの獲得と解放
        n = data['n']
        time.sleep(0.2)
        data['n'] = n + 1
        print(f"{current_thread().name}: {data['n']=}")

def main():
    data = {"n": 0}
    lock = Lock()  # ロックを作成
    with ThreadPoolExecutor() as executor:
        for _ in range(3):
            executor.submit(counter, data, lock)
    print(f"{current_thread().name}: {data['n']=}")

if __name__ == "__main__":
    main()

ThreadPoolExecutor-1_0: data['n']=1
ThreadPoolExecutor-1_1: data['n']=2
ThreadPoolExecutor-1_2: data['n']=3
MainThread: data['n']=3


subprocess
----------

標準ライブラリの `subprocess` モジュールは、新しいプロセスの開始、入力・出力・エラーパイプの接続、リターンコードの取得を可能とする。

### run ###

``` python
subprocess.run(args, *, **kwargs)
```

このモジュール関数は、`args`（文字列のリストまたは単一の文字列）で指定されたコマンドを実行する。コマンドの完了を待って、`subprocess.CompletedProcess` インスタンスを返す。

`args` 以外の引数は、すべてキーワード専用引数となっており、主なものは次のとおり（全てのキーワード専用引数は[公式ドキュメント](https://docs.python.org/ja/3/library/subprocess.html#using-the-subprocess-module)を参照）。

| kwargs | 意味 | default |
|:---|:---|:--:|
| `stdin` | 標準入力を指定する。以下から選ぶ<br /><br />・`None`: リダイレクトは発生しない<br /><br />・`subprocess.PIPE`: 子プロセスへの新しいパイプを作成する<br /><br />・`subprocess.DEVNULL`: `os.devnull` を使用する<br /><br />・ファイル記述子（正の整数）<br /><br />・有効なファイル記述子を持つファイルオブジェクト | `None` |
| `stdout` | 標準出力を指定する。`stdin` と同じ項目を選べる | `None` |
| `stderr` | 標準エラー出力を指定する。`stdout` で選べる項目に加え、`subprocess.STDOUT` も選べる。これは子プロセスからの `stderr` の内容が `stdout` と同じファイルハン<br />ドルにキャプチャされる必要があることを示す | `None` |
| `capture_output` | `True` の場合、標準出力と標準エラー出力の内容がそれぞれ `CompletedProcess` インスタンスの `stdout` 属性と `stderr` 属性で参照できる | `False` |
| `shell` | `args` が単一の文字列で `shell` が `True` の場合、その文字列がシェルによって実行される。`args` が単一の文字列で `shell` が `False`（デフォルト）の場合、その<br />文字列は引数を指定せずに実行される単なるプログラムの名前でなければならない | `False` |
| `cwd` | 作業ディレクトリのパスを表す path-like オブジェクトを指定する | `None` |
| `timeout` | `timeout` 秒経過後に処理が完了しなかった場合に `subprocess.TimeoutExpired` 例外を送出する。`timeout` が `None`（デフォルト）の場合、タイムアウトしない | `None` |
| `check` | `True` の場合、子プロセスが非ゼロの終了コードで終了したなら、`subprocess.CalledProcessError` 例外を送出する | `False` |

`subprocess.CompletedProcess` の属性:

| 属性 | 意味 |
|:---|:---|
| `args` | プロセスを起動するときに使用された引数。1 個のリストか 1 個の文字列になる |
| `returncode` | 子プロセスの終了コード。正常終了の場合 `0` を返す |
| `stdout` | 子プロセスから補足された標準出力の内容（バイト列） |
| `stderr` | 子プロセスから補足された標準エラー出力の内容（バイト列） |

`subprocess.CompletedProcess` のメソッド:

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `check_returncode()` | 終了コードが非ゼロの場合、`subprocess.CalledProcessError` が送出される | `None` |

次は、`capture_output` 引数を設定して、標準出力と標準エラー出力を捕捉するコード例である。

In [None]:
import subprocess
obj = subprocess.run(["ls", "-l"], capture_output=True)
print("stdout:\n{}".format(obj.stdout.decode("utf-8")))
print("stdout:\n{}".format(obj.stderr.decode("utf-8")))  # 捕捉した内容がない場合は None を返す

stdout:
total 4
drwxr-xr-x 1 root root 4096 Jul  1 13:21 sample_data

stdout:



これは、 `stdout` 引数と `stderr` 引数に `subprocess.PIPE` を設定する次のコードと同じである。

In [None]:
import subprocess
obj = subprocess.run(["ls", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("stdout:\n{}".format(obj.stdout.decode("utf-8")))
print("stdout:\n{}".format(obj.stderr.decode("utf-8")))  # 捕捉した内容がない場合は None を返す

stdout:
total 4
drwxr-xr-x 1 root root 4096 Jul  1 13:21 sample_data

stdout:



次は、`check` 引数を設定して、子プロセスが正常終了しなかった場合に `subprocess.CalledProcessError` 例外を送出するコード例である。

In [None]:
import subprocess
try:
    # シェルでコマンド "exit 1" を実行する
    obj = subprocess.run("exit 1", shell=True, check=True)
except subprocess.CalledProcessError as err:
    print(f"{type(err).__name__}: {err}")

CalledProcessError: Command 'exit 1' returned non-zero exit status 1.


これは、戻り値オブジェクトの `check_returncode()`メソッド を呼び出す次のコードと同じである。

In [None]:
import subprocess
# シェルでコマンド "exit 1" を実行する
obj = subprocess.run("exit 1", shell=True)
try:
    obj.check_returncode()
except subprocess.CalledProcessError as err:
    print(f"{type(err).__name__}: {err}")

CalledProcessError: Command 'exit 1' returned non-zero exit status 1.


次は、`timeout` 引数を設定して、タイムアウトするコード例である。

In [None]:
import subprocess
try:
    subprocess.run(["sleep", "10"], timeout=1.0)
except subprocess.TimeoutExpired as err:
    print(f"{type(err).__name__}: {err}")

TimeoutExpired: Command '['sleep', '10']' timed out after 0.9999655629999893 seconds


### Popen ###

実のところ、`subprocess.run()` 関数は、内部的にそのキーワード引数を伴い `subprocess.Popen` インスタンス化を呼び出している。`subprocess.run()` 関数よりも高度な処理を行いたい場合は、`subprocess.Popen` クラスを直接利用する。

`subprocess.Popen` は、`subprocess.CompletedProcess` と同様の属性を持ち、さらに `pid` 属性で子プロセスのプロセス ID を参照できる。

`subprocess.Popen` のメソッド:

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `poll()` | 子プロセスの実行が終了したら終了コードを返す。終了してないなら `None` を返す | `int` &#124; `None` |
| `wait(timeout=None)` | 子プロセスが終了するまで待機し、終了したら終了コードを返す。`timeout` を設定した場合、プロセスが `timeout` <br />秒経過後に終了してない場合、`TimeoutExpired` 例外を送出する | `int` &#124; `None` |
| `communicate(input=None, timeout=None)` | 子プロセスが終了するまで待機し、標準出力と標準エラー出力を読み込む | 標準出力と標準エラー<br />出力の組となるタプル |
| `send_signal(signal)` | `signal` を子プロセスに送る | `None` |
| `terminate()` | 子プロセスを停止する | `None` |
| `kill()` | 子プロセスを強制終了する | `None` |

シェル上で `|` を使ったパイプライン処理は、2 つの子プロセスの標準出力と標準入力をパイプでつなぐことで記述することができる。

In [None]:
import subprocess
# 2つの子プロセスをパイプで繋ぐ
p1 = subprocess.Popen(["ls", "-l", "sample_data"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["grep", "json"], stdin=p1.stdout, stdout=subprocess.PIPE)
out, err = p2.communicate()  # ls -l sample_data | grep json
print(out.decode())

-rwxr-xr-x 1 root root     1697 Jan  1  2000 anscombe.json



tqdm
----

サードパーティ製パッケージの [tqdm](https://tqdm.github.io/) は、プログレスバー（進捗状況を表現するためのバー）を表示する機能を持つ。ループの進行状況を視覚的に確認できるため、長時間実行される処理や大規模なデータ処理の際に便利である。ライセンスは MIT License と MPL v 2.0 の組み合わせ。インストール方法は次のとおり。

``` shell
pip install tqdm
```

通常は `tqdm` をインポートするが、 Colab で使う場合は `tqdm.notebook` をインポートする。

基本的な使い方は、`tqdm` クラスのインスタンスを for ループと一緒に使うことである。

In [None]:
from time import sleep
# from tqdm import tqdm
from tqdm.notebook import tqdm

for _ in tqdm(range(1000)):
    sleep(0.001)

  0%|          | 0/1000 [00:00<?, ?it/s]

コンストラクタの第 1 引数 `iterable` にイテラブルを指定する場合、 `tqdm` インスタンスは `iterable` のアイテムを返すイテラブルとなる。

進行状況メッセージは、デフォルトでは次のフォーマットが使用される。

　`'{l_bar}{bar}{r_bar}'`

  * `l_bar`=`'{desc}: {percentage:3.0f}%|'`
  *  `r_bar`=`'| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ' '{rate_fmt}{postfix}]'`

| プレースホルダ | 意味 |
|:---|:---|
| `{bar}` | プログレスバー |
| `{desc}` | 説明文 |
| `{percentage:3.0f}` | パーセンテージの書式指定 |
| `{n_fmt}` | 現在処理されたアイテムの数 |
| `{total_fmt}` | 全体のアイテムの数 |
| `{elapsed}` | 経過時間（秒） |
| `{remaining}` | 完了までに予想される残り時間（秒） |
| `{rate_fmt}` | 処理速度（アイテム/秒） |
| `{postfix}` | 追加の統計 |

このうち `{desc}: ` と `{postfix}` は、デフォルトでは表示されない。

コンストラクタは、進行状況メッセージをカスタマイズするための引数を多数サポートしている。その主なものは次のとおり。

| 引数 | 意味 | デフォルト |
|:---|:---|:---|
| `desc` | `{desc}` に表示される説明文 | `None` |
| `total` | `{total_fmt}` に表示される全体のアイテムの数。`None` の場合、`len(iterable)` が使用される。`iterable` 引数を `None`（デフォルト）とする場合は、<br />`total` を指定する必要がある。この場合、`tqdm` は整数を返すイテラブルとなる | `None` |
| `leave` | `False` の場合、処理完了後にプログレスバーを非表示にする | `True` |
| `file` | 進行状況メッセージを出力先を指定する。`None` の場合は、`sys.stderr` が使用される | `None` |
| `ncols` | 進行状況メッセージ全体の幅を 0 以上の整数で指定できる。指定した場合、この幅に収まるようにプログレスバーのサイズが動的に変更される | `None` |
| `mininterval` | 進行状況表示の最小更新間隔（秒） | `0.1` |
| `disable` | `True` の場合、進行状況メッセージ全体を非表示にする | `False` |
| `unit` | `{rate_fmt}` に使用されるアイテムの単位を指定する | `'it'` |
| `unit_scale` | `True` の場合、`{n_fmt}` と `{total_fmt}` がキロ、メガなどの単位でスケールされる | `False` |
| `bar_format` | 出力の書式設定を指定する。`bar_format` の変更はパフォーマンスに影響を与える可能性がある | `'{l_bar}{bar}{r_bar}'` |
| `initial` | 進行状況の初期値を指定する | `0` |
| `position` | 複数のプログレスバーを使う場合に、プログレスバーの縦位置を指定できる | `None` |
| `postfix` | `{postfix}` に表示される追加の統計。文字列または辞書が使える | `None` |
| `colour` | プログレスバーの色を文字列で指定する（e.g. `'green'`, `'00ff00'`） | `None` |

次は、`tqdm` インスタンス化の際に引数を利用するコード例:

In [None]:
from time import sleep
# from tqdm import tqdm
from tqdm.notebook import tqdm

for i in tqdm(range(1000), desc="Download", unit="B", unit_scale=True, postfix={"loss": 1.2}):
    sleep(0.001)

Download:   0%|          | 0.00/1.00k [00:00<?, ?B/s, loss=1.2]

`tqdm` は、以下のメソッドが使用できる。

| メソッド | 機能 | 戻り値 |
|:---|:---|:---|
| `update(n=1)` | 進行状況表示を手動で更新する。`n` で処理したアイテム数を指定する | `True` または `None` |
| `set_description(desc)` | `{desc}` に表示される説明文を更新する | `None` |
| `set_postfix(**kwargs)` | `{postfix}` に表示される追加の統計を更新する | `None` |
| `close()` | クリーンアップする。`leave` が `False` の場合は、プログレスバーを閉じる | `None` |

`tqdm` はコンテキストマネージャーとして使用できる。`close()` メソッドは、 with ブロックを終了するときに呼び出される。

次は、動的な説明文と追加の統計を表示するコード例:

In [None]:
from time import sleep
# from tqdm import tqdm
from tqdm.notebook import tqdm

with tqdm("abcde") as pbar:
    for i, ch in enumerate(pbar):
        pbar.set_description("[train] Epoch %d" % i)
        pbar.set_postfix(dict(loss=1 - i / 5, acc=i / 10))
        sleep(1.0)

  0%|          | 0/5 [00:00<?, ?it/s]

次は、マルチプロセスで `tqdm` 使用するコード例:

In [None]:
from multiprocessing import Pool
from random import random
import time

# from tqdm import tqdm
from tqdm.notebook import tqdm


def worker(x):
    time.sleep(random())
    return x * x


def main():
    # 同時に最大2個の子プロセス
    with Pool(processes=2) as pool:
        results = []
        data = range(10)
        for i in data:
            res = pool.apply_async(func=worker, args=(i,))
            results.append(res)
        # 結果の取得
        reply = []
        with tqdm(total=len(data)) as t:
            for res in results:
                try:
                    reply.append(res.get(timeout=2.0))
                except TimeoutError:
                    print("タイムアウトしました")
                else:
                    t.update()
    pool.join()
    print(reply)


if __name__ == "__main__":
    main()

  0%|          | 0/10 [00:00<?, ?it/s]

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
