# Pythonによる並行処理のパターン
+ https://qiita.com/kaitolucifer/items/e4ace07bd8e112388c75

#### Pythonでマルチタスクを同時に処理したい時は主に2通りのやり方
+ プライマリースレッドを持つプロセスを複数個立ち上げて, 複数のタスクを処理する
+ 1個のプロセスの中で複数スレッドを立ち上げて, 複数のタスクを処理する
+ 複数プロセスで複数スレッドを立ち上げて複数のタスクを処理もできるがモデルが複雑化するので非推奨

## 1. threading

#### Pythonのスレッド
+ プロセスでシミュレートしたような疑似スレッドではなくOSのネイティブスレッドを使用する
+ Linux系ではPOSIXスレッド
+ WindowsではWindowsスレッド

2つのモジュールが用意されている  
+ _thread 低レベルモジュール
+ threading 高レベルモジュール

In [1]:
from type_hint import *
from import_str import importstr
from log_conf import logging
log = logging.getLogger('nb')

# log.setLevel(logging.WARN)
log.setLevel(logging.INFO)
log.setLevel(logging.DEBUG)

module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/..


ConcurrentPatternフォルダの各アプリクラスを実行する関数

In [2]:
# running everything app
def run(app, *argv):
    argv = list(argv)
    log.info('Running: {}({!r}).main()'.format(app, argv))
    print("*app.rsplit('.', 1) : ", *app.rsplit('.', 1))

    app_cls = importstr(*app.rsplit('.', 1)) # __import__を実行
    app_cls(argv).main()

    log.info("Finished: {}.({!r}).main()".format(app, argv))

#### 1-1. インスタンス化
t1とt2が交替で実行されていることが確認できます。交替ルールの1つはIO操作（ここではprint操作が該当する）

In [3]:
# run("concurrent_pattern.thread_app.SimpleThreadApp")

#### 1-2. カスタイマイズ
threading.Threadクラスを継承してrun()メソッドをオーバーライドして独自のカスタムスレッドを使用する

In [4]:
# run('concurrent_pattern.thread_app.ClassStyleThreadApp')

#### 1-3. スレッド数を計算
`active_count`でアクティブなスレッド数を数えることができる.

In [5]:
# run('concurrent_pattern.thread_app.CountingThreadApp')

#### 1-4. デーモンスレッド
+ デーモンスレッドという実態は存在しない. 
+ 通常のプライマリースレッドに対するワーカースレッドのことをデーモンスレッドと呼んでいるだけ
+ デーモンスレッドはプライマリースレッドの終了まで破棄が遅延される

In [6]:
# run('concurrent_pattern.thread_app.DaemonThreadApp')

![GIL.jpeg](image/GIL.jpg)

![CPythonThreading](image/CPythonThread.jpg)

![BoundTask](image/BoundTask_ConvoyEffect.jpg)

#### データ競合(Data Race)

![分解不可操作(Atomic)](image/DecompAtomicOpe.jpg)

![DataRace](image/DataRace.jpg)

In [7]:
# run('concurrent_pattern.data_race.DataRaceThreadApp')

#### 1-6-1. 排他制御(mutex)

In [8]:
# run('concurrent_pattern.mutex.MutexForDataRaceApp')

#### 1-6-2. 再帰排他制御(recursive_mutex)

In [9]:
# run('concurrent_pattern.mutex.RecursiveMutexForDataRaceApp')

#### 1-6-3. 有限セマフォ（BoundedSemaphore）制御

+ 排他制御は、ある時刻において、リソースを処理できるのは1つのスレッドのみに制限する
+ セマフォは一定数のスレッドorプロセスの同時処理を許容する制限
+ トイレに3つの便座があって、同時に3人が使っていて他の人は並んで待つシチュエーションがセマフォに該当
+ Pythonの場合, GILがあるので有限セマフォを使うシーンは1リソースに対して複数プロセスがアクセスするような状況だと思う

In [10]:
# run('concurrent_pattern.semaphore.BoundedSemaphoreApp')

#### 1-6-4. イベント（Event）制御

スレッドのイベントはメインスレッドが他のスレッドをコントロールするためのもの  

| メソッド | 説明 |
| :-- | :-- |
| clear | flagをFalseにする |
| set | flagをTrueにする |
| is_set | flagがTrueのときTrueを返す |
| wait | flagをモニタリングし続ける. flagがFalseの時はブロッキング（blocking）する |


In [11]:
# run('concurrent_pattern.event.EventApp')

#### 1-6-5. タイマー（Timer）制御

タイマーでスレッドを起動できる

In [12]:
# run('concurrent_pattern.timer.TimerThreadApp')

#### 1-6-6. 条件（Condition）制御

+ 条件判定でスレッドを制御する
+ Conditionクラス

| メソッド | 説明 |
| :-- | :-- |
| wait | 通知されるか引数のtimeout時間に達するまでスレッドをハングアップする |
| notify | ハングアップされたスレッド（デフォルトn=1）に通知する. ロックを取得した状態でしか使えない |
| notifyAll | ハングアップされた全てのスレッドに通知する |


In [13]:
# run('concurrent_pattern.condition.ConditionControlThreadsApp')

#### 1-6-7. バリア（Barrier）制御

+ 指定された数のスレッドがバリアを通ったら、まとめて実行される制御
+ オンライン対戦ゲームで、チームが指定人数になるまで一定時間待機するのをバリアで実装できる

| メソッド | 説明 |
| :-- | :-- |
| wait | スレッドがバリアを通る. <br>指定された数のスレッドが通ったら、waitしているスレッドが全部解放される. |
| reset | バリアを空にする. <br>waitしているスレッドにBrokenBarrierErrorを返す. |
| abort | バリアをbroke状態にする. <br>現在の全てのスレッドが終了する. <br>これ以降にバリアを通ろうとするスレッドにBrokenBarrierErrorを返す. |

In [14]:
# run('concurrent_pattern.barrier.BarrierGamePlayersJoinApp')

#### 1-7. ThreadLocal

+ スレッド独自のローカル変数(スタック変数)を使用する  
+ localを辞書のように各スレッド固有のデータを保存するものだと見ることができる

ThreadLocalの使い方として、それぞれのスレッドに独自のDBコネクション、httpリクエストなどを作ることができる.  
スレッドからすると、受け取った全てのデータはローカル変数同然で、他のスレッドに構わず操作することが可能.  

In [15]:
# run('concurrent_pattern.thread_local.ThreadLocalApp')

## 2. multiprocesing

+ Unix系は, fork()システムコールを呼び出す
+ Windowsは, fork()システムコールを持っていないので, pickleによる擬似フォークを行う
+ (親プロセスの全てのPythonオブジェクトをPickleでシリアライズして、子プロセスに渡す)

PIDのチェック(Unix系)

In [16]:
import os

print('Process ({}) start...'.format(os.getpid()))

# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process ({}) and my parent is {}.'.format(os.getpid(), os.getppid()))
else:
    print('I ({}) just created a child process ({}).'.format(os.getpid(), pid))

Process (24311) start...
I (24311) just created a child process (24344).
Process (24311) start...
I am child process (24344) and my parent is 24311.


#### 2.1 Process

In [17]:
# run('concurrent_pattern.process.SubProcessApp')

#### 2.2 Process Pool

+ プロセスは生成のオーバヘッドが大きいのでPoolでプールしておくことで効率が上がる

| メソッド | 説明 |
| :-- | :-- |
| apply | 同期処理 |
| apply_acync | 非同期処理 |
| terminate | 直ちに強制終了 |
| join | 親プロセスは, 子プロセスが終了するまで待機する. プロセスのjoinは, terminateかcloseの以降で使用できる |
| close | すべてのプロセスの処理が終了したら終了する |


In [18]:
# run('concurrent_pattern.process.ProcessPoolApp')

#### 2.3 プロセス間通信

| 手法 | 説明 |
| :-- | :-- |
| Queue | FIFOデータ構造 |
| Pipe | TxとRxがあるデータ転送構造 |
| Shared memory | プロセスデータの共有空間データ | 

#### 2-3-1. キュー（Queue）

In [19]:
# run('concurrent_pattern.connection_processes.ProcessConnectWithQueueApp')

#### 2-3-2. パイプ（Pipe）

+ パイプの片方にデータを入れて（sendメソッド）,もう片方にデータ受け取る（recvメソッド）というふうにデータが伝達される
+ 2つのプロセスが同時に同じパイプにデータを入れたり受け取ったりすると, データが破損するので注意

In [20]:
# run('concurrent_pattern.connection_processes.ProcessConnectWithPipeApp')

#### 2-3-3. 名前付き共有メモリ（Named shared memory）

+ PythonではValueとArrayで、数値データと配列デートを共有メモリ上に保持することができる.
+ 余談ですが、ValueとArrayはC言語のデータ構造をそのまま利用している.

In [21]:
# run('concurrent_pattern.connection_processes.ProcessConnectSharedMemoryApp')

python 3.8からmultiprocessing.shared_memoryモジュールが追加され、共有メモリを使ってプロセス間のデータ交換ができるようになった.

In [22]:
# run('concurrent_pattern.connection_processes.ProcessConnectWithNamedSharedMemoryApp')

#### 2-3-4. マネージャ（Manager）

+ Topic形式
+ Manager()はマネージャーオブジェクトを返してサーバープロセスを作る.
+ サーバープロセスを通して, 他のプロセスはプロキシ方式で, Pythonオブジェクトを操作する.

サポート済みの型<br>
+ list
+ dict
+ Namespace
+ Lock
+ RLock
+ Semaphore
+ BoundedSemaphore
+ Condition
+ Event
+ Barrier
+ Queue
+ Value
+ Array

In [23]:
# run('concurrent_pattern.connection_processes.ProcessConnectWithManagerApp')

Python 3.8からmultiprocessing.managers.SharedMemoryManagerモジュールが追加され、共有メモリマネージャーが使える

In [24]:
# run('concurrent_pattern.connection_processes.ProcessConnectWithSharedMemoryManagerApp')

### 2-3-5. プロセス制御

Mutexによるプロセスのロック  
ロックがかかったことで数字が順番に出力されています。ただし、マルチプロセスの性能を発揮できない

In [25]:
# run('concurrent_pattern.process_control.ProcessLockApp')

### 2-4. 分散型プロセス処理

multiprocessingモジュールのmanagersサブモジュールはプロセスを複数のマシンに分散できます。  
通信プロトコルが分からなくても、分散型プロセス処理のプログラムが書けます。

分散型プロセス処理にはタスクをスケジューリングするサーバープロセスとタスクを実際に処理するワーカープロセスが必要です。

サーバープロセスのtask_server_master.py  
ワーカープロセスのtask_client_worker.py

In [26]:
run('concurrent_pattern.process_control.ProcessServerClientApp')

2023-07-24 20:38:19,810 INFO     pid:24311 nb:004:run Running: concurrent_pattern.process_control.ProcessServerClientApp([]).main()
2023-07-24 20:38:19,998 INFO     pid:24311 concurrent_pattern.process_control:114:main Starting ProcessServerClientApp, Namespace()


*app.rsplit('.', 1) :  concurrent_pattern.process_control ProcessServerClientApp
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/concurrent_pattern/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/concurrent_pattern/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/concurrent_pattern/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/concurrent_pattern/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/concurrent_pattern/..
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/concurrent_pattern/..
[Start] server_process 24346
[START] Queue Server
module_parent_dir /Users/inoueshinichi/Desktop/MyGithub/Learn_PythonConcurrent/concurrent_pattern/.

Result: 8600 * 8600 = 73960000
run task 6848 * 6848...
Result: 6848 * 6848 = 46895104
run task 7918 * 7918...
Result: 7918 * 7918 = 62694724
run task 8623 * 8623...
Result: 8623 * 8623 = 74356129
run task 7805 * 7805...
Result: 7805 * 7805 = 60918025
run task 9905 * 9905...
Result: 9905 * 9905 = 98109025
run task 667 * 667...
Result: 667 * 667 = 444889
run task 4593 * 4593...
Result: 4593 * 4593 = 21095649
run task 8816 * 8816...
Result: 8816 * 8816 = 77721856
run task 6464 * 6464...


2023-07-24 20:38:31,566 INFO     pid:24311 nb:010:run Finished: concurrent_pattern.process_control.ProcessServerClientApp.([]).main()


worker exit.
[END] Worker Process
Result: 6464 * 6464 = 41783296
master exit.
[END] Queue Server
Finish


### 3. subprocess