Skip to content

Commit

Permalink
Merge pull request #3247 from djspiewak/release/3.4.0-major
Browse files Browse the repository at this point in the history
Backport merge 3.4.0 into major branch
  • Loading branch information
djspiewak committed Nov 13, 2022
2 parents 6d63602 + 56162ac commit 832079a
Show file tree
Hide file tree
Showing 60 changed files with 1,319 additions and 371 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,41 @@

## Getting Started

- Wired: **3.3.14**
- Wired: **3.4.0**
- Tired: **2.5.5** (end of life)

```scala
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.3.14"
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.4.0"
```

The above represents the core, stable dependency which brings in the entirety of Cats Effect. This is *most likely* what you want. All current Cats Effect releases are published for Scala 2.12, 2.13, 3.0, and Scala.js 1.7.

Or, if you prefer a less bare-bones starting point, you can try [the Giter8 template](https://github.com/typelevel/ce3.g8):

```bash
$ sbt -Dsbt.version=1.5.5 new typelevel/ce3.g8
$ sbt new typelevel/ce3.g8
```

Depending on your use-case, you may want to consider one of the several other modules which are made available within the Cats Effect release. If you're a datatype implementer (like [Monix](https://monix.io)), you probably only want to depend on **kernel** (the typeclasses) in your compile scope and **laws** in your test scope:

```scala
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect-kernel" % "3.3.14",
"org.typelevel" %% "cats-effect-laws" % "3.3.14" % Test)
"org.typelevel" %% "cats-effect-kernel" % "3.4.0",
"org.typelevel" %% "cats-effect-laws" % "3.4.0" % Test)
```

If you're a middleware framework (like [Fs2](https://fs2.io/)), you probably want to depend on **std**, which gives you access to `Queue`, `Semaphore`, and much more without introducing a hard-dependency on `IO` outside of your tests:

```scala
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect-std" % "3.3.14",
"org.typelevel" %% "cats-effect" % "3.3.14" % Test)
"org.typelevel" %% "cats-effect-std" % "3.4.0",
"org.typelevel" %% "cats-effect" % "3.4.0" % Test)
```

You may also find some utility in the **testkit** and **kernel-testkit** projects, which contain `TestContext`, generators for `IO`, and a few other things:

```scala
libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.3.14" % Test
libraryDependencies += "org.typelevel" %% "cats-effect-testkit" % "3.4.0" % Test
```

Cats Effect provides backward binary compatibility within the 2.x and 3.x version lines, and both forward and backward compatibility within any major/minor line. This is analogous to the versioning scheme used by Cats itself, as well as other major projects such as Scala.js. Thus, any project depending upon Cats Effect 2.2.1 can be used with libraries compiled against Cats Effect 2.0.0 or 2.2.3, but *not* with libraries compiled against 2.3.0 or higher.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit
@OutputTimeUnit(TimeUnit.MINUTES)
class QueueBenchmark {

@Param(Array("100000"))
@Param(Array("32768")) // must be a power of 2
var size: Int = _

@Benchmark
Expand All @@ -62,6 +62,13 @@ class QueueBenchmark {
.flatMap(enqueueDequeueContended(_))
.unsafeRunSync()

@Benchmark
def boundedConcurrentEnqueueDequeueContendedSingleConsumer(): Unit =
Queue
.boundedForConcurrent[IO, Unit](size / 8)
.flatMap(enqueueDequeueContendedSingleConsumer(_))
.unsafeRunSync()

@Benchmark
def boundedAsyncEnqueueDequeueOne(): Unit =
Queue.boundedForAsync[IO, Unit](size).flatMap(enqueueDequeueOne(_)).unsafeRunSync()
Expand All @@ -77,6 +84,13 @@ class QueueBenchmark {
.flatMap(enqueueDequeueContended(_))
.unsafeRunSync()

@Benchmark
def boundedAsyncEnqueueDequeueContendedSingleConsumer(): Unit =
Queue
.boundedForAsync[IO, Unit](size / 8)
.flatMap(enqueueDequeueContendedSingleConsumer(_))
.unsafeRunSync()

@Benchmark
def unboundedConcurrentEnqueueDequeueOne(): Unit =
Queue.unboundedForConcurrent[IO, Unit].flatMap(enqueueDequeueOne(_)).unsafeRunSync()
Expand Down Expand Up @@ -129,7 +143,7 @@ class QueueBenchmark {
private[this] def enqueueDequeueContended(q: Queue[IO, Unit]): IO[Unit] = {
def par(action: IO[Unit], num: Int): IO[Unit] =
if (num <= 10)
action
action.replicateA_(num)
else
par(action, num / 2) &> par(action, num / 2)

Expand All @@ -138,4 +152,14 @@ class QueueBenchmark {

offerers &> takers
}

private[this] def enqueueDequeueContendedSingleConsumer(q: Queue[IO, Unit]): IO[Unit] = {
def par(action: IO[Unit], num: Int): IO[Unit] =
if (num <= 10)
action.replicateA_(num)
else
par(action, num / 2) &> par(action, num / 2)

par(q.offer(()), size / 4) &> q.take.replicateA_(size / 4)
}
}
14 changes: 7 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ ThisBuild / apiURL := Some(url("https://typelevel.org/cats-effect/api/3.x/"))

ThisBuild / autoAPIMappings := true

val CatsVersion = "2.8.0"
val CatsVersion = "2.9.0"
val Specs2Version = "4.17.0"
val ScalaCheckVersion = "1.17.0"
val DisciplineVersion = "1.4.0"
Expand Down Expand Up @@ -551,11 +551,6 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.WorkerThread$"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.WorkerThread$Data"),
// introduced by #2732, lambda lifting for private[this] queue
ProblemFilters.exclude[ReversedMissingMethodProblem](
"cats.effect.IOApp.cats$effect$IOApp$_setter_$cats$effect$IOApp$$queue_="),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"cats.effect.IOApp.cats$effect$IOApp$$queue"),
// introduced by #2844, Thread local fallback weak bag
// changes to `cats.effect.unsafe` package private code
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.SynchronizedWeakBag"),
Expand Down Expand Up @@ -730,7 +725,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.unsafe.PolyfillExecutionContext$"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.WorkerThread"),
ProblemFilters.exclude[Problem]("cats.effect.IOFiberConstants.*"),
ProblemFilters.exclude[Problem]("cats.effect.SyncIOConstants.*")
ProblemFilters.exclude[Problem]("cats.effect.SyncIOConstants.*"),
// introduced by #3196. Changes in an internal API.
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.unsafe.FiberAwareExecutionContext.liveFibers"),
// introduced by #3222. Optimized ArrayStack internal API
ProblemFilters.exclude[Problem]("cats.effect.ArrayStack*")
)
},
mimaBinaryIssueFilters ++= {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
def reportFailure(cause: Throwable): Unit
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def canExecuteBlockingCode(): Boolean
private[unsafe] def liveFibers()
: (Set[Runnable], Map[WorkerThread, (Option[Runnable], Set[Runnable])], Set[Runnable])
private[unsafe] def liveTraces(): (
Map[Runnable, Trace],
Map[WorkerThread, (Thread.State, Option[(Runnable, Trace)], Map[Runnable, Trace])],
Map[Runnable, Trace])
}

private[unsafe] sealed abstract class WorkerThread private () extends Thread {
Expand Down
62 changes: 62 additions & 0 deletions core/js/src/main/scala/cats/effect/ArrayStack.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

import scala.scalajs.js

private final class ArrayStack[A <: AnyRef](val buffer: js.Array[A]) extends AnyVal {

@inline def init(bound: Int): Unit = {
val _ = bound
()
}

@inline def push(a: A): Unit = {
buffer.push(a)
()
}

@inline def pop(): A = {
buffer.pop()
}

@inline def peek(): A = buffer(buffer.length - 1)

@inline def isEmpty(): Boolean = buffer.length == 0

// to allow for external iteration
@inline def unsafeBuffer(): js.Array[A] = buffer
@inline def unsafeIndex(): Int = buffer.length

@inline def invalidate(): Unit = {
buffer.length = 0 // javascript is crazy!
}

}

private object ArrayStack {

@inline def apply[A <: AnyRef](size: Int): ArrayStack[A] = {
val _ = size
apply()
}

@inline def apply[A <: AnyRef](): ArrayStack[A] = {
new ArrayStack(new js.Array[A])
}

}
71 changes: 71 additions & 0 deletions core/js/src/main/scala/cats/effect/ByteStack.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

import scala.scalajs.js

private object ByteStack {

type T = js.Array[Int]

@inline final def create(initialMaxOps: Int): js.Array[Int] = {
val _ = initialMaxOps
js.Array(0)
}

@inline final def growIfNeeded(stack: js.Array[Int], count: Int): js.Array[Int] = {
if ((1 + ((count + 1) >> 3)) < stack.length) {
stack
} else {
stack.push(0)
stack
}
}

@inline final def push(stack: js.Array[Int], op: Byte): js.Array[Int] = {
val c = stack(0) // current count of elements
val use = growIfNeeded(stack, c) // alias so we add to the right place
val s = (c >> 3) + 1 // current slot in `use`
val shift = (c & 7) << 2 // BEGIN MAGIC
use(s) = (use(s) & ~(0xffffffff << shift)) | (op << shift) // END MAGIC
use(0) += 1 // write the new count
use
}

@inline final def size(stack: js.Array[Int]): Int =
stack(0)

@inline final def isEmpty(stack: js.Array[Int]): Boolean =
stack(0) < 1

@inline final def read(stack: js.Array[Int], pos: Int): Byte = {
if (pos < 0 || pos >= stack(0)) throw new ArrayIndexOutOfBoundsException()
((stack((pos >> 3) + 1) >>> ((pos & 7) << 2)) & 0x0000000f).toByte
}

@inline final def peek(stack: js.Array[Int]): Byte = {
val c = stack(0) - 1
if (c < 0) throw new ArrayIndexOutOfBoundsException()
((stack((c >> 3) + 1) >>> ((c & 7) << 2)) & 0x0000000f).toByte
}

@inline final def pop(stack: js.Array[Int]): Byte = {
val op = peek(stack)
stack(0) -= 1
op
}
}
8 changes: 7 additions & 1 deletion core/js/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package cats.effect

import cats.effect.metrics.JsCpuStarvationMetrics
import cats.effect.tracing.TracingConstants._

import scala.concurrent.CancellationException
Expand Down Expand Up @@ -230,7 +231,12 @@ trait IOApp {

var cancelCode = 1 // So this can be updated by external cancellation
val fiber = Spawn[IO]
.raceOutcome[ExitCode, Nothing](run(argList), keepAlive)
.raceOutcome[ExitCode, Nothing](
CpuStarvationCheck
.run(runtimeConfig, JsCpuStarvationMetrics())
.background
.surround(run(argList)),
keepAlive)
.flatMap {
case Left(Outcome.Canceled()) =>
IO.raiseError(new CancellationException("IOApp main fiber was canceled"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.metrics

import cats.effect.IO

import scala.concurrent.duration.FiniteDuration

private[effect] class JsCpuStarvationMetrics extends CpuStarvationMetrics {
override def incCpuStarvationCount: IO[Unit] = IO.unit

override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit
}

private[effect] object JsCpuStarvationMetrics {
private[effect] def apply(): CpuStarvationMetrics = new JsCpuStarvationMetrics
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import scala.concurrent.ExecutionContext

private final class FiberAwareExecutionContext(ec: ExecutionContext) extends ExecutionContext {

def liveFibers(): Set[IOFiber[_]] = fiberBag.toSet
def liveTraces(): Map[IOFiber[_], Trace] =
fiberBag.iterator.filterNot(_.isDone).map(f => f -> f.captureTrace()).toMap

private[this] val fiberBag = mutable.Set.empty[IOFiber[_]]

Expand Down
17 changes: 12 additions & 5 deletions core/js/src/main/scala/cats/effect/unsafe/FiberMonitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cats.effect
package unsafe

import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.scalajs.{js, LinkingInfo}

Expand Down Expand Up @@ -55,19 +56,25 @@ private final class ES2021FiberMonitor(
override def monitorSuspended(fiber: IOFiber[_]): WeakBag.Handle =
bag.insert(fiber)

def foreignTraces(): Map[IOFiber[_], Trace] = {
val foreign = mutable.Map.empty[IOFiber[Any], Trace]
bag.forEach(fiber =>
if (!fiber.isDone) foreign += (fiber.asInstanceOf[IOFiber[Any]] -> fiber.captureTrace()))
foreign.toMap
}

def liveFiberSnapshot(print: String => Unit): Unit =
Option(compute).foreach { compute =>
val queued = compute.liveFibers().filterNot(_.isDone)
val rawForeign = bag.toSet.filterNot(_.isDone)
val queued = compute.liveTraces()
val rawForeign = foreignTraces()

// We trust the sources of data in the following order, ordered from
// most trustworthy to least trustworthy.
// 1. Fibers from the macrotask executor
// 2. Fibers from the foreign fallback weak GC map

val allForeign = rawForeign -- queued
val suspended = allForeign.filter(_.get())
val foreign = allForeign.filterNot(_.get())
val allForeign = rawForeign -- queued.keys
val (suspended, foreign) = allForeign.partition { case (f, _) => f.get() }

printFibers(queued, "YIELDING")(print)
printFibers(foreign, "YIELDING")(print)
Expand Down
Loading

0 comments on commit 832079a

Please sign in to comment.