From 177f73f2d39955779e838242f486e4499b49aa0d Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 14 May 2018 15:30:36 +0300 Subject: [PATCH 01/17] Initial MVar --- .../scala/cats/effect/concurrent/MVar.scala | 149 ++++++++++++ .../cats/effect/internals/Callback.scala | 4 +- .../cats/effect/internals/LinkedMap.scala | 13 +- .../cats/effect/internals/MVarAsync.scala | 165 ++++++++++++++ .../effect/internals/MVarConcurrent.scala | 212 ++++++++++++++++++ 5 files changed, 540 insertions(+), 3 deletions(-) create mode 100644 core/shared/src/main/scala/cats/effect/concurrent/MVar.scala create mode 100644 core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala create mode 100644 core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala diff --git a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala new file mode 100644 index 0000000000..fb761a3d3d --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package concurrent + +import cats.effect.internals.{MVarAsync, MVarConcurrent} + +/** + * A mutable location, that is either empty or contains + * a value of type `A`. + * + * It has 2 fundamental atomic operations: + * + * - [[put]] which fills the var if empty, or blocks + * (asynchronously) until the var is empty again + * - [[take]] which empties the var if full, returning the contained + * value, or blocks (asynchronously) otherwise until there is + * a value to pull + * + * The `MVar` is appropriate for building synchronization + * primitives and performing simple inter-thread communications. + * If it helps, it's similar with a `BlockingQueue(capacity = 1)`, + * except that it doesn't block any threads, all waiting being + * done asynchronously (via [[Async]] or [[Concurrent]] data types, + * such as [[IO]]). + * + * Given its asynchronous, non-blocking nature, it can be used on + * top of Javascript as well. + * + * Inspired by `Control.Concurrent.MVar` from Haskell and + * by `scalaz.concurrent.MVar`. + */ +abstract class MVar[F[_], A] { + /** Fills the `MVar` if it is empty, or blocks (asynchronously) + * if the `MVar` is full, until the given value is next in + * line to be consumed on [[take]]. + * + * This operation is atomic. + ** + * @return a task that on evaluation will complete when the + * `put` operation succeeds in filling the `MVar`, + * with the given value being next in line to + * be consumed + */ + def put(a: A): F[Unit] + + /** Empties the `MVar` if full, returning the contained value, + * or blocks (asynchronously) until a value is available. + * + * This operation is atomic. + * + * @return a task that on evaluation will be completed after + * a value was retrieved + */ + def take: F[A] +} + +/** Builders for [[MVar]]. */ +object MVar { + /** + * Builds an [[MVar]] value for `F` data types that are [[Concurrent]]. + * + * Due to `Concurrent`'s capabilities, the yielded values by [[MVar.take]] + * and [[MVar.put]] are cancelable. + * + * This builder uses the + * [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type]] + * technique. + * + * For creating an empty `MVar`: + * {{{ + * MVar.cancelable[IO].empty[Int] + * }}} + * + * For creating an `MVar` with an initial value: + * {{{ + * MVar.cancelable[IO]("hello") + * }}} + * + * @see [[async]] + */ + def apply[F[_]](implicit F: Concurrent[F]): ConcurrentBuilder[F] = + new ConcurrentBuilder[F](F) + + /** + * Builds an [[MVar]] value for `F` data types that are [[Async]]. + * + * Due to `Async`'s restrictions the yielded values by [[MVar.take]] + * and [[MVar.put]] are not cancelable. + * + * This builder uses the + * [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type]] + * technique. + * + * For creating an empty `MVar`: + * {{{ + * MVar.async[IO].empty[Int] + * }}} + * + * For creating an `MVar` with an initial value: + * {{{ + * MVar.async[IO]("hello") + * }}} + * + * @see [[apply]] + */ + def async[F[_]](implicit F: Async[F]): AsyncBuilder[F] = + new AsyncBuilder[F](F) + + /** + * Returned by the [[async]] builder. + */ + final class AsyncBuilder[F[_]](val F: Async[F]) extends AnyVal { + /** Builds an `MVar` with an initial value. */ + def apply[A](a: A): F[MVar[F, A]] = + F.delay(MVarAsync[F, A](a)(F)) + + /** Builds an empty `MVar`. */ + def empty[A]: F[MVar[F, A]] = + F.delay(MVarAsync.empty[F, A](F)) + } + + /** + * Returned by the [[apply]] builder. + */ + final class ConcurrentBuilder[F[_]](val F: Concurrent[F]) extends AnyVal { + /** Builds an `MVar` with an initial value. */ + def apply[A](a: A): F[MVar[F, A]] = + F.delay(MVarConcurrent[F, A](a)(F)) + + /** Builds an empty `MVar`. */ + def empty[A]: F[MVar[F, A]] = + F.delay(MVarConcurrent.empty[F, A](F)) + } +} \ No newline at end of file diff --git a/core/shared/src/main/scala/cats/effect/internals/Callback.scala b/core/shared/src/main/scala/cats/effect/internals/Callback.scala index 6513c3d657..851d78ef48 100644 --- a/core/shared/src/main/scala/cats/effect/internals/Callback.scala +++ b/core/shared/src/main/scala/cats/effect/internals/Callback.scala @@ -41,10 +41,10 @@ private[effect] object Callback { } /** Reusable `Right(())` reference. */ - final val rightUnit = Right(()) + val rightUnit = Right(()) /** Reusable no-op, side-effectful `Function1` reference. */ - final val dummy1: Any => Unit = _ => () + val dummy1: Any => Unit = _ => () /** Builds a callback with async execution. */ def async[A](cb: Type[A]): Type[A] = diff --git a/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala b/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala index a57a3bcb34..8df9cd3326 100644 --- a/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala @@ -30,6 +30,10 @@ private[effect] class LinkedMap[K, +V]( private[this] val insertionOrder: LongMap[K], private[this] val nextId: Long) { + /** Returns `true` if this map is empty, or `false` otherwise. */ + def isEmpty: Boolean = + entries.isEmpty + /** Returns a new map with the supplied key/value added. */ def updated[V2 >: V](k: K, v: V2): LinkedMap[K, V2] = { val insertionOrderOldRemoved = entries.get(k).fold(insertionOrder) { case (_, id) => insertionOrder - id } @@ -51,7 +55,14 @@ private[effect] class LinkedMap[K, +V]( /** The values in this map, in the order they were added. */ def values: Iterable[V] = keys.flatMap(k => entries.get(k).toList.map(_._1)) - override def toString = keys.zip(values).mkString("LinkedMap(", ", ", ")") + /** Pulls the first value from this `LinkedMap`, in FIFO order. */ + def dequeue: (V, LinkedMap[K, V]) = { + val k = insertionOrder.head._2 + (entries(k)._1, this - k) + } + + override def toString: String = + keys.zip(values).mkString("LinkedMap(", ", ", ")") } private[effect] object LinkedMap { diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala new file mode 100644 index 0000000000..8b68265070 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package internals + +import java.util.concurrent.atomic.AtomicReference +import cats.effect.concurrent.MVar +import cats.effect.internals.Callback.{Type => Listener, rightUnit} +import scala.annotation.tailrec +import scala.collection.immutable.Queue + +/** + * [[MVar]] implementation for [[Async]] data types. + */ +private[effect] final class MVarAsync[F[_], A] private ( + initial: MVarAsync.State[A])(implicit F: Async[F]) + extends MVar[F, A] { + + import MVarAsync._ + + private[this] val stateRef = new AtomicReference[State[A]](initial) + + def put(a: A): F[Unit] = + F.async { cb => + if (unsafePut(a, cb)) cb(rightUnit) + } + + def take: F[A] = + F.async { cb => + val r = unsafeTake(cb) + if (r != null) cb(Right(r)) + } + + @tailrec + private def unsafePut(a: A, await: Listener[Unit]): Boolean = { + if (a == null) throw new NullPointerException("null not supported in MVarAsync/MVar") + val current: State[A] = stateRef.get + + current match { + case _: Empty[_] => + if (stateRef.compareAndSet(current, WaitForTake(a, Queue.empty))) true + else unsafePut(a, await) // retry + + case WaitForTake(value, queue) => + val update = WaitForTake(value, queue.enqueue(a -> await)) + if (stateRef.compareAndSet(current, update)) false + else unsafePut(a, await) // retry + + case current @ WaitForPut(first, _) => + if (stateRef.compareAndSet(current, current.dequeue)) { first(Right(a)); true } + else unsafePut(a, await) // retry + } + } + + @tailrec + private def unsafeTake(await: Listener[A]): A = { + val current: State[A] = stateRef.get + current match { + case _: Empty[_] => + if (stateRef.compareAndSet(current, WaitForPut(await, Queue.empty))) + null.asInstanceOf[A] + else + unsafeTake(await) // retry + + case WaitForTake(value, queue) => + if (queue.isEmpty) { + if (stateRef.compareAndSet(current, State.empty)) + value + else + unsafeTake(await) + } + else { + val ((ax, notify), xs) = queue.dequeue + if (stateRef.compareAndSet(current, WaitForTake(ax, xs))) { + notify(rightUnit) // notification + value + } else { + unsafeTake(await) // retry + } + } + + case WaitForPut(first, queue) => + if (stateRef.compareAndSet(current, WaitForPut(first, queue.enqueue(await)))) + null.asInstanceOf[A] + else + unsafeTake(await) + } + } +} + +private[effect] object MVarAsync { + /** Builds an [[MVarAsync]] instance with an `initial` value. */ + def apply[F[_], A](initial: A)(implicit F: Async[F]): MVar[F, A] = + new MVarAsync[F, A](State(initial)) + + /** Returns an empty [[MVarAsync]] instance. */ + def empty[F[_], A](implicit F: Async[F]): MVar[F, A] = + new MVarAsync[F, A](State.empty) + + /** ADT modelling the internal state of `MVar`. */ + private sealed trait State[A] + + /** Private [[State]] builders.*/ + private object State { + private[this] val ref = Empty() + def apply[A](a: A): State[A] = WaitForTake(a, Queue.empty) + /** `Empty` state, reusing the same instance. */ + def empty[A]: State[A] = ref.asInstanceOf[State[A]] + } + + /** + * `MVarAsync` state signaling an empty location. + * + * Evolves into [[WaitForPut]] or [[WaitForTake]], + * depending on which operation happens first. + */ + private final case class Empty[A]() extends State[A] + + /** + * `MVarAsync` state signaling it has `take` callbacks + * registered and we are waiting for one or multiple + * `put` operations. + * + * @param first is the first request waiting in line + * @param queue are the rest of the requests waiting in line, + * if more than one `take` requests were registered + */ + private final case class WaitForPut[A](first: Listener[A], queue: Queue[Listener[A]]) + extends State[A] { + + def dequeue: State[A] = + if (queue.isEmpty) State.empty[A] else { + val (x, xs) = queue.dequeue + WaitForPut(x, xs) + } + } + + /** + * `MVarAsync` state signaling it has one or more values enqueued, + * to be signaled on the next `take`. + * + * @param value is the first value to signal + * @param queue are the rest of the `put` requests, along with the + * callbacks that need to be called whenever the corresponding + * value is first in line (i.e. when the corresponding `put` + * is unblocked from the user's point of view) + */ + private final case class WaitForTake[A](value: A, queue: Queue[(A, Listener[Unit])]) + extends State[A] +} + diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala new file mode 100644 index 0000000000..4dc4f3a218 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package internals + +import java.util.concurrent.atomic.AtomicReference + +import cats.effect.concurrent.MVar +import cats.effect.internals.Callback.{rightUnit, Type => Listener} + +import scala.annotation.tailrec + +/** + * [[MVar]] implementation for [[Concurrent]] data types. + */ +private[effect] final class MVarConcurrent[F[_], A] private ( + initial: MVarConcurrent.State[A])(implicit F: Concurrent[F]) + extends MVar[F, A] { + + import MVarConcurrent._ + + private[this] val stateRef = new AtomicReference[State[A]](initial) + + def put(a: A): F[Unit] = + F.cancelable(unsafePut(a)) + + def take: F[A] = + F.cancelable(unsafeTake) + + private def unregisterPut(id: Id): IO[Unit] = { + @tailrec def loop(): Unit = + stateRef.get() match { + case current @ WaitForTake(_, listeners) => + val update = current.copy(listeners = listeners - id) + if (!stateRef.compareAndSet(current, update)) loop() + case _ => + () + } + IO(loop()) + } + + private def unsafePut(a: A)(onPut: Listener[Unit]): IO[Unit] = { + @tailrec def loop(): IO[Unit] = { + val current: State[A] = stateRef.get + + current match { + case _: Empty[_] => + if (stateRef.compareAndSet(current, WaitForTake(a, LinkedMap.empty))) { + onPut(rightUnit) + IO.unit + } else { + loop() // retry + } + case WaitForTake(value, listeners) => + val id = new Id + val newMap = listeners.updated(id, (a, onPut)) + val update = WaitForTake(value, newMap) + + if (stateRef.compareAndSet(current, update)) + unregisterPut(id) + else + loop() // retry + + case current @ WaitForPut(listeners) => + if (listeners.isEmpty) { + val update = WaitForTake(a, LinkedMap.empty) + if (stateRef.compareAndSet(current, update)) { + onPut(rightUnit) + IO.unit + } else { + loop() + } + } else { + val (cb, update) = listeners.dequeue + if (stateRef.compareAndSet(current, WaitForPut(update))) { + cb(Right(a)) + onPut(rightUnit) + IO.unit + } else { + loop() + } + } + } + } + + if (a == null) { + onPut(Left(new NullPointerException("null not supported in MVar"))) + IO.unit + } else { + loop() + } + } + + private def unregisterTake(id: Id): IO[Unit] = { + @tailrec def loop(): Unit = + stateRef.get() match { + case current @ WaitForPut(listeners) => + val newMap = listeners - id + val update: State[A] = if (newMap.isEmpty) State.empty else WaitForPut(newMap) + if (!stateRef.compareAndSet(current, update)) loop() + case _ => () + } + IO(loop()) + } + + @tailrec + private def unsafeTake(onTake: Listener[A]): IO[Unit] = { + val current: State[A] = stateRef.get + current match { + case _: Empty[_] => + val id = new Id + if (stateRef.compareAndSet(current, WaitForPut(LinkedMap.empty.updated(id, onTake)))) + unregisterTake(id) + else + unsafeTake(onTake) // retry + + case WaitForTake(value, queue) => + if (queue.isEmpty) { + if (stateRef.compareAndSet(current, State.empty)) { + onTake(Right(value)) + IO.unit + } else { + unsafeTake(onTake) // retry + } + } else { + val ((ax, notify), xs) = queue.dequeue + if (stateRef.compareAndSet(current, WaitForTake(ax, xs))) { + onTake(Right(value)) + notify(rightUnit) + IO.unit + } else { + unsafeTake(onTake) // retry + } + } + + case WaitForPut(queue) => + val id = new Id + val newQueue = queue.updated(id, onTake) + if (stateRef.compareAndSet(current, WaitForPut(newQueue))) + unregisterTake(id) + else + unsafeTake(onTake) + } + } +} + +private[effect] object MVarConcurrent { + /** Builds an [[MVarConcurrent]] instance with an `initial` value. */ + def apply[F[_], A](initial: A)(implicit F: Concurrent[F]): MVar[F, A] = + new MVarConcurrent[F, A](State(initial)) + + /** Returns an empty [[MVarConcurrent]] instance. */ + def empty[F[_], A](implicit F: Concurrent[F]): MVar[F, A] = + new MVarConcurrent[F, A](State.empty) + + private final class Id extends Serializable + + /** ADT modelling the internal state of `MVar`. */ + private sealed trait State[A] + + /** Private [[State]] builders.*/ + private object State { + private[this] val ref = Empty() + def apply[A](a: A): State[A] = WaitForTake(a, LinkedMap.empty) + /** `Empty` state, reusing the same instance. */ + def empty[A]: State[A] = ref.asInstanceOf[State[A]] + } + + /** + * `MVarConcurrent` state signaling an empty location. + * + * Evolves into [[WaitForPut]] or [[WaitForTake]], + * depending on which operation happens first. + */ + private final case class Empty[A]() extends State[A] + + /** + * `MVarConcurrent` state signaling it has `take` callbacks + * registered and we are waiting for one or multiple + * `put` operations. + */ + private final case class WaitForPut[A](listeners: LinkedMap[Id, Listener[A]]) + extends State[A] + + /** + * `MVarConcurrent` state signaling it has one or more values enqueued, + * to be signaled on the next `take`. + * + * @param value is the first value to signal + * @param listeners are the rest of the `put` requests, along with the + * callbacks that need to be called whenever the corresponding + * value is first in line (i.e. when the corresponding `put` + * is unblocked from the user's point of view) + */ + private final case class WaitForTake[A](value: A, listeners: LinkedMap[Id, (A, Listener[Unit])]) + extends State[A] +} + From 0ee507c0004d1407639c92570b26f7ac748ae2da Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 14 May 2018 15:47:18 +0300 Subject: [PATCH 02/17] Fix builders --- .../scala/cats/effect/concurrent/MVar.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala index fb761a3d3d..48dfa7881e 100644 --- a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala +++ b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala @@ -45,12 +45,13 @@ import cats.effect.internals.{MVarAsync, MVarConcurrent} * by `scalaz.concurrent.MVar`. */ abstract class MVar[F[_], A] { - /** Fills the `MVar` if it is empty, or blocks (asynchronously) + /** + * Fills the `MVar` if it is empty, or blocks (asynchronously) * if the `MVar` is full, until the given value is next in * line to be consumed on [[take]]. * * This operation is atomic. - ** + * * @return a task that on evaluation will complete when the * `put` operation succeeds in filling the `MVar`, * with the given value being next in line to @@ -58,7 +59,8 @@ abstract class MVar[F[_], A] { */ def put(a: A): F[Unit] - /** Empties the `MVar` if full, returning the contained value, + /** + * Empties the `MVar` if full, returning the contained value, * or blocks (asynchronously) until a value is available. * * This operation is atomic. @@ -83,12 +85,12 @@ object MVar { * * For creating an empty `MVar`: * {{{ - * MVar.cancelable[IO].empty[Int] + * MVar[IO].empty[Int] * }}} * * For creating an `MVar` with an initial value: * {{{ - * MVar.cancelable[IO]("hello") + * MVar[IO].init("hello") * }}} * * @see [[async]] @@ -113,7 +115,7 @@ object MVar { * * For creating an `MVar` with an initial value: * {{{ - * MVar.async[IO]("hello") + * MVar.async[IO].init("hello") * }}} * * @see [[apply]] @@ -126,11 +128,11 @@ object MVar { */ final class AsyncBuilder[F[_]](val F: Async[F]) extends AnyVal { /** Builds an `MVar` with an initial value. */ - def apply[A](a: A): F[MVar[F, A]] = + def init[A](a: A): F[MVar[F, A]] = F.delay(MVarAsync[F, A](a)(F)) /** Builds an empty `MVar`. */ - def empty[A]: F[MVar[F, A]] = + def empty[A](implicit F: Async[F]): F[MVar[F, A]] = F.delay(MVarAsync.empty[F, A](F)) } @@ -139,7 +141,7 @@ object MVar { */ final class ConcurrentBuilder[F[_]](val F: Concurrent[F]) extends AnyVal { /** Builds an `MVar` with an initial value. */ - def apply[A](a: A): F[MVar[F, A]] = + def init[A](a: A): F[MVar[F, A]] = F.delay(MVarConcurrent[F, A](a)(F)) /** Builds an empty `MVar`. */ From 5ec0706a0b62ba9e460b6616b992269eb3ab3102 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 14 May 2018 19:15:03 +0300 Subject: [PATCH 03/17] Atomic read --- .../scala/cats/effect/concurrent/MVar.scala | 11 ++ .../cats/effect/internals/LinkedMap.scala | 5 +- .../cats/effect/internals/MVarAsync.scala | 145 ++++++++++-------- .../effect/internals/MVarConcurrent.scala | 123 +++++++++------ .../effect/concurrent/MVarAsyncTests.scala | 120 +++++++++++++++ .../concurrent/MVarConcurrentTests.scala | 119 ++++++++++++++ 6 files changed, 407 insertions(+), 116 deletions(-) create mode 100644 core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala create mode 100644 core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala diff --git a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala index 48dfa7881e..1a6ac4b8d2 100644 --- a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala +++ b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala @@ -69,6 +69,17 @@ abstract class MVar[F[_], A] { * a value was retrieved */ def take: F[A] + + /** + * Tries reading the current value, or blocks (asynchronously) + * until there is a value available. + * + * This operation is atomic. + * + * @return a task that on evaluation will be completed after + * a value has been read + */ + def read: F[A] } /** Builders for [[MVar]]. */ diff --git a/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala b/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala index 8df9cd3326..b498e0a7cf 100644 --- a/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala @@ -67,5 +67,8 @@ private[effect] class LinkedMap[K, +V]( private[effect] object LinkedMap { def empty[K, V]: LinkedMap[K, V] = - new LinkedMap[K, V](Map.empty, LongMap.empty, 0) + emptyRef.asInstanceOf[LinkedMap[K, V]] + + private val emptyRef = + new LinkedMap[Any, Any](Map.empty, LongMap.empty, 0) } diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala index 8b68265070..cd30c35b19 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala @@ -35,69 +35,100 @@ private[effect] final class MVarAsync[F[_], A] private ( private[this] val stateRef = new AtomicReference[State[A]](initial) def put(a: A): F[Unit] = - F.async { cb => - if (unsafePut(a, cb)) cb(rightUnit) - } + F.async(unsafePut(a)) def take: F[A] = - F.async { cb => - val r = unsafeTake(cb) - if (r != null) cb(Right(r)) - } - - @tailrec - private def unsafePut(a: A, await: Listener[Unit]): Boolean = { - if (a == null) throw new NullPointerException("null not supported in MVarAsync/MVar") - val current: State[A] = stateRef.get + F.async(unsafeTake) - current match { - case _: Empty[_] => - if (stateRef.compareAndSet(current, WaitForTake(a, Queue.empty))) true - else unsafePut(a, await) // retry + def read: F[A] = + F.async(unsafeRead) - case WaitForTake(value, queue) => - val update = WaitForTake(value, queue.enqueue(a -> await)) - if (stateRef.compareAndSet(current, update)) false - else unsafePut(a, await) // retry + private def streamAll(value: Either[Throwable, A], listeners: Iterable[Listener[A]]): Unit = { + val cursor = listeners.iterator + while (cursor.hasNext) + cursor.next().apply(value) + } - case current @ WaitForPut(first, _) => - if (stateRef.compareAndSet(current, current.dequeue)) { first(Right(a)); true } - else unsafePut(a, await) // retry + @tailrec + private def unsafePut(a: A)(onPut: Listener[Unit]): Unit = { + if (a == null) { + onPut(Left(new NullPointerException("null not supported in MVarAsync/MVar"))) + return + } + + stateRef.get match { + case current @ WaitForTake(value, puts) => + val update = WaitForTake(value, puts.enqueue(a -> onPut)) + if (!stateRef.compareAndSet(current, update)) + unsafePut(a)(onPut) // retry + + case current @ WaitForPut(reads, takes) => + var first: Listener[A] = null + val update: State[A] = + if (takes.isEmpty) State(a) else { + val (x, rest) = takes.dequeue + first = x + if (rest.isEmpty) State.empty[A] + else WaitForPut(Queue.empty, rest) + } + + if (!stateRef.compareAndSet(current, update)) + unsafePut(a)(onPut) // retry + else { + val value = Right(a) + // Satisfies all current `read` requests found + streamAll(value, reads) + // Satisfies the first `take` request found + if (first ne null) first(value) + // Signals completion of `put` + onPut(rightUnit) + } } } @tailrec - private def unsafeTake(await: Listener[A]): A = { + private def unsafeTake(onTake: Listener[A]): Unit = { val current: State[A] = stateRef.get current match { - case _: Empty[_] => - if (stateRef.compareAndSet(current, WaitForPut(await, Queue.empty))) - null.asInstanceOf[A] - else - unsafeTake(await) // retry - case WaitForTake(value, queue) => if (queue.isEmpty) { if (stateRef.compareAndSet(current, State.empty)) - value + // Signals completion of `take` + onTake(Right(value)) else - unsafeTake(await) - } - else { + unsafeTake(onTake) // retry + } else { val ((ax, notify), xs) = queue.dequeue - if (stateRef.compareAndSet(current, WaitForTake(ax, xs))) { - notify(rightUnit) // notification - value + val update = WaitForTake(ax, xs) + if (stateRef.compareAndSet(current, update)) { + // Signals completion of `take` + onTake(Right(value)) + // Complete the `put` request waiting on a notification + notify(rightUnit) } else { - unsafeTake(await) // retry + unsafeTake(onTake) // retry } } - case WaitForPut(first, queue) => - if (stateRef.compareAndSet(current, WaitForPut(first, queue.enqueue(await)))) - null.asInstanceOf[A] - else - unsafeTake(await) + case WaitForPut(reads, takes) => + if (!stateRef.compareAndSet(current, WaitForPut(reads, takes.enqueue(onTake)))) + unsafeTake(onTake) + } + } + + @tailrec + private def unsafeRead(onRead: Listener[A]): Unit = { + val current: State[A] = stateRef.get + current match { + case WaitForTake(value, _) => + // A value is available, so complete `read` immediately without + // changing the sate + onRead(Right(value)) + + case WaitForPut(reads, takes) => + // No value available, enqueue the callback + if (!stateRef.compareAndSet(current, WaitForPut(reads.enqueue(onRead), takes))) + unsafeRead(onRead) // retry } } } @@ -116,50 +147,34 @@ private[effect] object MVarAsync { /** Private [[State]] builders.*/ private object State { - private[this] val ref = Empty() + private[this] val ref = WaitForPut[Any](Queue.empty, Queue.empty) def apply[A](a: A): State[A] = WaitForTake(a, Queue.empty) /** `Empty` state, reusing the same instance. */ def empty[A]: State[A] = ref.asInstanceOf[State[A]] } - /** - * `MVarAsync` state signaling an empty location. - * - * Evolves into [[WaitForPut]] or [[WaitForTake]], - * depending on which operation happens first. - */ - private final case class Empty[A]() extends State[A] - /** * `MVarAsync` state signaling it has `take` callbacks * registered and we are waiting for one or multiple * `put` operations. * - * @param first is the first request waiting in line - * @param queue are the rest of the requests waiting in line, + * @param takes are the rest of the requests waiting in line, * if more than one `take` requests were registered */ - private final case class WaitForPut[A](first: Listener[A], queue: Queue[Listener[A]]) - extends State[A] { - - def dequeue: State[A] = - if (queue.isEmpty) State.empty[A] else { - val (x, xs) = queue.dequeue - WaitForPut(x, xs) - } - } + private final case class WaitForPut[A](reads: Queue[Listener[A]], takes: Queue[Listener[A]]) + extends State[A] /** * `MVarAsync` state signaling it has one or more values enqueued, * to be signaled on the next `take`. * * @param value is the first value to signal - * @param queue are the rest of the `put` requests, along with the + * @param puts are the rest of the `put` requests, along with the * callbacks that need to be called whenever the corresponding * value is first in line (i.e. when the corresponding `put` * is unblocked from the user's point of view) */ - private final case class WaitForTake[A](value: A, queue: Queue[(A, Listener[Unit])]) + private final case class WaitForTake[A](value: A, puts: Queue[(A, Listener[Unit])]) extends State[A] } diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala index 4dc4f3a218..4846751d74 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -18,10 +18,8 @@ package cats.effect package internals import java.util.concurrent.atomic.AtomicReference - import cats.effect.concurrent.MVar import cats.effect.internals.Callback.{rightUnit, Type => Listener} - import scala.annotation.tailrec /** @@ -41,6 +39,9 @@ private[effect] final class MVarConcurrent[F[_], A] private ( def take: F[A] = F.cancelable(unsafeTake) + def read: F[A] = + F.cancelable(unsafeRead) + private def unregisterPut(id: Id): IO[Unit] = { @tailrec def loop(): Unit = stateRef.get() match { @@ -53,18 +54,17 @@ private[effect] final class MVarConcurrent[F[_], A] private ( IO(loop()) } + private def streamAll(value: Either[Throwable, A], listeners: LinkedMap[Id, Listener[A]]): Unit = { + val cursor = listeners.values.iterator + while (cursor.hasNext) + cursor.next().apply(value) + } + private def unsafePut(a: A)(onPut: Listener[Unit]): IO[Unit] = { @tailrec def loop(): IO[Unit] = { val current: State[A] = stateRef.get current match { - case _: Empty[_] => - if (stateRef.compareAndSet(current, WaitForTake(a, LinkedMap.empty))) { - onPut(rightUnit) - IO.unit - } else { - loop() // retry - } case WaitForTake(value, listeners) => val id = new Id val newMap = listeners.updated(id, (a, onPut)) @@ -75,24 +75,27 @@ private[effect] final class MVarConcurrent[F[_], A] private ( else loop() // retry - case current @ WaitForPut(listeners) => - if (listeners.isEmpty) { - val update = WaitForTake(a, LinkedMap.empty) - if (stateRef.compareAndSet(current, update)) { - onPut(rightUnit) - IO.unit - } else { - loop() - } - } else { - val (cb, update) = listeners.dequeue - if (stateRef.compareAndSet(current, WaitForPut(update))) { - cb(Right(a)) - onPut(rightUnit) - IO.unit - } else { - loop() + case current @ WaitForPut(reads, takes) => + var first: Listener[A] = null + val update: State[A] = + if (takes.isEmpty) State(a) else { + val (x, rest) = takes.dequeue + first = x + if (rest.isEmpty) State.empty[A] + else WaitForPut(LinkedMap.empty, rest) } + + if (!stateRef.compareAndSet(current, update)) + unsafePut(a)(onPut) + else { + val value = Right(a) + // Satisfies all current `read` requests found + streamAll(value, reads) + // Satisfies the first `take` request found + if (first ne null) first(value) + // Signals completion of `put` + onPut(rightUnit) + IO.unit } } } @@ -108,9 +111,9 @@ private[effect] final class MVarConcurrent[F[_], A] private ( private def unregisterTake(id: Id): IO[Unit] = { @tailrec def loop(): Unit = stateRef.get() match { - case current @ WaitForPut(listeners) => - val newMap = listeners - id - val update: State[A] = if (newMap.isEmpty) State.empty else WaitForPut(newMap) + case current @ WaitForPut(reads, takes) => + val newMap = takes - id + val update: State[A] = WaitForPut(reads, newMap) if (!stateRef.compareAndSet(current, update)) loop() case _ => () } @@ -121,13 +124,6 @@ private[effect] final class MVarConcurrent[F[_], A] private ( private def unsafeTake(onTake: Listener[A]): IO[Unit] = { val current: State[A] = stateRef.get current match { - case _: Empty[_] => - val id = new Id - if (stateRef.compareAndSet(current, WaitForPut(LinkedMap.empty.updated(id, onTake)))) - unregisterTake(id) - else - unsafeTake(onTake) // retry - case WaitForTake(value, queue) => if (queue.isEmpty) { if (stateRef.compareAndSet(current, State.empty)) { @@ -147,15 +143,48 @@ private[effect] final class MVarConcurrent[F[_], A] private ( } } - case WaitForPut(queue) => + case WaitForPut(reads, takes) => val id = new Id - val newQueue = queue.updated(id, onTake) - if (stateRef.compareAndSet(current, WaitForPut(newQueue))) + val newQueue = takes.updated(id, onTake) + if (stateRef.compareAndSet(current, WaitForPut(reads, newQueue))) unregisterTake(id) else unsafeTake(onTake) } } + + @tailrec + private def unsafeRead(onRead: Listener[A]): IO[Unit] = { + val current: State[A] = stateRef.get + current match { + case WaitForTake(value, _) => + // A value is available, so complete `read` immediately without + // changing the sate + onRead(Right(value)) + IO.unit + + case WaitForPut(reads, takes) => + // No value available, enqueue the callback + val id = new Id + val newQueue = reads.updated(id, onRead) + if (!stateRef.compareAndSet(current, WaitForPut(newQueue, takes))) + unsafeRead(onRead) // retry + else + unregisterRead(id) + } + } + + private def unregisterRead(id: Id): IO[Unit] = { + @tailrec def loop(): Unit = + stateRef.get() match { + case current @ WaitForPut(reads, takes) => + val newMap = reads - id + val update: State[A] = WaitForPut(newMap, takes) + if (!stateRef.compareAndSet(current, update)) loop() + case _ => () + } + IO(loop()) + } } private[effect] object MVarConcurrent { @@ -174,26 +203,19 @@ private[effect] object MVarConcurrent { /** Private [[State]] builders.*/ private object State { - private[this] val ref = Empty() + private[this] val ref = WaitForPut[Any](LinkedMap.empty, LinkedMap.empty) def apply[A](a: A): State[A] = WaitForTake(a, LinkedMap.empty) /** `Empty` state, reusing the same instance. */ def empty[A]: State[A] = ref.asInstanceOf[State[A]] } - /** - * `MVarConcurrent` state signaling an empty location. - * - * Evolves into [[WaitForPut]] or [[WaitForTake]], - * depending on which operation happens first. - */ - private final case class Empty[A]() extends State[A] - /** * `MVarConcurrent` state signaling it has `take` callbacks * registered and we are waiting for one or multiple * `put` operations. */ - private final case class WaitForPut[A](listeners: LinkedMap[Id, Listener[A]]) + private final case class WaitForPut[A]( + reads: LinkedMap[Id, Listener[A]], takes: LinkedMap[Id, Listener[A]]) extends State[A] /** @@ -206,7 +228,8 @@ private[effect] object MVarConcurrent { * value is first in line (i.e. when the corresponding `put` * is unblocked from the user's point of view) */ - private final case class WaitForTake[A](value: A, listeners: LinkedMap[Id, (A, Listener[Unit])]) + private final case class WaitForTake[A]( + value: A, listeners: LinkedMap[Id, (A, Listener[Unit])]) extends State[A] } diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala new file mode 100644 index 0000000000..61e9d15a12 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package concurrent + +import org.scalatest.{AsyncFunSuite, Matchers} +import scala.concurrent.ExecutionContext + +class MVarAsyncTests extends AsyncFunSuite with Matchers { + implicit override def executionContext: ExecutionContext = + ExecutionContext.Implicits.global + + test("empty; put; take; put; take") { + val task = for { + av <- MVar.async[IO].empty[Int] + _ <- av.put(10) + r1 <- av.take + _ <- av.put(20) + r2 <- av.take + } yield List(r1,r2) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20) + } + } + + test("empty; take; put; take; put") { + val task = for { + av <- MVar[IO].empty[Int] + f1 <- av.take.start + _ <- av.put(10) + f2 <- av.take.start + _ <- av.put(20) + r1 <- f1.join + r2 <- f2.join + } yield List(r1,r2) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20) + } + } + + test("empty; put; put; put; take; take; take") { + val task = for { + av <- MVar.async[IO].empty[Int] + f1 <- av.put(10).start + f2 <- av.put(20).start + f3 <- av.put(30).start + r1 <- av.take + r2 <- av.take + r3 <- av.take + _ <- f1.join + _ <- f2.join + _ <- f3.join + } yield List(r1, r2, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20, 30) + } + } + + test("empty; take; take; take; put; put; put") { + val task = for { + av <- MVar.async[IO].empty[Int] + f1 <- av.take.start + f2 <- av.take.start + f3 <- av.take.start + _ <- av.put(10) + _ <- av.put(20) + _ <- av.put(30) + r1 <- f1.join + r2 <- f2.join + r3 <- f3.join + } yield List(r1, r2, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20, 30) + } + } + + test("initial; take; put; take") { + val task = for { + av <- MVar.async[IO].init(10) + r1 <- av.take + _ <- av.put(20) + r2 <- av.take + } yield List(r1,r2) + + for (v <- task.unsafeToFuture()) yield { + v shouldBe List(10, 20) + } + } + + + test("initial; read; take") { + val task = for { + av <- MVar.async[IO].init(10) + read <- av.read + take <- av.take + } yield read + take + + for (v <- task.unsafeToFuture()) yield { + v shouldBe 20 + } + } +} diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala new file mode 100644 index 0000000000..70ffcf4fcd --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package concurrent + +import org.scalatest.{AsyncFunSuite, Matchers} +import scala.concurrent.ExecutionContext + +class MVarConcurrentTests extends AsyncFunSuite with Matchers { + implicit override def executionContext: ExecutionContext = + ExecutionContext.Implicits.global + + test("empty; put; take; put; take") { + val task = for { + av <- MVar[IO].empty[Int] + _ <- av.put(10) + r1 <- av.take + _ <- av.put(20) + r2 <- av.take + } yield List(r1,r2) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20) + } + } + + test("empty; take; put; take; put") { + val task = for { + av <- MVar[IO].empty[Int] + f1 <- av.take.start + _ <- av.put(10) + f2 <- av.take.start + _ <- av.put(20) + r1 <- f1.join + r2 <- f2.join + } yield List(r1,r2) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20) + } + } + + test("empty; put; put; put; take; take; take") { + val task = for { + av <- MVar[IO].empty[Int] + f1 <- av.put(10).start + f2 <- av.put(20).start + f3 <- av.put(30).start + r1 <- av.take + r2 <- av.take + r3 <- av.take + _ <- f1.join + _ <- f2.join + _ <- f3.join + } yield List(r1, r2, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20, 30) + } + } + + test("empty; take; take; take; put; put; put") { + val task = for { + av <- MVar[IO].empty[Int] + f1 <- av.take.start + f2 <- av.take.start + f3 <- av.take.start + _ <- av.put(10) + _ <- av.put(20) + _ <- av.put(30) + r1 <- f1.join + r2 <- f2.join + r3 <- f3.join + } yield List(r1, r2, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20, 30) + } + } + + test("initial; take; put; take") { + val task = for { + av <- MVar[IO].init(10) + r1 <- av.take + _ <- av.put(20) + r2 <- av.take + } yield List(r1,r2) + + for (v <- task.unsafeToFuture()) yield { + v shouldBe List(10, 20) + } + } + + test("initial; read; take") { + val task = for { + av <- MVar[IO].init(10) + read <- av.read + take <- av.take + } yield read + take + + for (v <- task.unsafeToFuture()) yield { + v shouldBe 20 + } + } +} From 26b8c61e45a0fb7b5800d590da78e75b48ba3507 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 14 May 2018 21:35:33 +0300 Subject: [PATCH 04/17] Add tests --- .../cats/effect/internals/MVarAsync.scala | 29 +- .../effect/internals/MVarConcurrent.scala | 38 +- .../effect/concurrent/MVarAsyncTests.scala | 120 ------- .../concurrent/MVarConcurrentTests.scala | 119 ------- .../cats/effect/concurrent/MVarTests.scala | 332 ++++++++++++++++++ 5 files changed, 385 insertions(+), 253 deletions(-) delete mode 100644 core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala delete mode 100644 core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala create mode 100644 core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala index cd30c35b19..2190c26798 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala @@ -32,6 +32,7 @@ private[effect] final class MVarAsync[F[_], A] private ( import MVarAsync._ + /** Shared mutable state. */ private[this] val stateRef = new AtomicReference[State[A]](initial) def put(a: A): F[Unit] = @@ -59,8 +60,11 @@ private[effect] final class MVarAsync[F[_], A] private ( stateRef.get match { case current @ WaitForTake(value, puts) => val update = WaitForTake(value, puts.enqueue(a -> onPut)) - if (!stateRef.compareAndSet(current, update)) + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ unsafePut(a)(onPut) // retry + // $COVERAGE-ON$ + } case current @ WaitForPut(reads, takes) => var first: Listener[A] = null @@ -72,9 +76,11 @@ private[effect] final class MVarAsync[F[_], A] private ( else WaitForPut(Queue.empty, rest) } - if (!stateRef.compareAndSet(current, update)) + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ unsafePut(a)(onPut) // retry - else { + // $COVERAGE-ON$ + } else { val value = Right(a) // Satisfies all current `read` requests found streamAll(value, reads) @@ -95,8 +101,11 @@ private[effect] final class MVarAsync[F[_], A] private ( if (stateRef.compareAndSet(current, State.empty)) // Signals completion of `take` onTake(Right(value)) - else + else { + // $COVERAGE-OFF$ unsafeTake(onTake) // retry + // $COVERAGE-ON$ + } } else { val ((ax, notify), xs) = queue.dequeue val update = WaitForTake(ax, xs) @@ -106,13 +115,18 @@ private[effect] final class MVarAsync[F[_], A] private ( // Complete the `put` request waiting on a notification notify(rightUnit) } else { + // $COVERAGE-OFF$ unsafeTake(onTake) // retry + // $COVERAGE-ON$ } } case WaitForPut(reads, takes) => - if (!stateRef.compareAndSet(current, WaitForPut(reads, takes.enqueue(onTake)))) + if (!stateRef.compareAndSet(current, WaitForPut(reads, takes.enqueue(onTake)))) { + // $COVERAGE-ON$ unsafeTake(onTake) + // $COVERAGE-OFF$ + } } } @@ -127,8 +141,11 @@ private[effect] final class MVarAsync[F[_], A] private ( case WaitForPut(reads, takes) => // No value available, enqueue the callback - if (!stateRef.compareAndSet(current, WaitForPut(reads.enqueue(onRead), takes))) + if (!stateRef.compareAndSet(current, WaitForPut(reads.enqueue(onRead), takes))) { + // $COVERAGE-OFF$ unsafeRead(onRead) // retry + // $COVERAGE-ON$ + } } } } diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala index 4846751d74..d5027fc1d8 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -31,6 +31,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( import MVarConcurrent._ + /** Shared mutable state. */ private[this] val stateRef = new AtomicReference[State[A]](initial) def put(a: A): F[Unit] = @@ -47,7 +48,11 @@ private[effect] final class MVarConcurrent[F[_], A] private ( stateRef.get() match { case current @ WaitForTake(_, listeners) => val update = current.copy(listeners = listeners - id) - if (!stateRef.compareAndSet(current, update)) loop() + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ + loop() // retry + // $COVERAGE-ON$ + } case _ => () } @@ -72,8 +77,11 @@ private[effect] final class MVarConcurrent[F[_], A] private ( if (stateRef.compareAndSet(current, update)) unregisterPut(id) - else + else { + // $COVERAGE-OFF$ loop() // retry + // $COVERAGE-ON$ + } case current @ WaitForPut(reads, takes) => var first: Listener[A] = null @@ -130,7 +138,9 @@ private[effect] final class MVarConcurrent[F[_], A] private ( onTake(Right(value)) IO.unit } else { + // $COVERAGE-OFF$ unsafeTake(onTake) // retry + // $COVERAGE-ON$ } } else { val ((ax, notify), xs) = queue.dequeue @@ -139,7 +149,9 @@ private[effect] final class MVarConcurrent[F[_], A] private ( notify(rightUnit) IO.unit } else { + // $COVERAGE-OFF$ unsafeTake(onTake) // retry + // $COVERAGE-ON$ } } @@ -148,8 +160,11 @@ private[effect] final class MVarConcurrent[F[_], A] private ( val newQueue = takes.updated(id, onTake) if (stateRef.compareAndSet(current, WaitForPut(reads, newQueue))) unregisterTake(id) - else - unsafeTake(onTake) + else { + // $COVERAGE-OFF$ + unsafeTake(onTake) // retry + // $COVERAGE-ON$ + } } } @@ -167,10 +182,13 @@ private[effect] final class MVarConcurrent[F[_], A] private ( // No value available, enqueue the callback val id = new Id val newQueue = reads.updated(id, onRead) - if (!stateRef.compareAndSet(current, WaitForPut(newQueue, takes))) - unsafeRead(onRead) // retry - else + if (stateRef.compareAndSet(current, WaitForPut(newQueue, takes))) unregisterRead(id) + else { + // $COVERAGE-OFF$ + unsafeRead(onRead) // retry + // $COVERAGE-ON$ + } } } @@ -180,7 +198,11 @@ private[effect] final class MVarConcurrent[F[_], A] private ( case current @ WaitForPut(reads, takes) => val newMap = reads - id val update: State[A] = WaitForPut(newMap, takes) - if (!stateRef.compareAndSet(current, update)) loop() + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ + loop() + // $COVERAGE-ON$ + } case _ => () } IO(loop()) diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala deleted file mode 100644 index 61e9d15a12..0000000000 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarAsyncTests.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package concurrent - -import org.scalatest.{AsyncFunSuite, Matchers} -import scala.concurrent.ExecutionContext - -class MVarAsyncTests extends AsyncFunSuite with Matchers { - implicit override def executionContext: ExecutionContext = - ExecutionContext.Implicits.global - - test("empty; put; take; put; take") { - val task = for { - av <- MVar.async[IO].empty[Int] - _ <- av.put(10) - r1 <- av.take - _ <- av.put(20) - r2 <- av.take - } yield List(r1,r2) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20) - } - } - - test("empty; take; put; take; put") { - val task = for { - av <- MVar[IO].empty[Int] - f1 <- av.take.start - _ <- av.put(10) - f2 <- av.take.start - _ <- av.put(20) - r1 <- f1.join - r2 <- f2.join - } yield List(r1,r2) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20) - } - } - - test("empty; put; put; put; take; take; take") { - val task = for { - av <- MVar.async[IO].empty[Int] - f1 <- av.put(10).start - f2 <- av.put(20).start - f3 <- av.put(30).start - r1 <- av.take - r2 <- av.take - r3 <- av.take - _ <- f1.join - _ <- f2.join - _ <- f3.join - } yield List(r1, r2, r3) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20, 30) - } - } - - test("empty; take; take; take; put; put; put") { - val task = for { - av <- MVar.async[IO].empty[Int] - f1 <- av.take.start - f2 <- av.take.start - f3 <- av.take.start - _ <- av.put(10) - _ <- av.put(20) - _ <- av.put(30) - r1 <- f1.join - r2 <- f2.join - r3 <- f3.join - } yield List(r1, r2, r3) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20, 30) - } - } - - test("initial; take; put; take") { - val task = for { - av <- MVar.async[IO].init(10) - r1 <- av.take - _ <- av.put(20) - r2 <- av.take - } yield List(r1,r2) - - for (v <- task.unsafeToFuture()) yield { - v shouldBe List(10, 20) - } - } - - - test("initial; read; take") { - val task = for { - av <- MVar.async[IO].init(10) - read <- av.read - take <- av.take - } yield read + take - - for (v <- task.unsafeToFuture()) yield { - v shouldBe 20 - } - } -} diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala deleted file mode 100644 index 70ffcf4fcd..0000000000 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarConcurrentTests.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package concurrent - -import org.scalatest.{AsyncFunSuite, Matchers} -import scala.concurrent.ExecutionContext - -class MVarConcurrentTests extends AsyncFunSuite with Matchers { - implicit override def executionContext: ExecutionContext = - ExecutionContext.Implicits.global - - test("empty; put; take; put; take") { - val task = for { - av <- MVar[IO].empty[Int] - _ <- av.put(10) - r1 <- av.take - _ <- av.put(20) - r2 <- av.take - } yield List(r1,r2) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20) - } - } - - test("empty; take; put; take; put") { - val task = for { - av <- MVar[IO].empty[Int] - f1 <- av.take.start - _ <- av.put(10) - f2 <- av.take.start - _ <- av.put(20) - r1 <- f1.join - r2 <- f2.join - } yield List(r1,r2) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20) - } - } - - test("empty; put; put; put; take; take; take") { - val task = for { - av <- MVar[IO].empty[Int] - f1 <- av.put(10).start - f2 <- av.put(20).start - f3 <- av.put(30).start - r1 <- av.take - r2 <- av.take - r3 <- av.take - _ <- f1.join - _ <- f2.join - _ <- f3.join - } yield List(r1, r2, r3) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20, 30) - } - } - - test("empty; take; take; take; put; put; put") { - val task = for { - av <- MVar[IO].empty[Int] - f1 <- av.take.start - f2 <- av.take.start - f3 <- av.take.start - _ <- av.put(10) - _ <- av.put(20) - _ <- av.put(30) - r1 <- f1.join - r2 <- f2.join - r3 <- f3.join - } yield List(r1, r2, r3) - - for (r <- task.unsafeToFuture()) yield { - r shouldBe List(10, 20, 30) - } - } - - test("initial; take; put; take") { - val task = for { - av <- MVar[IO].init(10) - r1 <- av.take - _ <- av.put(20) - r2 <- av.take - } yield List(r1,r2) - - for (v <- task.unsafeToFuture()) yield { - v shouldBe List(10, 20) - } - } - - test("initial; read; take") { - val task = for { - av <- MVar[IO].init(10) - read <- av.read - take <- av.take - } yield read + take - - for (v <- task.unsafeToFuture()) yield { - v shouldBe 20 - } - } -} diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala new file mode 100644 index 0000000000..53e128c033 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -0,0 +1,332 @@ +/* + * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package concurrent + +import catalysts.Platform +import org.scalatest.{AsyncFunSuite, Matchers} + +import scala.concurrent.ExecutionContext +import cats.implicits._ + +class MVarConcurrentTests extends BaseMVarTests { + def init[A](a: A): IO[MVar[IO, A]] = + MVar[IO].init(a) + + def empty[A]: IO[MVar[IO, A]] = + MVar[IO].empty[A] + + test("put is cancelable") { + val task = for { + mVar <- init(0) + _ <- mVar.put(1).start + p2 <- mVar.put(2).start + _ <- mVar.put(3).start + _ <- p2.cancel + _ <- mVar.take + r1 <- mVar.take + r3 <- mVar.take + } yield List(r1, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(1, 3) + } + } + + test("take is cancelable") { + val task = for { + mVar <- empty[Int] + t1 <- mVar.take.start + t2 <- mVar.take.start + t3 <- mVar.take.start + _ <- t2.cancel + _ <- mVar.put(1) + _ <- mVar.put(3) + r1 <- t1.join + r3 <- t3.join + } yield List(r1, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(1, 3) + } + } +} + +class MVarAsyncTests extends BaseMVarTests { + def init[A](a: A): IO[MVar[IO, A]] = + MVar.async[IO].init(a) + + def empty[A]: IO[MVar[IO, A]] = + MVar.async[IO].empty[A] +} + +abstract class BaseMVarTests extends AsyncFunSuite with Matchers { + implicit override def executionContext: ExecutionContext = + ExecutionContext.Implicits.global + + def init[A](a: A): IO[MVar[IO, A]] + def empty[A]: IO[MVar[IO, A]] + + test("empty; put; take; put; take") { + val task = for { + av <- empty[Int] + _ <- av.put(10) + r1 <- av.take + _ <- av.put(20) + r2 <- av.take + } yield List(r1,r2) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20) + } + } + + test("empty; take; put; take; put") { + val task = for { + av <- empty[Int] + f1 <- av.take.start + _ <- av.put(10) + f2 <- av.take.start + _ <- av.put(20) + r1 <- f1.join + r2 <- f2.join + } yield List(r1,r2) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20) + } + } + + test("empty; put; put; put; take; take; take") { + val task = for { + av <- empty[Int] + f1 <- av.put(10).start + f2 <- av.put(20).start + f3 <- av.put(30).start + r1 <- av.take + r2 <- av.take + r3 <- av.take + _ <- f1.join + _ <- f2.join + _ <- f3.join + } yield List(r1, r2, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20, 30) + } + } + + test("empty; take; take; take; put; put; put") { + val task = for { + av <- empty[Int] + f1 <- av.take.start + f2 <- av.take.start + f3 <- av.take.start + _ <- av.put(10) + _ <- av.put(20) + _ <- av.put(30) + r1 <- f1.join + r2 <- f2.join + r3 <- f3.join + } yield List(r1, r2, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 20, 30) + } + } + + test("initial; take; put; take") { + val task = for { + av <- init(10) + r1 <- av.take + _ <- av.put(20) + r2 <- av.take + } yield List(r1,r2) + + for (v <- task.unsafeToFuture()) yield { + v shouldBe List(10, 20) + } + } + + + test("initial; read; take") { + val task = for { + av <- init(10) + read <- av.read + take <- av.take + } yield read + take + + for (v <- task.unsafeToFuture()) yield { + v shouldBe 20 + } + } + + test("empty; read; put") { + val task = for { + av <- empty[Int] + read <- av.read.start + _ <- av.put(10) + r <- read.join + } yield r + + for (v <- task.unsafeToFuture()) yield { + v shouldBe 10 + } + } + + test("put(null) raises NullPointerException") { + val task = empty[String].flatMap(_.put(null)).attempt + + for (Left(v) <- task.unsafeToFuture()) yield { + v shouldBe an[NullPointerException] + } + } + + test("producer-consumer parallel loop") { + // Signaling option, because we need to detect completion + type Channel[A] = MVar[IO, Option[A]] + + def producer(ch: Channel[Int], list: List[Int]): IO[Unit] = + list match { + case Nil => + ch.put(None) // we are done! + case head :: tail => + // next please + ch.put(Some(head)).flatMap(_ => producer(ch, tail)) + } + + def consumer(ch: Channel[Int], sum: Long): IO[Long] = + ch.take.flatMap { + case Some(x) => + // next please + consumer(ch, sum + x) + case None => + IO.pure(sum) // we are done! + } + + val count = 1000000 + val sumTask = for { + channel <- init(Option(0)) + // Ensure they run in parallel + producerFiber <- (IO.shift *> producer(channel, (0 until count).toList)).start + consumerFiber <- (IO.shift *> consumer(channel, 0L)).start + _ <- producerFiber.join + sum <- consumerFiber.join + } yield sum + + // Evaluate + for (r <- sumTask.unsafeToFuture()) yield { + r shouldBe (count.toLong * (count - 1) / 2) + } + } + + test("stack overflow test") { + // Signaling option, because we need to detect completion + type Channel[A] = MVar[IO, Option[A]] + val count = 100000 + + def consumer(ch: Channel[Int], sum: Long): IO[Long] = + ch.take.flatMap { + case Some(x) => + // next please + consumer(ch, sum + x) + case None => + IO.pure(sum) // we are done! + } + + def exec(channel: Channel[Int]): IO[Long] = { + val consumerTask = consumer(channel, 0L) + val tasks = for (i <- 0 until count) yield channel.put(Some(i)) + val producerTask = tasks.toList.parSequence.flatMap(_ => channel.put(None)) + + for { + f1 <- producerTask.start + f2 <- consumerTask.start + _ <- f1.join + r <- f2.join + } yield r + } + + val task = init(Option(0)).flatMap(exec) + for (r <- task.unsafeToFuture()) yield { + r shouldBe count.toLong * (count - 1) / 2 + } + } + + test("take/put test is stack safe") { + def loop(n: Int, acc: Int)(ch: MVar[IO, Int]): IO[Int] = + if (n <= 0) IO.pure(acc) else + ch.take.flatMap { x => + ch.put(1).flatMap(_ => loop(n - 1, acc + x)(ch)) + } + + val count = if (Platform.isJvm) 100000 else 5000 + val task = init(1).flatMap(loop(count, 0)) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe count + } + } + + def testStackSequential(channel: MVar[IO, Int]): (Int, IO[Int], IO[Unit]) = { + val count = if (Platform.isJvm) 100000 else 5000 + + def readLoop(n: Int, acc: Int): IO[Int] = { + if (n > 0) + channel.read *> + channel.take.flatMap(_ => readLoop(n - 1, acc + 1)) + else + IO.pure(acc) + } + + def writeLoop(n: Int): IO[Unit] = { + if (n > 0) + channel.put(1).flatMap(_ => writeLoop(n - 1)) + else + IO.pure(()) + } + + (count, readLoop(count, 0), writeLoop(count)) + } + + test("put is stack safe when repeated sequentially") { + val task = for { + channel <- empty[Int] + (count, reads, writes) = testStackSequential(channel) + _ <- writes.start + r <- reads + } yield r == count + + for (r <- task.unsafeToFuture()) yield { + r shouldBe true + } + } + + test("take is stack safe when repeated sequentially") { + val task = for { + channel <- empty[Int] + (count, reads, writes) = testStackSequential(channel) + fr <- reads.start + _ <- writes + r <- fr.join + } yield r == count + + for (r <- task.unsafeToFuture()) yield { + r shouldBe true + } + } +} From 3b6d8d7ef276f7ab70363ebc9ba9c97f66083ce8 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 14 May 2018 22:35:57 +0300 Subject: [PATCH 05/17] Add documentation --- .../scala/cats/effect/concurrent/MVar.scala | 5 +- .../cats/effect/internals/MVarAsync.scala | 4 +- .../effect/internals/MVarConcurrent.scala | 20 ++- .../cats/effect/concurrent/MVarTests.scala | 20 ++- .../main/resources/microsite/data/menu.yml | 6 +- site/src/main/tut/datatypes/index.md | 4 + site/src/main/tut/datatypes/mvar.md | 156 ++++++++++++++++++ 7 files changed, 202 insertions(+), 13 deletions(-) create mode 100644 site/src/main/tut/datatypes/mvar.md diff --git a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala index 1a6ac4b8d2..a5258732ce 100644 --- a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala +++ b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala @@ -23,13 +23,16 @@ import cats.effect.internals.{MVarAsync, MVarConcurrent} * A mutable location, that is either empty or contains * a value of type `A`. * - * It has 2 fundamental atomic operations: + * It has 3 fundamental atomic operations: * * - [[put]] which fills the var if empty, or blocks * (asynchronously) until the var is empty again * - [[take]] which empties the var if full, returning the contained * value, or blocks (asynchronously) otherwise until there is * a value to pull + * - [[read]] which reads the current value without touching it, + * assuming there is one, or otherwise it waits until a value + * is made available via `put` * * The `MVar` is appropriate for building synchronization * primitives and performing simple inter-thread communications. diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala index 2190c26798..6cd0f9aaec 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala @@ -123,9 +123,9 @@ private[effect] final class MVarAsync[F[_], A] private ( case WaitForPut(reads, takes) => if (!stateRef.compareAndSet(current, WaitForPut(reads, takes.enqueue(onTake)))) { - // $COVERAGE-ON$ - unsafeTake(onTake) // $COVERAGE-OFF$ + unsafeTake(onTake) + // $COVERAGE-ON$ } } } diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala index d5027fc1d8..00631908a3 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -75,9 +75,9 @@ private[effect] final class MVarConcurrent[F[_], A] private ( val newMap = listeners.updated(id, (a, onPut)) val update = WaitForTake(value, newMap) - if (stateRef.compareAndSet(current, update)) + if (stateRef.compareAndSet(current, update)) { unregisterPut(id) - else { + } else { // $COVERAGE-OFF$ loop() // retry // $COVERAGE-ON$ @@ -93,9 +93,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( else WaitForPut(LinkedMap.empty, rest) } - if (!stateRef.compareAndSet(current, update)) - unsafePut(a)(onPut) - else { + if (stateRef.compareAndSet(current, update)) { val value = Right(a) // Satisfies all current `read` requests found streamAll(value, reads) @@ -104,6 +102,10 @@ private[effect] final class MVarConcurrent[F[_], A] private ( // Signals completion of `put` onPut(rightUnit) IO.unit + } else { + // $COVERAGE-OFF$ + unsafePut(a)(onPut) // retry + // $COVERAGE-ON$ } } } @@ -122,8 +124,12 @@ private[effect] final class MVarConcurrent[F[_], A] private ( case current @ WaitForPut(reads, takes) => val newMap = takes - id val update: State[A] = WaitForPut(reads, newMap) - if (!stateRef.compareAndSet(current, update)) loop() - case _ => () + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ + loop() + // $COVERAGE-ON$ + } + case _ => } IO(loop()) } diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index 53e128c033..268f573ec6 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -18,10 +18,10 @@ package cats.effect package concurrent import catalysts.Platform +import cats.implicits._ import org.scalatest.{AsyncFunSuite, Matchers} - import scala.concurrent.ExecutionContext -import cats.implicits._ +import scala.concurrent.duration._ class MVarConcurrentTests extends BaseMVarTests { def init[A](a: A): IO[MVar[IO, A]] = @@ -64,6 +64,22 @@ class MVarConcurrentTests extends BaseMVarTests { r shouldBe List(1, 3) } } + + test("read is cancelable") { + val task = for { + mVar <- MVar[IO].empty[Int] + finished <- Deferred.async[IO, Int] + fiber <- mVar.read.flatMap(finished.complete).start + _ <- fiber.cancel + _ <- mVar.put(10) + fallback = IO.sleep(100.millis) *> IO.pure(0) + v <- IO.race(finished.get, fallback) + } yield v + + for (r <- task.unsafeToFuture()) yield { + r shouldBe Right(0) + } + } } class MVarAsyncTests extends BaseMVarTests { diff --git a/site/src/main/resources/microsite/data/menu.yml b/site/src/main/resources/microsite/data/menu.yml index b8f37c7e59..deb8f82a63 100644 --- a/site/src/main/resources/microsite/data/menu.yml +++ b/site/src/main/resources/microsite/data/menu.yml @@ -15,7 +15,11 @@ options: - title: Timer url: datatypes/timer.html menu_section: timer - + + - title: MVar + url: datatypes/mvar.html + menu_section: mvar + - title: Type Classes url: typeclasses/ menu_section: typeclasses diff --git a/site/src/main/tut/datatypes/index.md b/site/src/main/tut/datatypes/index.md index 049abb1324..fe15c1850d 100644 --- a/site/src/main/tut/datatypes/index.md +++ b/site/src/main/tut/datatypes/index.md @@ -9,3 +9,7 @@ position: 1 - **[IO](./io.html)**: data type for encoding side effects as pure values - **[Fiber](./fiber.html)**: the pure result of a [Concurrent](../typeclasses/concurrent.html) data type being started concurrently and that can be either joined or canceled - **[Timer](./timer.html)**: a pure scheduler exposing operations for measuring time and for delaying execution + +## Concurrent + +- **[MVar](./mvar.html)**: a purely functional concurrency primitive that works like a concurrent queue diff --git a/site/src/main/tut/datatypes/mvar.md b/site/src/main/tut/datatypes/mvar.md new file mode 100644 index 0000000000..49fffa843a --- /dev/null +++ b/site/src/main/tut/datatypes/mvar.md @@ -0,0 +1,156 @@ +--- +layout: docsplus +title: "IO" +number: 12 +source: "shared/src/main/scala/cats/effect/concurrent/MVar.scala" +scaladoc: "#cats.effect.concurrent.MVar" +--- + +An `MVar` is a mutable location that can be empty or contains a value, +asynchronously blocking reads when empty and blocking writes when full. + +## Introduction + +Use-cases: + +1. As synchronized, thread-safe mutable variables +2. As channels, with `take` and `put` acting as "receive" and "send" +3. As a binary semaphore, with `take` and `put` acting as "acquire" and "release" + +It has two fundamental (atomic) operations: + +- `put`: fills the `MVar` if it is empty, or blocks (asynchronously) + if the `MVar` is full, until the given value is next in line to be + consumed on `take` +- `take`: tries reading the current value, or blocks (asynchronously) + until there is a value available, at which point the operation resorts + to a `take` followed by a `put` + +An additional but non-atomic operation is `read`, which tries reading the +current value, or blocks (asynchronously) until there is a value available, +at which point the operation resorts to a `take` followed by a `put`. + +

+In this context "asynchronous blocking" means that we are not blocking +any threads. Instead the implementation uses callbacks to notify clients +when the operation has finished (notifications exposed by means of [Task](./task.html)) +and it thus works on top of Javascript as well. +

+ +### Inspiration + +This data type is inspired by `Control.Concurrent.MVar` from Haskell, introduced in the paper +[Concurrent Haskell](http://research.microsoft.com/~simonpj/papers/concurrent-haskell.ps.gz), +by Simon Peyton Jones, Andrew Gordon and Sigbjorn Finne, though some details of +their implementation are changed (in particular, a put on a full `MVar` used +to error, but now merely blocks). + +Appropriate for building synchronization primitives and performing simple +interthread communication, it's the equivalent of a `BlockingQueue(capacity = 1)`, +except that there's no actual thread blocking involved and it is powered by `Task`. + +## Use-case: Synchronized Mutable Variables + +```tut:invisible +import cats.effect.laws.util.TestContext +implicit val ec = TestContext() +``` + +```tut:silent +import cats.effect._ +import cats.effect.concurrent._ +import cats.syntax.all._ + +def sum(state: MVar[IO,Int], list: List[Int]): IO[Int] = + list match { + case Nil => state.take + case x :: xs => + state.take.flatMap { current => + state.put(current + x).flatMap(_ => sum(state, xs)) + } + } + +MVar[IO].init(0).flatMap(sum(_, (0 until 100).toList)) +``` + +This sample isn't very useful, except to show how `MVar` can be used +as a variable. The `take` and `put` operations are atomic. +The `take` call will (asynchronously) block if there isn't a value +available, whereas the call to `put` blocks if the `MVar` already +has a value in it waiting to be consumed. + +Obviously after the call for `take` and before the call for `put` happens +we could have concurrent logic that can update the same variable. +While the two operations are atomic by themselves, a combination of them +isn't atomic (i.e. atomic operations don't compose), therefore if we want +this sample to be *safe*, then we need extra synchronization. + +## Use-case: Asynchronous Lock (Binary Semaphore, Mutex) + +The `take` operation can act as "acquire" and `put` can act as the "release". +Let's do it: + +```tut:silent +final class MLock(mvar: MVar[IO, Unit]) { + def acquire: IO[Unit] = + mvar.take + + def release: IO[Unit] = + mvar.put(()) + + def greenLight[A](fa: IO[A]): IO[A] = + acquire.bracket(_ => fa)(_ => release) +} + +object MLock { + def apply(): IO[MLock] = + MVar[IO].empty[Unit].map(ref => new MLock(ref)) +} +``` + +## Use-case: Producer/Consumer Channel + +An obvious use-case is to model a simple producer-consumer channel. + +Say that you have a producer that needs to push events. +But we also need some back-pressure, so we need to wait on the +consumer to consume the last event before being able to generate +a new event. + +```tut:silent +// Signaling option, because we need to detect completion +type Channel[A] = MVar[IO, Option[A]] + +def producer(ch: Channel[Int], list: List[Int]): IO[Unit] = + list match { + case Nil => + ch.put(None) // we are done! + case head :: tail => + // next please + ch.put(Some(head)).flatMap(_ => producer(ch, tail)) + } + +def consumer(ch: Channel[Int], sum: Long): IO[Long] = + ch.take.flatMap { + case Some(x) => + // next please + consumer(ch, sum + x) + case None => + IO.pure(sum) // we are done! + } + +for { + channel <- MVar[IO].empty[Option[Int]] + count = 100000 + producerTask = IO.shift *> producer(channel, (0 until count).toList) + consumerTask = IO.shift *> consumer(channel, 0L) + + fp <- producerTask.start + fc <- consumerTask.start + _ <- fp.join + sum <- fc.join +} yield sum +``` + +Running this will work as expected. Our `producer` pushes values +into our `MVar` and our `consumer` will consume all of those values. From 88fc481bb39dd32eeb9710bd37b548cf77e1bde6 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 14 May 2018 22:39:40 +0300 Subject: [PATCH 06/17] Relax tests --- .../src/test/scala/cats/effect/concurrent/MVarTests.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index 268f573ec6..ed4c7da6c6 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -234,7 +234,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { IO.pure(sum) // we are done! } - val count = 1000000 + val count = 10000 val sumTask = for { channel <- init(Option(0)) // Ensure they run in parallel @@ -253,7 +253,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { test("stack overflow test") { // Signaling option, because we need to detect completion type Channel[A] = MVar[IO, Option[A]] - val count = 100000 + val count = 10000 def consumer(ch: Channel[Int], sum: Long): IO[Long] = ch.take.flatMap { @@ -290,7 +290,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { ch.put(1).flatMap(_ => loop(n - 1, acc + x)(ch)) } - val count = if (Platform.isJvm) 100000 else 5000 + val count = if (Platform.isJvm) 10000 else 5000 val task = init(1).flatMap(loop(count, 0)) for (r <- task.unsafeToFuture()) yield { @@ -299,7 +299,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { } def testStackSequential(channel: MVar[IO, Int]): (Int, IO[Int], IO[Unit]) = { - val count = if (Platform.isJvm) 100000 else 5000 + val count = if (Platform.isJvm) 10000 else 5000 def readLoop(n: Int, acc: Int): IO[Int] = { if (n > 0) From 5a91dcf51edda950a8fe501f5ed08417e8c5c73e Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 08:52:56 +0300 Subject: [PATCH 07/17] Remove null check --- .../cats/effect/internals/MVarAsync.scala | 15 +-- .../effect/internals/MVarConcurrent.scala | 92 +++++++++---------- .../cats/effect/concurrent/MVarTests.scala | 11 ++- 3 files changed, 59 insertions(+), 59 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala index 6cd0f9aaec..2e03bd2152 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala @@ -19,7 +19,7 @@ package internals import java.util.concurrent.atomic.AtomicReference import cats.effect.concurrent.MVar -import cats.effect.internals.Callback.{Type => Listener, rightUnit} +import cats.effect.internals.Callback.rightUnit import scala.annotation.tailrec import scala.collection.immutable.Queue @@ -44,7 +44,7 @@ private[effect] final class MVarAsync[F[_], A] private ( def read: F[A] = F.async(unsafeRead) - private def streamAll(value: Either[Throwable, A], listeners: Iterable[Listener[A]]): Unit = { + private def streamAll(value: Either[Nothing, A], listeners: Iterable[Listener[A]]): Unit = { val cursor = listeners.iterator while (cursor.hasNext) cursor.next().apply(value) @@ -52,11 +52,6 @@ private[effect] final class MVarAsync[F[_], A] private ( @tailrec private def unsafePut(a: A)(onPut: Listener[Unit]): Unit = { - if (a == null) { - onPut(Left(new NullPointerException("null not supported in MVarAsync/MVar"))) - return - } - stateRef.get match { case current @ WaitForTake(value, puts) => val update = WaitForTake(value, puts.enqueue(a -> onPut)) @@ -159,6 +154,12 @@ private[effect] object MVarAsync { def empty[F[_], A](implicit F: Async[F]): MVar[F, A] = new MVarAsync[F, A](State.empty) + /** + * Internal API — Matches the callack type in `cats.effect.Async`, + * but we don't care about about the error. + */ + private type Listener[-A] = Either[Nothing, A] => Unit + /** ADT modelling the internal state of `MVar`. */ private sealed trait State[A] diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala index 00631908a3..ae6793f71d 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -19,7 +19,7 @@ package internals import java.util.concurrent.atomic.AtomicReference import cats.effect.concurrent.MVar -import cats.effect.internals.Callback.{rightUnit, Type => Listener} +import cats.effect.internals.Callback.rightUnit import scala.annotation.tailrec /** @@ -59,62 +59,53 @@ private[effect] final class MVarConcurrent[F[_], A] private ( IO(loop()) } - private def streamAll(value: Either[Throwable, A], listeners: LinkedMap[Id, Listener[A]]): Unit = { + private def streamAll(value: Either[Nothing, A], listeners: LinkedMap[Id, Listener[A]]): Unit = { val cursor = listeners.values.iterator while (cursor.hasNext) cursor.next().apply(value) } - private def unsafePut(a: A)(onPut: Listener[Unit]): IO[Unit] = { - @tailrec def loop(): IO[Unit] = { - val current: State[A] = stateRef.get - - current match { - case WaitForTake(value, listeners) => - val id = new Id - val newMap = listeners.updated(id, (a, onPut)) - val update = WaitForTake(value, newMap) + @tailrec private def unsafePut(a: A)(onPut: Listener[Unit]): IO[Unit] = { + val current: State[A] = stateRef.get - if (stateRef.compareAndSet(current, update)) { - unregisterPut(id) - } else { - // $COVERAGE-OFF$ - loop() // retry - // $COVERAGE-ON$ - } + current match { + case WaitForTake(value, listeners) => + val id = new Id + val newMap = listeners.updated(id, (a, onPut)) + val update = WaitForTake(value, newMap) - case current @ WaitForPut(reads, takes) => - var first: Listener[A] = null - val update: State[A] = - if (takes.isEmpty) State(a) else { - val (x, rest) = takes.dequeue - first = x - if (rest.isEmpty) State.empty[A] - else WaitForPut(LinkedMap.empty, rest) - } + if (stateRef.compareAndSet(current, update)) { + unregisterPut(id) + } else { + // $COVERAGE-OFF$ + unsafePut(a)(onPut) // retry + // $COVERAGE-ON$ + } - if (stateRef.compareAndSet(current, update)) { - val value = Right(a) - // Satisfies all current `read` requests found - streamAll(value, reads) - // Satisfies the first `take` request found - if (first ne null) first(value) - // Signals completion of `put` - onPut(rightUnit) - IO.unit - } else { - // $COVERAGE-OFF$ - unsafePut(a)(onPut) // retry - // $COVERAGE-ON$ + case current @ WaitForPut(reads, takes) => + var first: Listener[A] = null + val update: State[A] = + if (takes.isEmpty) State(a) else { + val (x, rest) = takes.dequeue + first = x + if (rest.isEmpty) State.empty[A] + else WaitForPut(LinkedMap.empty, rest) } - } - } - if (a == null) { - onPut(Left(new NullPointerException("null not supported in MVar"))) - IO.unit - } else { - loop() + if (stateRef.compareAndSet(current, update)) { + val value = Right(a) + // Satisfies all current `read` requests found + streamAll(value, reads) + // Satisfies the first `take` request found + if (first ne null) first(value) + // Signals completion of `put` + onPut(rightUnit) + IO.unit + } else { + // $COVERAGE-OFF$ + unsafePut(a)(onPut) // retry + // $COVERAGE-ON$ + } } } @@ -224,6 +215,13 @@ private[effect] object MVarConcurrent { def empty[F[_], A](implicit F: Concurrent[F]): MVar[F, A] = new MVarConcurrent[F, A](State.empty) + /** + * Internal API — Matches the callack type in `cats.effect.Async`, + * but we don't care about about the error. + */ + private type Listener[-A] = Either[Nothing, A] => Unit + + /** Used with [[LinkedMap]] to identify callbacks that need to be cancelled. */ private final class Id extends Serializable /** ADT modelling the internal state of `MVar`. */ diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index ed4c7da6c6..fe697783e0 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -204,11 +204,12 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { } } - test("put(null) raises NullPointerException") { - val task = empty[String].flatMap(_.put(null)).attempt - - for (Left(v) <- task.unsafeToFuture()) yield { - v shouldBe an[NullPointerException] + test("put(null) works") { + val task = empty[String].flatMap { mvar => + mvar.put(null) *> mvar.read + } + for (v <- task.unsafeToFuture()) yield { + v shouldBe null } } From 7cd538a0daea3d9757c56ca727430259ad8043a1 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 08:55:14 +0300 Subject: [PATCH 08/17] LinkedMap[Nothing, Nothing] instead of LinkedMap[Any, Any] --- .../shared/src/main/scala/cats/effect/internals/LinkedMap.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala b/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala index b498e0a7cf..d6e7222e7c 100644 --- a/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/LinkedMap.scala @@ -70,5 +70,5 @@ private[effect] object LinkedMap { emptyRef.asInstanceOf[LinkedMap[K, V]] private val emptyRef = - new LinkedMap[Any, Any](Map.empty, LongMap.empty, 0) + new LinkedMap[Nothing, Nothing](Map.empty, LongMap.empty, 0) } From a6dfa4b68f7dbac8de3cc3e013865434736b8eb1 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 09:12:09 +0300 Subject: [PATCH 09/17] Refactoring --- .../effect/internals/MVarConcurrent.scala | 115 ++++++++---------- 1 file changed, 53 insertions(+), 62 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala index ae6793f71d..2b180be619 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -43,39 +43,15 @@ private[effect] final class MVarConcurrent[F[_], A] private ( def read: F[A] = F.cancelable(unsafeRead) - private def unregisterPut(id: Id): IO[Unit] = { - @tailrec def loop(): Unit = - stateRef.get() match { - case current @ WaitForTake(_, listeners) => - val update = current.copy(listeners = listeners - id) - if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ - loop() // retry - // $COVERAGE-ON$ - } - case _ => - () - } - IO(loop()) - } - - private def streamAll(value: Either[Nothing, A], listeners: LinkedMap[Id, Listener[A]]): Unit = { - val cursor = listeners.values.iterator - while (cursor.hasNext) - cursor.next().apply(value) - } - @tailrec private def unsafePut(a: A)(onPut: Listener[Unit]): IO[Unit] = { - val current: State[A] = stateRef.get - - current match { - case WaitForTake(value, listeners) => + stateRef.get match { + case current @ WaitForTake(value, listeners) => val id = new Id val newMap = listeners.updated(id, (a, onPut)) val update = WaitForTake(value, newMap) if (stateRef.compareAndSet(current, update)) { - unregisterPut(id) + IO(unsafeCancelPut(id)) } else { // $COVERAGE-OFF$ unsafePut(a)(onPut) // retry @@ -109,27 +85,24 @@ private[effect] final class MVarConcurrent[F[_], A] private ( } } - private def unregisterTake(id: Id): IO[Unit] = { - @tailrec def loop(): Unit = - stateRef.get() match { - case current @ WaitForPut(reads, takes) => - val newMap = takes - id - val update: State[A] = WaitForPut(reads, newMap) - if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ - loop() - // $COVERAGE-ON$ - } - case _ => - } - IO(loop()) - } + // Impure function meant to cancel the put request + @tailrec private def unsafeCancelPut(id: Id): Unit = + stateRef.get() match { + case current @ WaitForTake(_, listeners) => + val update = current.copy(listeners = listeners - id) + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ + unsafeCancelPut(id) // retry + // $COVERAGE-ON$ + } + case _ => + () + } @tailrec private def unsafeTake(onTake: Listener[A]): IO[Unit] = { - val current: State[A] = stateRef.get - current match { - case WaitForTake(value, queue) => + stateRef.get match { + case current @ WaitForTake(value, queue) => if (queue.isEmpty) { if (stateRef.compareAndSet(current, State.empty)) { onTake(Right(value)) @@ -152,11 +125,11 @@ private[effect] final class MVarConcurrent[F[_], A] private ( } } - case WaitForPut(reads, takes) => + case current @ WaitForPut(reads, takes) => val id = new Id val newQueue = takes.updated(id, onTake) if (stateRef.compareAndSet(current, WaitForPut(reads, newQueue))) - unregisterTake(id) + IO(unsafeCancelTake(id)) else { // $COVERAGE-OFF$ unsafeTake(onTake) // retry @@ -165,6 +138,20 @@ private[effect] final class MVarConcurrent[F[_], A] private ( } } + @tailrec private def unsafeCancelTake(id: Id): Unit = + stateRef.get() match { + case current @ WaitForPut(reads, takes) => + val newMap = takes - id + val update: State[A] = WaitForPut(reads, newMap) + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ + unsafeCancelTake(id) + // $COVERAGE-ON$ + } + case _ => + } + + @tailrec private def unsafeRead(onRead: Listener[A]): IO[Unit] = { val current: State[A] = stateRef.get @@ -180,7 +167,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( val id = new Id val newQueue = reads.updated(id, onRead) if (stateRef.compareAndSet(current, WaitForPut(newQueue, takes))) - unregisterRead(id) + IO(unsafeCancelRead(id)) else { // $COVERAGE-OFF$ unsafeRead(onRead) // retry @@ -189,20 +176,24 @@ private[effect] final class MVarConcurrent[F[_], A] private ( } } - private def unregisterRead(id: Id): IO[Unit] = { - @tailrec def loop(): Unit = - stateRef.get() match { - case current @ WaitForPut(reads, takes) => - val newMap = reads - id - val update: State[A] = WaitForPut(newMap, takes) - if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ - loop() - // $COVERAGE-ON$ - } - case _ => () - } - IO(loop()) + private def unsafeCancelRead(id: Id): Unit = + stateRef.get() match { + case current @ WaitForPut(reads, takes) => + val newMap = reads - id + val update: State[A] = WaitForPut(newMap, takes) + if (!stateRef.compareAndSet(current, update)) { + // $COVERAGE-OFF$ + unsafeCancelRead(id) + // $COVERAGE-ON$ + } + case _ => () + } + + // For streaming a value to a whole `reads` collection + private def streamAll(value: Either[Nothing, A], listeners: LinkedMap[Id, Listener[A]]): Unit = { + val cursor = listeners.values.iterator + while (cursor.hasNext) + cursor.next().apply(value) } } From 140458d3536e87ac9901ab20afad78c0a21654e4 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 09:15:21 +0300 Subject: [PATCH 10/17] Fix TimerTest --- laws/shared/src/test/scala/cats/effect/TimerTests.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/laws/shared/src/test/scala/cats/effect/TimerTests.scala b/laws/shared/src/test/scala/cats/effect/TimerTests.scala index 57f6ae654c..977531eb4a 100644 --- a/laws/shared/src/test/scala/cats/effect/TimerTests.scala +++ b/laws/shared/src/test/scala/cats/effect/TimerTests.scala @@ -114,7 +114,7 @@ class TimerTests extends AsyncFunSuite with Matchers { } for (r <- io.value.unsafeToFuture()) yield { - r.right.getOrElse(0L) should be >= 9L + r.right.getOrElse(0L) should be > 0L } } From 87d577bd4edf4fa168347e5d988baabb319631e5 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 09:19:34 +0300 Subject: [PATCH 11/17] MVar - remove coverage annotations --- .../cats/effect/internals/MVarAsync.scala | 24 +++++-------------- .../effect/internals/MVarConcurrent.scala | 18 -------------- 2 files changed, 6 insertions(+), 36 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala index 2e03bd2152..8e74a0ec35 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala @@ -44,21 +44,13 @@ private[effect] final class MVarAsync[F[_], A] private ( def read: F[A] = F.async(unsafeRead) - private def streamAll(value: Either[Nothing, A], listeners: Iterable[Listener[A]]): Unit = { - val cursor = listeners.iterator - while (cursor.hasNext) - cursor.next().apply(value) - } - @tailrec private def unsafePut(a: A)(onPut: Listener[Unit]): Unit = { stateRef.get match { case current @ WaitForTake(value, puts) => val update = WaitForTake(value, puts.enqueue(a -> onPut)) if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ unsafePut(a)(onPut) // retry - // $COVERAGE-ON$ } case current @ WaitForPut(reads, takes) => @@ -72,9 +64,7 @@ private[effect] final class MVarAsync[F[_], A] private ( } if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ unsafePut(a)(onPut) // retry - // $COVERAGE-ON$ } else { val value = Right(a) // Satisfies all current `read` requests found @@ -97,9 +87,7 @@ private[effect] final class MVarAsync[F[_], A] private ( // Signals completion of `take` onTake(Right(value)) else { - // $COVERAGE-OFF$ unsafeTake(onTake) // retry - // $COVERAGE-ON$ } } else { val ((ax, notify), xs) = queue.dequeue @@ -110,17 +98,13 @@ private[effect] final class MVarAsync[F[_], A] private ( // Complete the `put` request waiting on a notification notify(rightUnit) } else { - // $COVERAGE-OFF$ unsafeTake(onTake) // retry - // $COVERAGE-ON$ } } case WaitForPut(reads, takes) => if (!stateRef.compareAndSet(current, WaitForPut(reads, takes.enqueue(onTake)))) { - // $COVERAGE-OFF$ unsafeTake(onTake) - // $COVERAGE-ON$ } } } @@ -137,12 +121,16 @@ private[effect] final class MVarAsync[F[_], A] private ( case WaitForPut(reads, takes) => // No value available, enqueue the callback if (!stateRef.compareAndSet(current, WaitForPut(reads.enqueue(onRead), takes))) { - // $COVERAGE-OFF$ unsafeRead(onRead) // retry - // $COVERAGE-ON$ } } } + + private def streamAll(value: Either[Nothing, A], listeners: Iterable[Listener[A]]): Unit = { + val cursor = listeners.iterator + while (cursor.hasNext) + cursor.next().apply(value) + } } private[effect] object MVarAsync { diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala index 2b180be619..55340db826 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -53,9 +53,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( if (stateRef.compareAndSet(current, update)) { IO(unsafeCancelPut(id)) } else { - // $COVERAGE-OFF$ unsafePut(a)(onPut) // retry - // $COVERAGE-ON$ } case current @ WaitForPut(reads, takes) => @@ -78,9 +76,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( onPut(rightUnit) IO.unit } else { - // $COVERAGE-OFF$ unsafePut(a)(onPut) // retry - // $COVERAGE-ON$ } } } @@ -91,9 +87,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( case current @ WaitForTake(_, listeners) => val update = current.copy(listeners = listeners - id) if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ unsafeCancelPut(id) // retry - // $COVERAGE-ON$ } case _ => () @@ -108,9 +102,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( onTake(Right(value)) IO.unit } else { - // $COVERAGE-OFF$ unsafeTake(onTake) // retry - // $COVERAGE-ON$ } } else { val ((ax, notify), xs) = queue.dequeue @@ -119,9 +111,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( notify(rightUnit) IO.unit } else { - // $COVERAGE-OFF$ unsafeTake(onTake) // retry - // $COVERAGE-ON$ } } @@ -131,9 +121,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( if (stateRef.compareAndSet(current, WaitForPut(reads, newQueue))) IO(unsafeCancelTake(id)) else { - // $COVERAGE-OFF$ unsafeTake(onTake) // retry - // $COVERAGE-ON$ } } } @@ -144,9 +132,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( val newMap = takes - id val update: State[A] = WaitForPut(reads, newMap) if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ unsafeCancelTake(id) - // $COVERAGE-ON$ } case _ => } @@ -169,9 +155,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( if (stateRef.compareAndSet(current, WaitForPut(newQueue, takes))) IO(unsafeCancelRead(id)) else { - // $COVERAGE-OFF$ unsafeRead(onRead) // retry - // $COVERAGE-ON$ } } } @@ -182,9 +166,7 @@ private[effect] final class MVarConcurrent[F[_], A] private ( val newMap = reads - id val update: State[A] = WaitForPut(newMap, takes) if (!stateRef.compareAndSet(current, update)) { - // $COVERAGE-OFF$ unsafeCancelRead(id) - // $COVERAGE-ON$ } case _ => () } From e27c707aefed52c9a220c0e4f1d767b9920d8b1d Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 09:47:42 +0300 Subject: [PATCH 12/17] Fix comments --- .../src/main/scala/cats/effect/internals/MVarAsync.scala | 4 ++-- .../src/main/scala/cats/effect/internals/MVarConcurrent.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala index 8e74a0ec35..b2488f25fe 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarAsync.scala @@ -143,8 +143,8 @@ private[effect] object MVarAsync { new MVarAsync[F, A](State.empty) /** - * Internal API — Matches the callack type in `cats.effect.Async`, - * but we don't care about about the error. + * Internal API — Matches the callback type in `cats.effect.Async`, + * but we don't care about the error. */ private type Listener[-A] = Either[Nothing, A] => Unit diff --git a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala index 55340db826..7ab86d387a 100644 --- a/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala +++ b/core/shared/src/main/scala/cats/effect/internals/MVarConcurrent.scala @@ -189,8 +189,8 @@ private[effect] object MVarConcurrent { new MVarConcurrent[F, A](State.empty) /** - * Internal API — Matches the callack type in `cats.effect.Async`, - * but we don't care about about the error. + * Internal API — Matches the callback type in `cats.effect.Async`, + * but we don't care about the error. */ private type Listener[-A] = Either[Nothing, A] => Unit From 883797029abba16f64700a82749ff0d51dcb6041 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 09:49:25 +0300 Subject: [PATCH 13/17] Remove Async param --- core/shared/src/main/scala/cats/effect/concurrent/MVar.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala index a5258732ce..d5ff5e59ba 100644 --- a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala +++ b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala @@ -146,7 +146,7 @@ object MVar { F.delay(MVarAsync[F, A](a)(F)) /** Builds an empty `MVar`. */ - def empty[A](implicit F: Async[F]): F[MVar[F, A]] = + def empty[A]: F[MVar[F, A]] = F.delay(MVarAsync.empty[F, A](F)) } From 5c331988b75944398d68d378f4157192fa38e799 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 12:58:38 +0200 Subject: [PATCH 14/17] Refined builders, added test --- .../scala/cats/effect/concurrent/MVar.scala | 123 ++++++++++++------ .../cats/effect/concurrent/MVarTests.scala | 43 +++++- 2 files changed, 127 insertions(+), 39 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala index d5ff5e59ba..52d5eb531a 100644 --- a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala +++ b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala @@ -95,71 +95,120 @@ object MVar { * * This builder uses the * [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type]] - * technique. + * technique. This is a matter of preference. * * For creating an empty `MVar`: * {{{ - * MVar[IO].empty[Int] + * MVar[IO].empty[Int] <-> MVar.empty[IO, Int] * }}} * * For creating an `MVar` with an initial value: * {{{ - * MVar[IO].init("hello") + * MVar[IO].init("hello") <-> MVar.init[IO, String]("hello") * }}} + */ + def apply[F[_]](implicit F: Concurrent[F]): ApplyBuilders[F] = + new ApplyBuilders[F](F) + + /** + * Creates a cancelable `MVar` that starts as empty. + * + * @see [[emptyAsync]] for non-cancelable MVars * - * @see [[async]] + * @param F is a [[Concurrent]] constraint, needed in order to + * describe cancelable operations */ - def apply[F[_]](implicit F: Concurrent[F]): ConcurrentBuilder[F] = - new ConcurrentBuilder[F](F) + def empty[F[_], A](implicit F: Concurrent[F]): F[MVar[F, A]] = + F.delay(MVarConcurrent.empty) /** - * Builds an [[MVar]] value for `F` data types that are [[Async]]. + * Creates a non-cancelable `MVar` that starts as empty. * - * Due to `Async`'s restrictions the yielded values by [[MVar.take]] - * and [[MVar.put]] are not cancelable. + * The resulting `MVar` has non-cancelable operations. * - * This builder uses the - * [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type]] - * technique. + * @see [[empty]] for creating cancelable MVars + */ + def emptyAsync[F[_], A](implicit F: Async[F]): F[MVar[F, A]] = + F.delay(MVarAsync.empty) + + /** + * Creates a cancelable `MVar` that's initialized to an `initial` + * value. * - * For creating an empty `MVar`: - * {{{ - * MVar.async[IO].empty[Int] - * }}} + * @see [[initAsync]] for non-cancelable MVars * - * For creating an `MVar` with an initial value: - * {{{ - * MVar.async[IO].init("hello") - * }}} + * @param initial is a value that will be immediately available + * for the first `read` or `take` operation * - * @see [[apply]] + * @param F is a [[Concurrent]] constraint, needed in order to + * describe cancelable operations */ - def async[F[_]](implicit F: Async[F]): AsyncBuilder[F] = - new AsyncBuilder[F](F) + def init[F[_], A](initial: A)(implicit F: Concurrent[F]): F[MVar[F, A]] = + F.delay(MVarConcurrent(initial)) /** - * Returned by the [[async]] builder. + * Creates a non-cancelable `MVar` that's initialized to an `initial` + * value. + * + * The resulting `MVar` has non-cancelable operations. + * + * @see [[init]] for creating cancelable MVars */ - final class AsyncBuilder[F[_]](val F: Async[F]) extends AnyVal { - /** Builds an `MVar` with an initial value. */ - def init[A](a: A): F[MVar[F, A]] = - F.delay(MVarAsync[F, A](a)(F)) + def initAsync[F[_], A](initial: A)(implicit F: Async[F]): F[MVar[F, A]] = + F.delay(MVarAsync(initial)) - /** Builds an empty `MVar`. */ - def empty[A]: F[MVar[F, A]] = - F.delay(MVarAsync.empty[F, A](F)) - } + /** + * Creates a cancelable `MVar` initialized with a value given + * in the `F[A]` context, thus the initial value being lazily evaluated. + * + * @see [[init]] for creating MVars initialized with strict values + * @see [[initAsyncF]] for building non-cancelable MVars + * @param fa is the value that's going to be used as this MVar's + * initial value, available then for the first `take` or `read` + * @param F is a [[Concurrent]] constraint, needed in order to + * describe cancelable operations + */ + def initF[F[_], A](fa: F[A])(implicit F: Concurrent[F]): F[MVar[F, A]] = + F.map(fa)(MVarConcurrent.apply(_)) + + /** + * Creates a non-cancelable `MVar` initialized with a value given + * in the `F[A]` context, thus the initial value being lazily evaluated. + * + * @see [[initAsync]] for creating MVars initialized with strict values + * @see [[initF]] for building cancelable MVars + * @param fa is the value that's going to be used as this MVar's + * initial value, available then for the first `take` or `read` + */ + def initAsyncF[F[_], A](fa: F[A])(implicit F: Async[F]): F[MVar[F, A]] = + F.map(fa)(MVarAsync.apply(_)) /** * Returned by the [[apply]] builder. */ - final class ConcurrentBuilder[F[_]](val F: Concurrent[F]) extends AnyVal { - /** Builds an `MVar` with an initial value. */ + final class ApplyBuilders[F[_]](val F: Concurrent[F]) extends AnyVal { + /** + * Builds an `MVar` with an initial value. + * + * @see documentation for [[MVar.init]] + */ def init[A](a: A): F[MVar[F, A]] = - F.delay(MVarConcurrent[F, A](a)(F)) + MVar.init(a)(F) + + /** + * Builds an `MVar` with an initial value that's lazily evaluated. + * + * @see documentation for [[MVar.initF]] + */ + def initF[A](fa: F[A]): F[MVar[F, A]] = + MVar.initF(fa)(F) - /** Builds an empty `MVar`. */ + /** + * Builds an empty `MVar`. + * + * @see documentation for [[MVar.empty]] + */ def empty[A]: F[MVar[F, A]] = - F.delay(MVarConcurrent.empty[F, A](F)) + MVar.empty(F) } } \ No newline at end of file diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index fe697783e0..a3cb92ce39 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -27,6 +27,9 @@ class MVarConcurrentTests extends BaseMVarTests { def init[A](a: A): IO[MVar[IO, A]] = MVar[IO].init(a) + def initF[A](fa: IO[A]): IO[MVar[IO, A]] = + MVar[IO].initF(fa) + def empty[A]: IO[MVar[IO, A]] = MVar[IO].empty[A] @@ -84,10 +87,13 @@ class MVarConcurrentTests extends BaseMVarTests { class MVarAsyncTests extends BaseMVarTests { def init[A](a: A): IO[MVar[IO, A]] = - MVar.async[IO].init(a) + MVar.initAsync(a) + + def initF[A](fa: IO[A]): IO[MVar[IO, A]] = + MVar.initAsyncF(fa) def empty[A]: IO[MVar[IO, A]] = - MVar.async[IO].empty[A] + MVar.emptyAsync } abstract class BaseMVarTests extends AsyncFunSuite with Matchers { @@ -95,6 +101,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { ExecutionContext.Implicits.global def init[A](a: A): IO[MVar[IO, A]] + def initF[A](fa: IO[A]): IO[MVar[IO, A]] def empty[A]: IO[MVar[IO, A]] test("empty; put; take; put; take") { @@ -346,4 +353,36 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { r shouldBe true } } + + test("initF works") { + val task = for { + channel <- initF(IO(10)) + r1 <- channel.read + r2 <- channel.read + r3 <- channel.take + } yield List(r1, r2, r3) + + for (r <- task.unsafeToFuture()) yield { + r shouldBe List(10, 10, 10) + } + } + + test("concurrent take and put") { + val count = if (Platform.isJvm) 10000 else 1000 + val task = for { + mvar <- empty[Int] + ref <- Ref[IO, Int](0) + takes = (0 until count).toList.map(_ => IO.shift *> mvar.take.flatMap(x => ref.modify(_ + x))).parSequence + puts = (0 until count).toList.map(_ => IO.shift *> mvar.put(1)).parSequence + fiber1 <- takes.start + fiber2 <- puts.start + _ <- fiber1.join + _ <- fiber2.join + r <- ref.get + } yield r + + for (r <- task.unsafeToFuture()) yield { + r shouldBe count + } + } } From e61bf96be41a7cc99331ba83c1e08976e02b654a Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 14:04:48 +0200 Subject: [PATCH 15/17] Change MVar builders --- .../shared/src/main/scala/cats/effect/concurrent/MVar.scala | 4 +++- .../src/test/scala/cats/effect/concurrent/MVarTests.scala | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala index 52d5eb531a..3e50a6a0d0 100644 --- a/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala +++ b/core/shared/src/main/scala/cats/effect/concurrent/MVar.scala @@ -95,7 +95,7 @@ object MVar { * * This builder uses the * [[https://typelevel.org/cats/guidelines.html#partially-applied-type-params Partially-Applied Type]] - * technique. This is a matter of preference. + * technique. * * For creating an empty `MVar`: * {{{ @@ -106,6 +106,8 @@ object MVar { * {{{ * MVar[IO].init("hello") <-> MVar.init[IO, String]("hello") * }}} + * + * @see [[init]], [[initF]] and [[empty]] */ def apply[F[_]](implicit F: Concurrent[F]): ApplyBuilders[F] = new ApplyBuilders[F](F) diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index a3cb92ce39..893c3691ed 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -370,10 +370,10 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { test("concurrent take and put") { val count = if (Platform.isJvm) 10000 else 1000 val task = for { - mvar <- empty[Int] + mVar <- empty[Int] ref <- Ref[IO, Int](0) - takes = (0 until count).toList.map(_ => IO.shift *> mvar.take.flatMap(x => ref.modify(_ + x))).parSequence - puts = (0 until count).toList.map(_ => IO.shift *> mvar.put(1)).parSequence + takes = (0 until count).map(_ => IO.shift *> mVar.take.flatMap(x => ref.modify(_ + x))).toList.parSequence + puts = (0 until count).map(_ => IO.shift *> mVar.put(1)).toList.parSequence fiber1 <- takes.start fiber2 <- puts.start _ <- fiber1.join From a027384b1a618df23cdb9eff9e9d0f76b70dd332 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 14:06:44 +0200 Subject: [PATCH 16/17] Fix doc --- site/src/main/tut/datatypes/mvar.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/site/src/main/tut/datatypes/mvar.md b/site/src/main/tut/datatypes/mvar.md index 49fffa843a..2c80a2c68e 100644 --- a/site/src/main/tut/datatypes/mvar.md +++ b/site/src/main/tut/datatypes/mvar.md @@ -25,10 +25,9 @@ It has two fundamental (atomic) operations: - `take`: tries reading the current value, or blocks (asynchronously) until there is a value available, at which point the operation resorts to a `take` followed by a `put` - -An additional but non-atomic operation is `read`, which tries reading the -current value, or blocks (asynchronously) until there is a value available, -at which point the operation resorts to a `take` followed by a `put`. +- `read`: which reads the current value without modifying the `MVar`, + assuming there is a value available, or otherwise it waits until a value + is made available via `put`

In this context "asynchronous blocking" means that we are not blocking From 18d3161d31290d27bdc5fd3272f296f506bccac3 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Tue, 15 May 2018 14:10:01 +0200 Subject: [PATCH 17/17] Make test more complicated --- .../src/test/scala/cats/effect/concurrent/MVarTests.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala index 893c3691ed..60e3de9d3a 100644 --- a/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala +++ b/core/shared/src/test/scala/cats/effect/concurrent/MVarTests.scala @@ -372,7 +372,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { val task = for { mVar <- empty[Int] ref <- Ref[IO, Int](0) - takes = (0 until count).map(_ => IO.shift *> mVar.take.flatMap(x => ref.modify(_ + x))).toList.parSequence + takes = (0 until count).map(_ => IO.shift *> mVar.read.map2(mVar.take)(_ + _).flatMap(x => ref.modify(_ + x))).toList.parSequence puts = (0 until count).map(_ => IO.shift *> mVar.put(1)).toList.parSequence fiber1 <- takes.start fiber2 <- puts.start @@ -382,7 +382,7 @@ abstract class BaseMVarTests extends AsyncFunSuite with Matchers { } yield r for (r <- task.unsafeToFuture()) yield { - r shouldBe count + r shouldBe count * 2 } } }