# Essential effects notes

## Chapter 3

### Parallel typeclass

* The Cats Effect `Applicative` instance for `IO` is not parallel: `mapN` runs its effects sequentially
* Cats Effect `IO` has a separate data type for parallel effects: `IO.Par`

In [1]:
import $ivy.`org.typelevel::cats-effect:2.3.1`
import cats.effect._, cats.syntax._, cats.syntax.apply._

trait ExplicitChanges[A,B,C] {
    import scala.concurrent.ExecutionContext
    // The Applicative instance for IO.Par needs a ContextShift in scope
    implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
    val ia: IO[A] = IO(???)
    val ib: IO[B] = IO(???)
    def f(a: A, b: B): C = ???
    val ipa: IO.Par[A] = IO.Par(ia)
    val ipb: IO.Par[B] = IO.Par(ib)
    val ipc: IO.Par[C] = (ipa, ipb).mapN(f)
    val ic: IO[C] = IO.Par.unwrap(ipc)
}

* To make these conversions easier, Cats provides the `Parallel` typeclass:

```scala
trait Parallel[S[_]] {
  type P[_]
  def monad: Monad[S]
  def applicative: Applicative[P]
  // natural transformations between the two effect types
  def sequential: P ~> S
  def parallel: S ~> P
}
```

<img src="files/images/parallel-tc.png">
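
As a rough illustration of what the typeclass provides, the sketch below performs the `IO` to `IO.Par` to `IO` round trip by hand through the `Parallel[IO]` instance, which is conceptually the conversion that `parMapN` hides. It assumes the cats-effect 2.3.1 dependency from the first cell is on the classpath. The object name `ParallelByHand` and the sample values are made up for illustration. In Cats itself the parallel type member is called `F` rather than `P`.

```scala
import cats.Parallel
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

// Hypothetical example object; names and values are illustrative only.
object ParallelByHand {
  // The Parallel[IO] instance needs a ContextShift[IO] in scope (Cats Effect 2).
  implicit val contextShift: ContextShift[IO] =
    IO.contextShift(ExecutionContext.global)

  // Summon the instance; P.F is the parallel counterpart of IO (IO.Par).
  val P: Parallel[IO] = implicitly[Parallel[IO]]

  val ia: IO[Int] = IO(40)
  val ib: IO[Int] = IO(2)

  val ic: IO[Int] =
    P.sequential(          // parallel type ~> IO
      P.applicative.map2(  // compose with the Applicative of the parallel type
        P.parallel(ia),    // IO ~> parallel type
        P.parallel(ib)
      )(_ + _)
    )
}
```

Running `ParallelByHand.ic.unsafeRunSync()` evaluates both effects through the parallel `Applicative` and returns their sum.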

In [2]:
import cats.syntax.parallel._
trait ImplicitChanges[A,B,C] {
    val ia: IO[A] = IO(???)
    val ib: IO[B] = IO(???)
    def f(a: A, b: B): C = ???
    
    import scala.concurrent.ExecutionContext
    // The Parallel[IO] instance used by parMapN needs a ContextShift in scope
    implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
    val ic: IO[C] = (ia,ib).parMapN(f)
}

In [3]:
/** Extension methods for an effect of type `IO[A]`. */
implicit class DebugHelper[A](ioa: IO[A]) {

  /** Print the value of the effect to the console,
    * along with the name of the thread it was computed on. */
  def debug: IO[A] =
    for {
      a <- ioa
      tn = Thread.currentThread.getName
      _ = println(s"[$tn] $a")
    } yield a
}


In [4]:
object DebugExample extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    seq.as(ExitCode.Success)

  val hello = IO("hello").debug
  val world = IO("world").debug
  val seq = (hello, world).mapN((h, w) => s"$h $w").debug
}

DebugExample.main(Array.empty)

[ioapp-compute-0] hello
[ioapp-compute-0] world
[ioapp-compute-0] hello world



In [5]:
object ParMapN extends IOApp {
  def run(args: List[String]): IO[ExitCode] = par.as(ExitCode.Success)

  val hello = IO("hello").debug
  val world = IO("world").debug
  val par =
    (hello, world) 
    .parMapN((h, w) => s"$h $w")
    .debug
}
ParMapN.main(Array.empty)

[ioapp-compute-1] hello
[ioapp-compute-2] world
[ioapp-compute-1] hello world



In [6]:
object ParMapNErrors extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    e1.attempt.debug *>
    IO("---").debug *>
    e2.attempt.debug *>
    IO("---").debug *>
    e3.attempt.debug *>
    IO.pure(ExitCode.Success)
  val ok = IO("hi").debug
  val ko1 = IO.raiseError[String](new RuntimeException("oh!")).debug
  val ko2 = IO.raiseError[String](new RuntimeException("noes!")).debug
  val e1 = (ok, ko1).parMapN((_, _) => ())
  val e2 = (ko1, ok).parMapN((_, _) => ())
  val e3 = (ko1, ko2).parMapN((_, _) => ())
}
ParMapNErrors.main(Array.empty)

[ioapp-compute-4] hi
[ioapp-compute-5] Left(java.lang.RuntimeException: oh!)
[ioapp-compute-5] ---
[ioapp-compute-6] Left(java.lang.RuntimeException: oh!)
[ioapp-compute-6] ---
[ioapp-compute-5] Left(java.lang.RuntimeException: oh!)



### Summary

1. `IO` does not support parallel operations itself, because it is a `Monad`.
2. The `Parallel` typeclass specifies the translation between a pair of effect types: one that is a `Monad` and the other that is “only” an `Applicative`.
3. `Parallel[IO]` connects the `IO` effect to its parallel counterpart, `IO.Par`.
4. Parallel `IO` composition requires the ability to shift computations to other threads within the current `ExecutionContext`. This is how parallelism is “implemented”.
5. `parMapN`, `parTraverse`, and `parSequence` are the parallel versions of (the sequential) `mapN`, `traverse`, and `sequence`. Errors are managed in a fail-fast manner.
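
The cells above only exercise `parMapN`, so here is a minimal sketch of the other two combinators named in the summary. It assumes cats-effect 2.3.1 as in the first cell. `ParTraverseSketch`, `ids`, and `lookup` are hypothetical names used only for this illustration.

```scala
import cats.effect.{ContextShift, IO}
import cats.instances.list._
import cats.syntax.parallel._
import scala.concurrent.ExecutionContext

// Hypothetical example object; names and values are illustrative only.
object ParTraverseSketch {
  // Parallel[IO] needs a ContextShift[IO] in scope (Cats Effect 2).
  implicit val contextShift: ContextShift[IO] =
    IO.contextShift(ExecutionContext.global)

  val ids: List[Int] = List(1, 2, 3)

  // Stand-in for some effectful lookup.
  def lookup(id: Int): IO[String] = IO(s"user-$id")

  // Apply an effectful function to every element, composing the effects in parallel.
  val traversed: IO[List[String]] = ids.parTraverse(lookup)

  // Turn a list of effects into an effect of a list, again in parallel.
  val sequenced: IO[List[String]] = ids.map(lookup).parSequence
}
```

Both values produce the results in the order of the input list, regardless of which effect finishes first.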

## Chapter 4: Concurrency

In [7]:
import scala.concurrent.duration._
object JoinAfterStart extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    for {
      fiber <- task.start
      _ <- IO("pre-join").debug
      _ <- fiber.join.debug
      _ <- IO("post-join").debug
    } yield ExitCode.Success
  val task: IO[String] =
    IO.sleep(2.seconds) *> IO("task").debug
}
JoinAfterStart.main(Array.empty)

[ioapp-compute-1] pre-join
[ioapp-compute-3] task
[ioapp-compute-3] task
[ioapp-compute-3] post-join

> When we join a Fiber, execution continues on the thread the Fiber was running on.

### Cancelation

> To define the behavior of cancelation, Cats Effect uses the concept of a _cancelation boundary_. As an effect executes, if a cancelation boundary—whatever that is—is encountered, then the cancelation status for the current effect is checked, and if that effect has been canceled then execution will stop.

---

> From one perspective, cancelation is “automatic” because Cats Effect itself periodically inserts a cancelation boundary during effect execution. Alternatively, one can "manually" insert a cancelation boundary with `IO.cancelBoundary`.

> In Cats Effect 2, a cancelation boundary is inserted after every 512 `flatMap` calls. In Cats Effect 3, every `flatMap` is treated as a cancelation boundary.

> `IO.cancelBoundary` was removed in Cats Effect 3, since `flatMap` itself is defined as a cancelation boundary.
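
To make the "manual" option concrete, here is a minimal sketch for Cats Effect 2 only (per the note above, the call does not exist in Cats Effect 3). It places an explicit `IO.cancelBoundary` before every step of a recursive loop, so the loop's cancelation status is checked on each iteration rather than only at the automatic boundaries. `CancelBoundarySketch` and `sumDown` are hypothetical names.

```scala
import cats.effect.IO

// Hypothetical example object; assumes Cats Effect 2.x.
object CancelBoundarySketch {
  // Sums n + (n - 1) + ... + 1, checking for cancelation before every step.
  def sumDown(n: Int, acc: Long = 0L): IO[Long] =
    if (n <= 0) IO.pure(acc)
    else IO.cancelBoundary.flatMap(_ => sumDown(n - 1, acc + n))
}
```

When the effect is not canceled the boundary is a no-op and the loop simply runs to completion. The recursive call sits inside `flatMap`, so the next step is only built once the previous boundary has passed.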

## Chapter 5

> The Cats Effect library supports this pattern by encouraging separate contexts:
>
> * CPU-bound work will be scheduled on a fixed-size thread pool, where the number of threads is the number of cores available to the JVM. All things being equal, you can’t compute more than _number of CPUs_ things at a time, so don’t try to do more.
> * I/O-bound work will be scheduled on an unbounded thread pool so that blocked threads merely take up memory instead of stopping the progress of other tasks.
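
One way to see the two contexts side by side in Cats Effect 2 is the `Blocker` type, sketched below. This is an assumption about which API the quoted passage has in mind, since the notes do not name it. Here `ExecutionContext.global` stands in for the CPU-bound pool, `Thread.sleep` stands in for blocking I/O, and `BlockingSketch` and `threadNames` are hypothetical names.

```scala
import cats.effect.{Blocker, ContextShift, IO}
import scala.concurrent.ExecutionContext

// Hypothetical example object; assumes Cats Effect 2.x.
object BlockingSketch {
  // Stand-in for the CPU-bound context.
  implicit val contextShift: ContextShift[IO] =
    IO.contextShift(ExecutionContext.global)

  // Records the thread used for ordinary work and the one used for blocking work.
  val threadNames: IO[(String, String)] =
    Blocker[IO].use { blocker =>
      for {
        _ <- IO.shift // move onto the ContextShift's pool
        computeThread <- IO(Thread.currentThread.getName)
        blockingThread <- blocker.delay[IO, String] {
          Thread.sleep(100) // stand-in for a blocking I/O call
          Thread.currentThread.getName
        }
      } yield (computeThread, blockingThread)
    }
}
```

Running `BlockingSketch.threadNames.unsafeRunSync()` should report two different thread names, because the blocking thunk runs on the blocker's own pool rather than on the compute pool.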