From 5d89a64277905050cddf9e844a9385a41a1b6839 Mon Sep 17 00:00:00 2001 From: Oleg Pyzhcov Date: Fri, 10 May 2019 14:34:54 +0300 Subject: [PATCH] feat: more debugability for refs and co. --- .../olegpy/stm/internal/StorePlatform.scala | 2 ++ .../olegpy/stm/internal/StorePlatform.scala | 7 +++- .../src/main/scala/com/olegpy/stm/TRef.scala | 10 ++++++ .../com/olegpy/stm/internal/Monitor.scala | 2 +- .../scala/com/olegpy/stm/internal/Store.scala | 2 ++ .../com/olegpy/stm/internal/TRefImpl.scala | 3 ++ .../scala/com/olegpy/stm/misc/TDeferred.scala | 5 +++ .../scala/com/olegpy/stm/misc/TMVar.scala | 5 +++ .../scala/com/olegpy/stm/misc/TQueue.scala | 8 +++++ .../scala/com/olegpy/stm/BaseIOSuite.scala | 3 +- .../scala/com/olegpy/stm/StoreTests.scala | 34 +++++++++++++++++++ .../problems/CigaretteSmokersProblem.scala | 6 ++-- 12 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 shared/src/test/scala/com/olegpy/stm/StoreTests.scala diff --git a/js/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala b/js/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala index b2af7ff..b5e1e89 100644 --- a/js/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala +++ b/js/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala @@ -74,5 +74,7 @@ trait StorePlatform { throw ex } } + + def unsafeReadCommitted(k: AnyRef): Any = committed.get(k) } } diff --git a/jvm/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala b/jvm/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala index 20755b8..45969fe 100644 --- a/jvm/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala +++ b/jvm/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala @@ -61,7 +61,7 @@ trait StorePlatform { val ksi = j.reads.iterator() while (ksi.hasNext && !hasConflict) { val key = ksi.next() - hasConflict = start.get(key) != preCommit.get(key) + hasConflict = start.get(key) ne preCommit.get(key) } } if (hasConflict) { @@ -101,5 +101,10 @@ trait StorePlatform { throw ex } } + + def unsafeReadCommitted(k: AnyRef): Any = committed.get().get(k) match { + case null => null + case t => t._1 + } } } diff --git a/shared/src/main/scala/com/olegpy/stm/TRef.scala b/shared/src/main/scala/com/olegpy/stm/TRef.scala index ab2f16e..88049d4 100644 --- a/shared/src/main/scala/com/olegpy/stm/TRef.scala +++ b/shared/src/main/scala/com/olegpy/stm/TRef.scala @@ -30,6 +30,10 @@ trait TRef[A] extends Ref[STM, A] { def tryModifyState[B](state: State[A, B]): STM[Option[B]] = modifyState(state).map(_.some) def modifyState[B](state: State[A, B]): STM[B] = modify(state.run(_).value) + + protected[stm] def unsafeLastValue: A + + override def toString: String = s"TRef($unsafeLastValue)" } object TRef { @@ -45,16 +49,22 @@ object TRef { val unit: TRef[Unit] = new TRef[Unit] { def get: STM[Unit] = STM.unit def set(a: Unit): STM[Unit] = STM.unit + + protected[stm] def unsafeLastValue: Unit = () } def imap[A, B](fa: TRef[A])(f: A => B)(g: B => A): TRef[B] = new TRef[B] { def get: STM[B] = fa.get map f def set(a: B): STM[Unit] = fa.set(g(a)) + + protected[stm] def unsafeLastValue: B = f(fa.unsafeLastValue) } def product[A, B](fa: TRef[A], fb: TRef[B]): TRef[(A, B)] = new TRef[(A, B)] { def get: STM[(A, B)] = fa.get product fb.get def set(a: (A, B)): STM[Unit] = fa.set(a._1) *> fb.set(a._2) + + protected[stm] def unsafeLastValue: (A, B) = (fa.unsafeLastValue, fb.unsafeLastValue) } } } diff --git a/shared/src/main/scala/com/olegpy/stm/internal/Monitor.scala b/shared/src/main/scala/com/olegpy/stm/internal/Monitor.scala index 5b35fe2..e34fb8d 100644 --- a/shared/src/main/scala/com/olegpy/stm/internal/Monitor.scala +++ b/shared/src/main/scala/com/olegpy/stm/internal/Monitor.scala @@ -11,7 +11,7 @@ class Monitor private[stm] () { private[this] val store: Store = /*_*/Store.forPlatform()/*_*/ private[this] val rightUnit = Right(()) - private[this] class RetryCallback(keys: Iterable[AnyRef]) { + private[this] class RetryCallback (keys: Iterable[AnyRef]) { @volatile var catsCb: Callback = _ keys.foreach(listenTo) def invoke(): Unit = { diff --git a/shared/src/main/scala/com/olegpy/stm/internal/Store.scala b/shared/src/main/scala/com/olegpy/stm/internal/Store.scala index a4029dc..905f830 100644 --- a/shared/src/main/scala/com/olegpy/stm/internal/Store.scala +++ b/shared/src/main/scala/com/olegpy/stm/internal/Store.scala @@ -5,6 +5,8 @@ trait Store { def current(): Store.Journal def transact[A](f: => A): A def attempt[A](f: => A): A + + def unsafeReadCommitted(k: AnyRef): Any } object Store extends StorePlatform { diff --git a/shared/src/main/scala/com/olegpy/stm/internal/TRefImpl.scala b/shared/src/main/scala/com/olegpy/stm/internal/TRefImpl.scala index e47cbcf..0708071 100644 --- a/shared/src/main/scala/com/olegpy/stm/internal/TRefImpl.scala +++ b/shared/src/main/scala/com/olegpy/stm/internal/TRefImpl.scala @@ -11,4 +11,7 @@ private[stm] class TRefImpl[A](initial: A) extends TRef[A] { def set(a: A): STM[Unit] = STM.delay { STM.store.current().update(this, a) } + + protected[stm] def unsafeLastValue: A = + STM.store.unsafeReadCommitted(this).asInstanceOf[A] } diff --git a/shared/src/main/scala/com/olegpy/stm/misc/TDeferred.scala b/shared/src/main/scala/com/olegpy/stm/misc/TDeferred.scala index 1b8a25e..995a4ab 100644 --- a/shared/src/main/scala/com/olegpy/stm/misc/TDeferred.scala +++ b/shared/src/main/scala/com/olegpy/stm/misc/TDeferred.scala @@ -20,6 +20,11 @@ class TDeferred[A] (private[stm] val state: TRef[Option[A]]) extends TryableDefe def get: F[A] = outer.get.commit[F] def complete(a: A): F[Unit] = outer.complete(a).commit[F] } + + override def toString: String = state.unsafeLastValue match { + case Some(value) => s"TDeferred(: $value)" + case None => s"TDeferred()" + } } object TDeferred { diff --git a/shared/src/main/scala/com/olegpy/stm/misc/TMVar.scala b/shared/src/main/scala/com/olegpy/stm/misc/TMVar.scala index 9997aa9..577c473 100644 --- a/shared/src/main/scala/com/olegpy/stm/misc/TMVar.scala +++ b/shared/src/main/scala/com/olegpy/stm/misc/TMVar.scala @@ -15,6 +15,11 @@ class TMVar[A] (private[stm] val state: TRef[Option[A]]) extends MVar[STM, A] { def read: STM[A] = state.get.unNone def to[F[_]: Concurrent]: MVar[F, A] = mapK(STM.atomicallyK[F]) + + override def toString: String = state.unsafeLastValue match { + case Some(value) => s"TMVar(: $value)" + case None => "TMVar()" + } } object TMVar { diff --git a/shared/src/main/scala/com/olegpy/stm/misc/TQueue.scala b/shared/src/main/scala/com/olegpy/stm/misc/TQueue.scala index a0754f4..ba752ba 100644 --- a/shared/src/main/scala/com/olegpy/stm/misc/TQueue.scala +++ b/shared/src/main/scala/com/olegpy/stm/misc/TQueue.scala @@ -28,6 +28,8 @@ object TQueue { def offer(a: A): STM[Boolean] = slot.get .flatMap(_.fold(slot.set(a.some).as(true))(_ => STM.pure(false))) def tryDequeue: STM[Option[A]] = slot.get + + override def toString: String = s"TQueue(synchronous, ${slot.unsafeLastValue.getOrElse("")})" } } @@ -43,6 +45,8 @@ object TQueue { case Some((hd, rest)) => (rest, hd.some) } } + + override def toString: String = s"TQueue(unbounded, ${state.unsafeLastValue.mkString(", ")})" } } @@ -59,6 +63,8 @@ object TQueue { case hd +: tail => (tail, hd.some) case empty => (empty, none) } + + override def toString: String = s"TQueue(bounded($max), ${state.unsafeLastValue.mkString(", ")})" } } @@ -71,6 +77,8 @@ object TQueue { case hd +: tail => (tail, hd.some) case empty => (empty, none) } + + override def toString: String = s"TQueue(circularBuffer($max), ${state.unsafeLastValue.mkString(", ")})" } } diff --git a/shared/src/test/scala/com/olegpy/stm/BaseIOSuite.scala b/shared/src/test/scala/com/olegpy/stm/BaseIOSuite.scala index 4b19108..a3b83d1 100644 --- a/shared/src/test/scala/com/olegpy/stm/BaseIOSuite.scala +++ b/shared/src/test/scala/com/olegpy/stm/BaseIOSuite.scala @@ -30,7 +30,7 @@ abstract class NondetIOSuite extends TestSuite with IOSuiteUtils { implicit def timer: Timer[IO] = IO.timer(ec) - def ioTimeout: FiniteDuration = 750.millis + def ioTimeout: FiniteDuration = 1.second override def utestWrap(path: Seq[String], runBody: => Future[Any])(implicit ec: ExecutionContext): Future[Any] = { super.utestWrap(path, runBody.flatMap { @@ -50,6 +50,7 @@ abstract class DeterministicIOSuite extends TestSuite with IOSuiteUtils { case io: IO[_] => val f = io.unsafeToFuture() tc.tick(365.days) + assert(tc.state.tasks.isEmpty) Future.fromTry(f.value.get) case other => Future.successful(other) })(new ExecutionContext { diff --git a/shared/src/test/scala/com/olegpy/stm/StoreTests.scala b/shared/src/test/scala/com/olegpy/stm/StoreTests.scala new file mode 100644 index 0000000..026470e --- /dev/null +++ b/shared/src/test/scala/com/olegpy/stm/StoreTests.scala @@ -0,0 +1,34 @@ +package com.olegpy.stm + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +import com.olegpy.stm.internal.Store +import utest._ + +object StoreTests extends TestSuite { + val tests = Tests { + "Store resolves conflicting updates" - { + val store = Store.forPlatform() + val key1, key2 = new Object + store.transact(store.current().update(key1, 0)) + store.transact(store.current().update(key2, 0)) + def increment(key: Object): Unit = store.transact { + val j = store.current() + j.update(key, j.read(key).asInstanceOf[Int] + 1) + } + val execs = 10000 + Future.sequence { List.tabulate(execs * 2)(i => Future { + if (i % 2 == 0) increment(key1) else increment(key2) + }) } + .map { _ => + val (r1, r2) = store.transact { + val j = store.current() + (j.read(key1), j.read(key2)) + } + assert(r1 == execs) + assert(r2 == execs) + } + } + } +} diff --git a/shared/src/test/scala/com/olegpy/stm/problems/CigaretteSmokersProblem.scala b/shared/src/test/scala/com/olegpy/stm/problems/CigaretteSmokersProblem.scala index 2daea05..78b083b 100644 --- a/shared/src/test/scala/com/olegpy/stm/problems/CigaretteSmokersProblem.scala +++ b/shared/src/test/scala/com/olegpy/stm/problems/CigaretteSmokersProblem.scala @@ -3,6 +3,7 @@ package com.olegpy.stm.problems import scala.concurrent.duration._ import scala.util.Random +import cats.effect.ExitCase.Canceled import cats.effect.IO import com.olegpy.stm.misc.TQueue import com.olegpy.stm._ @@ -11,7 +12,7 @@ import cats.implicits._ object CigaretteSmokersProblem extends NondetIOSuite { - override def ioTimeout: FiniteDuration = 3.seconds + override def ioTimeout: FiniteDuration = 30.seconds val tests = Tests { "Cigarette smokers problem" - { @@ -20,7 +21,7 @@ object CigaretteSmokersProblem extends NondetIOSuite { table <- mkTable deal <- new Dealer(table).dealRandom.replicateA(attempts).start counter <- TRef.in[IO](0) - puff = (counter.update(_ + 1) >> counter.get).commit[IO] >> nap + puff = counter.update(_ + 1).commit[IO]// >> nap smoke <- allIngredients.foldMapM { new Smoker(_, table).buildACig(puff).foreverM[Unit].start } @@ -42,6 +43,7 @@ object CigaretteSmokersProblem extends NondetIOSuite { class Table(queue: TQueue[Ingredient]) { def put(ingredient: Ingredient): STM[Unit] = queue.enqueue(ingredient) def takeThings: STM[Set[Ingredient]] = queue.dequeue.replicateA(2).map(_.toSet) + override def toString: String = s"Table($queue)" } def mkTable: IO[Table] = TQueue.boundedIn[IO, Ingredient](2).map(new Table(_))