# 第9章 値としてのストリーム

学習ステップ:

1. IOだけを使う
2. 再帰でIOを使う
3. StreamでIOを使う

## 9.2 未知数の値に対処する

要件: オンライン両替

1. ユーザーは、ある通貨の特定の金額を別の通貨に両替することを要求できる。
2. 要求された両替が実行されるのは、その通貨ペアの為替レートが**上昇傾向**にある場合に限られる。  
   上昇傾向とは、直近のn個のレートがそれぞれ1つ前のレートよりも高いことを意味する。  
   たとえばn=3の場合、0.81, 0.82, 0.85は上昇傾向であり、0.81, 0.80, 0.85は上昇傾向ではない。
3. API呼び出し関数exchangeTableが用意されている。  
   この関数は、両替元の通貨から、サポートされている他の通貨への、現在の為替レートテーブルだけを取得する。

In [1]:
// 副作用のあるAPIを呼び出す。
// 特定のレートがない為替テーブルが返される場合がある。
//
// 実行例:
// exchangeRatesTableApiCall("USD")
// → Map("JPY" -> 104.54, "EUR" -> 0.81)
//
// exchangeRatesTableApiCall("USD")
// → Exception in thread "main": Connection error
//
// exchangeRatesTableApiCall("USD")
// → Map("EUR" -> 0.79)
def exchangeRatesTableApiCall(currency: String): Map[String, BigDecimal] = ???


defined [32mfunction[39m [36mexchangeRatesTableApiCall[39m

In [2]:
object model {
  opaque type Currency = String
  object Currency {
    def apply(name: String): Currency = name
    extension (currency: Currency) def name: String = currency
  }
}
import model._


defined [32mobject[39m [36mmodel[39m
[32mimport [39m[36mmodel._

[39m

In [3]:
import $ivy.`org.typelevel::cats-effect:3.3.1`
import cats.effect._
import cats.implicits._

// 次に作りたいやつのシグネチャ
def exchangeTable(from: Currency): IO[Map[Currency, BigDecimal]] = ???


[32mimport [39m[36m$ivy.$                                 
[39m
[32mimport [39m[36mcats.effect._
[39m
[32mimport [39m[36mcats.implicits._

// 次に作りたいやつのシグネチャ
[39m
defined [32mfunction[39m [36mexchangeTable[39m

## 9.6 実習: イミュータブルマップ

In [4]:
val m1: Map[String, String] = Map("key" -> "value")
val m2: Map[String, String] = m1.updated("key2", "value2")
val m3: Map[String, String] = m2.updated("key2", "another2")
val m4: Map[String, String] = m2.removed("key")
val valueFromM3: Option[String] = m3.get("key")
val valueFromM4: Option[String] = m4.get("key")


[36mm1[39m: [32mMap[39m[[32mString[39m, [32mString[39m] = [33mMap[39m([32m"key"[39m -> [32m"value"[39m)
[36mm2[39m: [32mMap[39m[[32mString[39m, [32mString[39m] = [33mMap[39m([32m"key"[39m -> [32m"value"[39m, [32m"key2"[39m -> [32m"value2"[39m)
[36mm3[39m: [32mMap[39m[[32mString[39m, [32mString[39m] = [33mMap[39m([32m"key"[39m -> [32m"value"[39m, [32m"key2"[39m -> [32m"another2"[39m)
[36mm4[39m: [32mMap[39m[[32mString[39m, [32mString[39m] = [33mMap[39m([32m"key2"[39m -> [32m"value2"[39m)
[36mvalueFromM3[39m: [32mOption[39m[[32mString[39m] = [33mSome[39m(value = [32m"value"[39m)
[36mvalueFromM4[39m: [32mOption[39m[[32mString[39m] = [32mNone[39m

## 9.7 IO呼び出しは何回必要か


In [5]:
// 最終的に作りたいやつのシグネチャ
def exchangeIfTrending(amount: BigDecimal, from: Currency, to: Currency): IO[BigDecimal] = ???


defined [32mfunction[39m [36mexchangeIfTrending[39m

## 9.8 ボトムアップ設計

解決しなければならず、すぐに取り組むことができる**小さな問題**:

1. レートが上昇傾向にあるかどうかをチェックする
    ```scala
    def trending(rates: List[BigDecimal]): Boolean = ???
    ```
2. テーブルから通過を1つ抽出する
    ```scala
    def extractSingeCurrencyRate(currencyToExtract: Currency)(table: Map[Currency, BigDecimal]): Option[BigDecimal] = ???
    ```

In [6]:
def trending(rates: List[BigDecimal]): Boolean = {
  rates.size > 1 &&
  rates.zip(rates.drop(1))
       .forall(ratePair => ratePair match {
          case (previousRate, rate) => rate > previousRate
       })
}


defined [32mfunction[39m [36mtrending[39m

## 9.13 コーヒーブレイク: マップとタプルを操作する

In [7]:
def extractSingleCurrencyRate(currencyToExtract: Currency)(table: Map[Currency, BigDecimal]): Option[BigDecimal] = {
  table.get(currencyToExtract)
}


defined [32mfunction[39m [36mextractSingleCurrencyRate[39m

In [8]:
def exchangeTable(from: Currency): IO[Map[Currency, BigDecimal]] = {
  IO.delay(exchangeRatesTableApiCall(from.name)).map(table =>
    table.map(kv =>
      kv match {
        case (currencyName, rate) => (Currency(currencyName), rate)
      }))
}


defined [32mfunction[39m [36mexchangeTable[39m

In [9]:
def retry[A](action: IO[A], maxRetries: Int): IO[A] = {
  List
    .range(0, maxRetries)
    .map(_ => action)
    .foldLeft(action)((program, retryAction) =>
      program.orElse(retryAction)
    )
}

// ハードコーディング
def lastRates(from: Currency, to: Currency): IO[List[BigDecimal]] = {
  for {
    table1 <- retry(exchangeTable(from), 10)
    table2 <- retry(exchangeTable(from), 10)
    table3 <- retry(exchangeTable(from), 10)
    lastTables = List(table1, table2, table3)
  } yield lastTables.flatMap(extractSingleCurrencyRate(to))
}

def exchangeIfTrending(amount: BigDecimal, from: Currency, to: Currency): IO[Option[BigDecimal]] = {
  lastRates(from, to).map(rates =>
    if (trending(rates)) Some(amount * rates.last) else None
  )
}


defined [32mfunction[39m [36mretry[39m
defined [32mfunction[39m [36mlastRates[39m
defined [32mfunction[39m [36mexchangeIfTrending[39m

## 9.18 再帰関数

exchangeIfTrendingを修正する

In [10]:
def exchangeIfTrending(amount: BigDecimal, from: Currency, to: Currency): IO[BigDecimal] = {
  for {
    rates <- lastRates(from, to)
    result <- if (trending(rates))
                IO.pure(amount * rates.last)
              else exchangeIfTrending(amount, from, to)
  } yield result
}


defined [32mfunction[39m [36mexchangeIfTrending[39m

## 9.23 コーヒーブレイク: 再帰と無限

In [11]:
def currencyRate(from: Currency, to: Currency): IO[BigDecimal] = {
  for {
    table <- retry(exchangeTable(from), 10)
    rate <- extractSingleCurrencyRate(to)(table) match {
      case Some(value) => IO.pure(value)
      case None        => currencyRate(from, to)
    }
  } yield rate
}


defined [32mfunction[39m [36mcurrencyRate[39m

In [12]:
def lastRates(from: Currency, to: Currency, n: Int): IO[List[BigDecimal]] = {
  List.range(0, n).map(_ => currencyRate(from, to)).sequence
}


defined [32mfunction[39m [36mlastRates[39m

In [13]:

def exchangeIfTrending(amount: BigDecimal, from: Currency, to: Currency): IO[BigDecimal] = {
  for {
    rates <- lastRates(from, to, 3)
    result <- if (trending(rates))
                IO.pure(amount * rates.last)
              else exchangeIfTrending(amount, from, to)
  } yield result
}

exchangeIfTrending(BigDecimal(1000), Currency("USD"), Currency("EUR"))


defined [32mfunction[39m [36mexchangeIfTrending[39m
[36mres13_1[39m: [32mIO[39m[[32mBigDecimal[39m] = [33mFlatMap[39m(
  ioe = [33mMap[39m(
    ioe = [33mMap[39m(
      ioe = [33mFlatMap[39m(
        ioe = [33mFlatMap[39m(
          ioe = [33mHandleErrorWith[39m(
            ioa = [33mHandleErrorWith[39m(
              ioa = [33mHandleErrorWith[39m(
                ioa = [33mHandleErrorWith[39m(
                  ioa = [33mHandleErrorWith[39m(
                    ioa = [33mHandleErrorWith[39m(
                      ioa = [33mHandleErrorWith[39m(
                        ioa = [33mHandleErrorWith[39m(
                          ioa = [33mHandleErrorWith[39m(
                            ioa = [33mHandleErrorWith[39m(
                              ioa = [33mMap[39m(
                                ioe = [33mDelay[39m(
                                  thunk = ammonite.$sess.cell8$Helper$$Lambda$3715/0x0000000801692eb0@2b6533d6,
                  

# 9.28 データストリームの導入

無限実行の可能性がある場合たいていストリームを使うことになる

~~1. IOだけを使う~~  
~~2. 再帰でIOを使う~~  
3. StreamでIOを使う

## 9.31 ストリームの処理、プロデューサ、コンシューマ

プロデューサ/コンシューマパターン(p.344を見よ)


## 9.40 実習: ストリーム処理

In [14]:
import $ivy.`org.typelevel::cats-effect:3.2.9`
import $ivy.`co.fs2::fs2-core:3.1.2`

import cats.effect._
import fs2.Stream

def castTheDieImpure(): Int = ???
def castTheDie(): IO[Int] = IO.delay(castTheDieImpure())
val infiniteDieCasts: Stream[IO, Int] = Stream.eval(castTheDie()).repeat

// 1. 奇数をフィルタリングし、最初の3つの奇数を返す
infiniteDieCasts.map(_ % 2 != 0).take(3).compile.toList

// 2. サイコロを振って最初の5つ目を返すが、6の目はすべて2倍にする
// (したがって、[1, 2, 3, 6, 4]は[1, 2, 3, 12, 4]になる)。
infiniteDieCasts.take(5).map(x => if (x == 6) x * 2 else x).compile.toList

// 3. 最初の3つの目の合計を返す
infiniteDieCasts.take(3).compile.toList.map(_.sum)

// 4. 5が出るまでサイコロを降ってから、さらに2回サイコロを振り、最後の3つの結果(5とさらに2つの目)を返す
infiniteDieCasts.filter(_ == 5).take(1).append(infiniteDieCasts.take(2)).compile.toList

//5. サイコロを100回振り、それらの値が排出されるようにする
infiniteDieCasts.take(100).compile.drain

// 6. 最初の3つの目を変更せずにそのまま返し、次の3つの目を3倍にして返す(合計6つの目)
infiniteDieCasts.take(3).append(infiniteDieCasts.take(3).map(_ * 3)).compile.toList

// 7. 6が2回連続で出るまでサイコロを振る
infiniteDieCasts.scan(0)((sixesInRow, current) =>
  if (current == 6) sixesInRow + 1 else 0)
  .filter(_ == 2).take(1).compile.toList


[32mimport [39m[36m$ivy.$                                 
[39m
[32mimport [39m[36m$ivy.$                       

[39m
[32mimport [39m[36mcats.effect._
[39m
[32mimport [39m[36mfs2.Stream

[39m
defined [32mfunction[39m [36mcastTheDieImpure[39m
defined [32mfunction[39m [36mcastTheDie[39m
[36minfiniteDieCasts[39m: [32mStream[39m[fs2.Stream[[A >: scala.Nothing <: scala.Any] => _root_.cats.effect.IO[A], scala.Int], [32mInt[39m] = Stream(..)
[36mres14_7[39m: [32mIO[39m[[32mList[39m[[32mBoolean[39m]] = [33mFlatMap[39m(
  ioe = [33mPure[39m(value = ()),
  f = fs2.Stream$CompileOps$$Lambda$3789/0x00000008016d5418@7307c336,
  event = cats.effect.tracing.TracingEvent$StackTrace
)
[36mres14_8[39m: [32mIO[39m[[32mList[39m[[32mInt[39m]] = [33mFlatMap[39m(
  ioe = [33mPure[39m(value = ()),
  f = fs2.Stream$CompileOps$$Lambda$3789/0x00000008016d5418@38bdb891,
  event = cats.effect.tracing.TracingEvent$StackTrace
)
[36mres14_9[39m: [32mI

In [15]:
def rates(from: Currency, to: Currency): Stream[IO, BigDecimal] = {
  Stream
    .eval(exchangeTable(from))
    .repeat
    .map(extractSingleCurrencyRate(to))
    .unNone
    .orElse(rates(from, to))
}


defined [32mfunction[39m [36mrates[39m

In [16]:
// ratesストリームを使ってexchangeIfTrendingをアップデート
def exchangeIfTrending(amount: BigDecimal, from: Currency, to: Currency): IO[BigDecimal] = {
  rates(from, to)
    .sliding(3)
    .map(_.toList)
    .filter(trending)
    .map(_.last)
    .take(1)
    .compile
    .lastOrError
    .map(_ * amount)
}


defined [32mfunction[39m [36mexchangeIfTrending[39m

## 9.46 IO呼び出しの間で待機する

In [17]:
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit

// 待機のためにStream.fixedRateを使う
val delay: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS)
val ticks: Stream[IO, Unit] = Stream.fixedRate[IO](delay)


[32mimport [39m[36mscala.concurrent.duration._
[39m
[32mimport [39m[36mjava.util.concurrent.TimeUnit

// 待機のためにStream.fixedRateを使う
[39m
[36mdelay[39m: [32mFiniteDuration[39m = 1 second
[36mticks[39m: [32mStream[39m[_root_.fs2.Stream[[A >: scala.Nothing <: scala.Any] => _root_.cats.effect.IO[A], scala.Unit], [32mUnit[39m] = Stream(..)

> IOベースのプログラムは、実行時はフェアプレイに徹し、必要なとき以外はスレッドプールのスレッドを使わない。(p.360)

delayもticksも単なる値なので、スレッドプールのことを考えずに済む。

In [18]:
import cats.effect.unsafe.implicits.global

val firstThreeRates: IO[List[(BigDecimal, Unit)]] =
  rates(Currency("USD"), Currency("EUR"))
    .zip(ticks).take(3).compile.toList

// firstThreeRates.unsafeRunSync()


[32mimport [39m[36mcats.effect.unsafe.implicits.global

[39m
[36mfirstThreeRates[39m: [32mIO[39m[[32mList[39m[([32mBigDecimal[39m, [32mUnit[39m)]] = [33mFlatMap[39m(
  ioe = [33mPure[39m(value = ()),
  f = fs2.Stream$CompileOps$$Lambda$3789/0x00000008016d5418@7020d833,
  event = cats.effect.tracing.TracingEvent$StackTrace
)

## 9.47 ストリームをzipする

待機のために2つのストリームをzipしたが、両方の要素に関心があるわけではない。  
そのような場合はzipLeft(zipRight)を使うことができる。  

In [19]:
// zipLeftを使ってfirstThreeRatesをアップデート
val firstThreeRates: IO[List[(BigDecimal)]] =
  rates(Currency("USD"), Currency("EUR"))
    .zipLeft(ticks).take(3).compile.toList


[36mfirstThreeRates[39m: [32mIO[39m[[32mList[39m[[32mBigDecimal[39m]] = [33mFlatMap[39m(
  ioe = [33mPure[39m(value = ()),
  f = fs2.Stream$CompileOps$$Lambda$3789/0x00000008016d5418@5308bde8,
  event = cats.effect.tracing.TracingEvent$StackTrace
)

In [20]:
// zipLeftを使用してexchangeIfTrendingをアップデート
def exchangeIfTrending(amount: BigDecimal, from: Currency, to: Currency): IO[BigDecimal] = {
  rates(from, to)
    .zipLeft(ticks)
    .sliding(3)
    .map(_.toList)
    .filter(trending)
    .map(_.last)
    .take(1)
    .compile
    .lastOrError
    .map(_ * amount)
}


defined [32mfunction[39m [36mexchangeIfTrending[39m

## 9.48 ストリームベースのアプローチを使うことの利点

- **ストリームの定義がそれを使う場所から切り離されている**
  - つまり、その定義は無限かもしれず、実際に必要な要素の数は呼び出し元が定義する
- **本当に必要になるまで何も行われない**
  - すべての演算が遅延評価される
- **高レベルのAPIのおかげで、実装上の詳細ではなくビジネスドメインに集中できる**
  - 本質的な複雑さと付随的な複雑さのもう1つの例である
- **関心事がより分離される**
  - Streamのコンビネータに引数として渡される関数は、それらがストリームの中で使われることを知らない
- **合成可能性**
  - 開発者は独立した小さな部品を理解してから、それらの部分の結び付きを理解することで、  
    より大きな機能を分析することができる
- **非同期の境界のカプセル化**
  - 潜在的に異なるコンピュータ(ノード)で多くのストリームを同時に実行し、  
    より大きなストリームを使って結果を結合することも、実装上の詳細(付随的な関心事)である。  
    このようにして、ノード間の境界を超えて結果を同期することに関する詳細をすべてカプセル化できる。