# send でジェネレータにデータを注入するのは避ける

In [2]:
import math

def wave(amplitude, steps):
  """ソフトウェアラジオを使って信号を送出する.

  Args:
      amplitude: 振幅
      steps:

  """
  step_size = 2 * math.pi / steps
  for step in range(steps):
    radians = step * step_size
    fraction = math.sin(radians)
    output = amplitude * fraction
    yield output

In [4]:
def transmit(output):
  if output is None:
    print(f'Output is None')
  else:
    print(f'Output: {output:>5.1f}')

def run(it):
  for output in it:
    transmit(output)

run(wave(3.0, 8))

Output:   0.0
Output:   2.1
Output:   3.0
Output:   2.1
Output:   0.0
Output:  -2.1
Output:  -3.0
Output:  -2.1


基本的な波形を作るにはこれで十分ですが、別の入力に基づいた波形で常に振幅を変動させること（すなわち、AMラジオ信号の生成）はできない
ジェネレータのイテレーションのたびに、振幅を変調する方法が必要

Python のジェネレータには yield 式を双方向に使えるようにした send メソッドがある
send メソッドでは、ジェネレータが出力を yield しながら、ストリーム入力を受け入れられる
通常ジェネレータをイテレーションするとき、yield 式の値は None である

In [8]:
# 通常のジェネレータ関数は yield を使って値を「一方向に」外へ出す（＝出力する）ものです。
# これはループなどで next() を呼ぶたびに順番に値を返します。
def simple_gen():
    yield 1
    yield 2

it = simple_gen()
print(next(it))
print(next(it))

1
2


In [None]:
# Python では yield を使って 値を出すだけでなく、外から値を受け取ることもできます。
# これを「双方向ジェネレータ」と呼びます。
# 具体的には、send(value) を使うと、前回の yield を「式」として評価し、
# その結果に value が渡される という動作になります。
def my_generator():
  received = yield 1 # 単に「1を出力する」だけだが、その後の yield に send() で値を送信可能
  print(f'reversed = {received}')

it = iter(my_generator())
output = next(it) # 最初のジェネレータ出力を取得 1 が出力される
print(f'output = {output}')

try:
  # 終わるまでジェネレータ実行
  # yield に何も送っていないので、received = None になる（デフォルト: None）
  next(it)
except StopIteration:
  pass
else:
  assert False

output = 1
reversed = None


for ループや組み込み関数 next でイテレーションする代わりに send メソッドを呼び出すと、指定した引数がジェネレータを再度呼び出したときの値になる。
しかし、初めてのジェネレータでは、yield 式がまだないので、最初に send を呼び出すときに指定できるのは None だけ（それ以外の引数は実行時に例外が送出される）

In [11]:
it = iter(my_generator())
# 最初のジェネレータ
# 最初の send では、まだ yield に達していないため、値を受け取ることができない
# ので、it.send(42) と値を送ると、エラーになる（最初の send は必ず next(it) or it.send(None)）
output = it.send(None)
print(f'output = {output}')

try:
  it.send('hello')
except StopIteration:
  pass

output = 1
reversed = hello


### 解説：
1. my_generator() を呼ぶと、まだ何も実行されない（イテレータが生成されるだけ）

1. next(it) を実行 → yield 1 まで進み、1 を返す（ここで一時停止）

1. 次の next(it) で、ジェネレータが再開

   * received = yield 1 に戻り、yield の式に None が代入される（nextは何も送れない）

   * print(f'reversed = {received}') で "reversed = None" が出力される

In [24]:
# ラジオ波のジェネレータを yield 式で返された振幅を保存し、それを利用して次の出力を生成
def wave_modulating(steps):
  step_size = 2 * math.pi / steps
  amplitude = yield                  # 最初の振幅を受け取る
  for step in range(steps):
    radians = step * step_size
    fraction = math.sin(radians)
    output = amplitude * fraction
    amplitude = yield output         # 次の振幅を受け取る

In [25]:
def run_modulating(it):
  amplitudes = [
    None, 7, 7, 7, 2, 2, 2, 2, 10, 10, 10, 10, 10
  ]
  for amplitude in amplitudes:
    output = it.send(amplitude)
    transmit(output)

# 1. it.send(None) で amplitude = yield まで進み、None を返す（一時停止）
# 2. it.send(7) で amplitude = yield に戻り、yield 式に 7 が代入される
#    for ループの amplitude = yield output まで進み、output を返す（一時停止）
# 3. it.send(7) で amplitude = yield output に戻り、yield 式に 7 が代入される
#    for ループの amplitude = yield output まで再度進み、output を返す（一時停止）
run_modulating(wave_modulating(12))

Output is None
Output:   0.0
Output:   3.5
Output:   6.1
Output:   2.0
Output:   1.7
Output:   1.0
Output:   0.0
Output:  -5.0
Output:  -8.7
Output: -10.0
Output:  -8.7
Output:  -5.0


初めてこのコードを読む人には難解...

話が変わり、プログラムの要求がもっと複雑になったとする。
搬送波に単純な制限はを使う代わりに、一連の信号からなる複雑な波形を使う必要がでたとする。

In [27]:
# 33 項目の yield from で複雑なジェネレータをつくるより
def complex_wave():
  yield from wave(7.0, 3)
  yield from wave(2.0, 4)
  yield from wave(10.0, 5)

run(complex_wave())

Output:   0.0
Output:   6.1
Output:  -6.1
Output:   0.0
Output:   2.0
Output:   0.0
Output:  -2.0
Output:   0.0
Output:   9.5
Output:   5.9
Output:  -5.9
Output:  -9.5


In [31]:
# yield from 式が簡単な場合を扱えたので、
# ジェネレータの send メソッドでもうまくいくと期待するかもしれない
def complex_wave_modulating():
  yield from wave_modulating(3)
  yield from wave_modulating(4)
  yield from wave_modulating(5)

# None が多数に出力されるので、問題
run_modulating(complex_wave_modulating())

Output is None
Output:   0.0
Output:   6.1
Output:  -6.1
Output is None
Output:   0.0
Output:   2.0
Output:   0.0
Output: -10.0
Output is None
Output:   0.0
Output:   9.5
Output:   5.9


None の問題を回避することはできるが、コードが更に難解になってくる。

ので、「send メソッドをきっぱりあきらめて、より単純な方式にする」のがオススメ

In [32]:
# もっとも簡単な解
# ラジオ波の関数にイテレータを渡すこと
# 組み込み関数 next が呼ばれるたびにイテレータが入力の振幅を返すように
def wave_cascading(amplitude_it, steps):
  step_size = 2 * math.pi / steps
  for step in range(steps):
    radians = step * step_size
    fraction = math.sin(radians)
    amplitude = next(amplitude_it)  # 次の入力取得
    output = amplitude * fraction
    yield output

In [35]:
def complex_wave_cascading(amplitude_it):
  yield from wave_cascading(amplitude_it, 3)
  yield from wave_cascading(amplitude_it, 4)
  yield from wave_cascading(amplitude_it, 5)

In [37]:
def run_cascading():
  amplitudes = [7, 7, 7, 2, 2, 2, 2, 10, 10, 10, 10, 10]
  it = complex_wave_cascading(iter(amplitudes))
  for _ in amplitudes:
    output = next(it)
    transmit(output)

run_cascading()

Output:   0.0
Output:   6.1
Output:  -6.1
Output:   0.0
Output:   2.0
Output:   0.0
Output:  -2.0
Output:   0.0
Output:   9.5
Output:   5.9
Output:  -5.9
Output:  -9.5


この方式で最も良いのは、イテレータはどこからでもよく、完全に動的（例えば、ジェネレータ関数を使って実装）でかまわないこと

唯一の欠点は、このコードではジェネレータが完全にスレッドセーフと仮定していることで、場合によるとそうでない場合がある

スレッドの境界値を超える必要があるなら、async 関数の方が良いかもしれない

### 唯一の欠点について

スレッドセーフ:
複数のスレッドが同時に同じリソースにアクセスしても正しく動作すること

Python のジェネレータは、状態を内部に持っていて、逐次的に進める
* yield によって状態が中断・再開される
* 外から next()・send() などで進める

これは「状態を持つ=排他的に扱わないと壊れやすい」という特徴がある

ここでは、amplitude = next(amplitude_it) のように外部の amplitude_it というイテレータ（=共有リソース）を内部で使っている点に問題が出る可能性がある

問題例:
* 複数のスレッドで同じ amplitude_it を共有して run_cascading() を呼ぶ
* それぞれが同時に next(amplitude_it) を呼び出す
* 結果として、データの順番が狂ったり、StopIteration 例外が予期せず出たりする

なぜなら Python のイテレータやジェネレータはスレッドセーフではない

### async 関数の方が良いかもしれないについて

async 関数は、Python において、マルチタスクを安全に行うための機構
* await を使って、安全なタイミングで他の処理に制御を譲る
* イテレータのような逐次的処理も、安全に並列風に扱える

async のメリット
* 複数の「波形出力タスク」を同時に走らせたい
* amplitude_it のような共有リソースにアクセスの同期制御を加えたい

このようなケースでは、スレッドではなく async の方が制御しやすく、安全に扱える

In [41]:
# wave_cascading のスレッドセーフでない使い方による問題例
import math
import threading
from time import sleep

def wave_cascading(amplitude_it, steps):
    step_size = 2 * math.pi / steps
    for step in range(steps):
        radians = step * step_size
        fraction = math.sin(radians)
        amplitude = next(amplitude_it)  # 共有イテレータから取得（危険）
        output = amplitude * fraction
        print(f"[{threading.current_thread().name}] Amplitude: {amplitude}, Output: {output}")
        sleep(0.1)  # 疑似的な遅延

def complex_wave_cascading(amplitude_it):
    # スレッドセーフではないことにより例外が発生するのでそれをキャッチする処理を追加
    try: 
        wave_cascading(amplitude_it, 3)
        wave_cascading(amplitude_it, 4)
    except StopIteration:
        print(f"StopIteration!")

def run_threaded_cascading():
    amplitudes = iter([1,2,3,4,5,6,7,8,9,10])  # 共有イテレータ
    t1 = threading.Thread(target=complex_wave_cascading, args=(amplitudes,), name="Thread-A")
    t2 = threading.Thread(target=complex_wave_cascading, args=(amplitudes,), name="Thread-B")
    t1.start()
    t2.start()
    t1.join()
    t2.join()

run_threaded_cascading()

[Thread-A] Amplitude: 1, Output: 0.0
[Thread-B] Amplitude: 2, Output: 0.0
[Thread-A] Amplitude: 3, Output: 2.598076211353316
[Thread-B] Amplitude: 4, Output: 3.464101615137755
[Thread-A] Amplitude: 5, Output: -4.330127018922192
[Thread-B] Amplitude: 6, Output: -5.19615242270663
[Thread-A] Amplitude: 7, Output: 0.0
[Thread-B] Amplitude: 8, Output: 0.0
[Thread-A] Amplitude: 9, Output: 9.0
[Thread-B] Amplitude: 10, Output: 10.0
StopIteration!
StopIteration!


### 問題

amplitudes（イテレータ）を複数スレッドで共有している

結果として、next(amplitudes) が同時に呼ばれると、内部状態が競合して
* 値がスキップされたり
* StopIteration が早く来たり
* 結果が順不同・壊れる可能性がある

In [None]:
import asyncio
import math

# 波形処理（終了シグナルが来たら停止）
async def wave_cascading(amplitude_queue, steps):
    step_size = 2 * math.pi / steps
    for step in range(steps):
        radians = step * step_size
        fraction = math.sin(radians)
        amplitude = await amplitude_queue.get() # 中身が空なら待機する（ブロックする）
        print(f'Amplitude: {amplitude}')
        if amplitude is None:
            return  # 明示的な終了シグナル
        output = amplitude * fraction
        yield output

# 波形を複数連続で実行
async def complex_wave_cascading(amplitude_queue):
    async for value in wave_cascading(amplitude_queue, 3):
        yield value
    async for value in wave_cascading(amplitude_queue, 4):
        yield value
    async for value in wave_cascading(amplitude_queue, 5):
        yield value

# 実行関数（キューが終了したら終了）
async def run_cascading(name, amplitude_queue):
    async for output in complex_wave_cascading(amplitude_queue):
        print(f"[{name}] Output: {output}")

# メイン実行関数
async def main():
    amplitudes = [1,2,3,4,5,6,7,8,9,10,11,12]
    queue = asyncio.Queue()

    # データの個数：3+4+5=12 → 各タスクに12個必要なので2倍に
    for a in amplitudes * 2:
        await queue.put(a)

    # 各タスクの終了を伝えるため、None（終了シグナル）を入れる
    await queue.put(None)
    await queue.put(None)

    await asyncio.gather(
        run_cascading("Task A", queue),
        run_cascading("Task B", queue),
    )

# Jupyter での実行
await main()

Amplitude: 1
[Task A] Output: 0.0
Amplitude: 2
[Task A] Output: 1.7320508075688774
Amplitude: 3
[Task A] Output: -2.598076211353315
Amplitude: 4
[Task A] Output: 0.0
Amplitude: 5
[Task A] Output: 5.0
Amplitude: 6
[Task A] Output: 7.347880794884119e-16
Amplitude: 7
[Task A] Output: -7.0
Amplitude: 8
[Task A] Output: 0.0
Amplitude: 9
[Task A] Output: 8.559508646656381
Amplitude: 10
[Task A] Output: 5.877852522924733
Amplitude: 11
[Task A] Output: -6.465637775217203
Amplitude: 12
[Task A] Output: -11.412678195541844
Amplitude: 1
[Task B] Output: 0.0
Amplitude: 2
[Task B] Output: 1.7320508075688774
Amplitude: 3
[Task B] Output: -2.598076211353315
Amplitude: 4
[Task B] Output: 0.0
Amplitude: 5
[Task B] Output: 5.0
Amplitude: 6
[Task B] Output: 7.347880794884119e-16
Amplitude: 7
[Task B] Output: -7.0
Amplitude: 8
[Task B] Output: 0.0
Amplitude: 9
[Task B] Output: 8.559508646656381
Amplitude: 10
[Task B] Output: 5.877852522924733
Amplitude: 11
[Task B] Output: -6.465637775217203
Amplitude: 12