Skip to content

Commit

Permalink
feat: more debugability for refs and co.
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-py committed May 10, 2019
1 parent 0362040 commit 5d89a64
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 5 deletions.
2 changes: 2 additions & 0 deletions js/src/main/scala/com/olegpy/stm/internal/StorePlatform.scala
Expand Up @@ -74,5 +74,7 @@ trait StorePlatform {
throw ex
}
}

def unsafeReadCommitted(k: AnyRef): Any = committed.get(k)
}
}
Expand Up @@ -61,7 +61,7 @@ trait StorePlatform {
val ksi = j.reads.iterator()
while (ksi.hasNext && !hasConflict) {
val key = ksi.next()
hasConflict = start.get(key) != preCommit.get(key)
hasConflict = start.get(key) ne preCommit.get(key)
}
}
if (hasConflict) {
Expand Down Expand Up @@ -101,5 +101,10 @@ trait StorePlatform {
throw ex
}
}

def unsafeReadCommitted(k: AnyRef): Any = committed.get().get(k) match {
case null => null
case t => t._1
}
}
}
10 changes: 10 additions & 0 deletions shared/src/main/scala/com/olegpy/stm/TRef.scala
Expand Up @@ -30,6 +30,10 @@ trait TRef[A] extends Ref[STM, A] {
def tryModifyState[B](state: State[A, B]): STM[Option[B]] = modifyState(state).map(_.some)

def modifyState[B](state: State[A, B]): STM[B] = modify(state.run(_).value)

protected[stm] def unsafeLastValue: A

override def toString: String = s"TRef($unsafeLastValue)"
}

object TRef {
Expand All @@ -45,16 +49,22 @@ object TRef {
val unit: TRef[Unit] = new TRef[Unit] {
def get: STM[Unit] = STM.unit
def set(a: Unit): STM[Unit] = STM.unit

protected[stm] def unsafeLastValue: Unit = ()
}

def imap[A, B](fa: TRef[A])(f: A => B)(g: B => A): TRef[B] = new TRef[B] {
def get: STM[B] = fa.get map f
def set(a: B): STM[Unit] = fa.set(g(a))

protected[stm] def unsafeLastValue: B = f(fa.unsafeLastValue)
}

def product[A, B](fa: TRef[A], fb: TRef[B]): TRef[(A, B)] = new TRef[(A, B)] {
def get: STM[(A, B)] = fa.get product fb.get
def set(a: (A, B)): STM[Unit] = fa.set(a._1) *> fb.set(a._2)

protected[stm] def unsafeLastValue: (A, B) = (fa.unsafeLastValue, fb.unsafeLastValue)
}
}
}
Expand Up @@ -11,7 +11,7 @@ class Monitor private[stm] () {
private[this] val store: Store = /*_*/Store.forPlatform()/*_*/
private[this] val rightUnit = Right(())

private[this] class RetryCallback(keys: Iterable[AnyRef]) {
private[this] class RetryCallback (keys: Iterable[AnyRef]) {
@volatile var catsCb: Callback = _
keys.foreach(listenTo)
def invoke(): Unit = {
Expand Down
2 changes: 2 additions & 0 deletions shared/src/main/scala/com/olegpy/stm/internal/Store.scala
Expand Up @@ -5,6 +5,8 @@ trait Store {
def current(): Store.Journal
def transact[A](f: => A): A
def attempt[A](f: => A): A

def unsafeReadCommitted(k: AnyRef): Any
}

object Store extends StorePlatform {
Expand Down
3 changes: 3 additions & 0 deletions shared/src/main/scala/com/olegpy/stm/internal/TRefImpl.scala
Expand Up @@ -11,4 +11,7 @@ private[stm] class TRefImpl[A](initial: A) extends TRef[A] {
def set(a: A): STM[Unit] = STM.delay {
STM.store.current().update(this, a)
}

protected[stm] def unsafeLastValue: A =
STM.store.unsafeReadCommitted(this).asInstanceOf[A]
}
5 changes: 5 additions & 0 deletions shared/src/main/scala/com/olegpy/stm/misc/TDeferred.scala
Expand Up @@ -20,6 +20,11 @@ class TDeferred[A] (private[stm] val state: TRef[Option[A]]) extends TryableDefe
def get: F[A] = outer.get.commit[F]
def complete(a: A): F[Unit] = outer.complete(a).commit[F]
}

override def toString: String = state.unsafeLastValue match {
case Some(value) => s"TDeferred(<completed>: $value)"
case None => s"TDeferred(<not completed>)"
}
}

object TDeferred {
Expand Down
5 changes: 5 additions & 0 deletions shared/src/main/scala/com/olegpy/stm/misc/TMVar.scala
Expand Up @@ -15,6 +15,11 @@ class TMVar[A] (private[stm] val state: TRef[Option[A]]) extends MVar[STM, A] {
def read: STM[A] = state.get.unNone

def to[F[_]: Concurrent]: MVar[F, A] = mapK(STM.atomicallyK[F])

override def toString: String = state.unsafeLastValue match {
case Some(value) => s"TMVar(<full>: $value)"
case None => "TMVar(<empty>)"
}
}

object TMVar {
Expand Down
8 changes: 8 additions & 0 deletions shared/src/main/scala/com/olegpy/stm/misc/TQueue.scala
Expand Up @@ -28,6 +28,8 @@ object TQueue {
def offer(a: A): STM[Boolean] = slot.get
.flatMap(_.fold(slot.set(a.some).as(true))(_ => STM.pure(false)))
def tryDequeue: STM[Option[A]] = slot.get

override def toString: String = s"TQueue(synchronous, ${slot.unsafeLastValue.getOrElse("<empty>")})"
}
}

Expand All @@ -43,6 +45,8 @@ object TQueue {
case Some((hd, rest)) => (rest, hd.some)
}
}

override def toString: String = s"TQueue(unbounded, ${state.unsafeLastValue.mkString(", ")})"
}
}

Expand All @@ -59,6 +63,8 @@ object TQueue {
case hd +: tail => (tail, hd.some)
case empty => (empty, none)
}

override def toString: String = s"TQueue(bounded($max), ${state.unsafeLastValue.mkString(", ")})"
}
}

Expand All @@ -71,6 +77,8 @@ object TQueue {
case hd +: tail => (tail, hd.some)
case empty => (empty, none)
}

override def toString: String = s"TQueue(circularBuffer($max), ${state.unsafeLastValue.mkString(", ")})"
}
}

Expand Down
3 changes: 2 additions & 1 deletion shared/src/test/scala/com/olegpy/stm/BaseIOSuite.scala
Expand Up @@ -30,7 +30,7 @@ abstract class NondetIOSuite extends TestSuite with IOSuiteUtils {
implicit def timer: Timer[IO] = IO.timer(ec)


def ioTimeout: FiniteDuration = 750.millis
def ioTimeout: FiniteDuration = 1.second

override def utestWrap(path: Seq[String], runBody: => Future[Any])(implicit ec: ExecutionContext): Future[Any] = {
super.utestWrap(path, runBody.flatMap {
Expand All @@ -50,6 +50,7 @@ abstract class DeterministicIOSuite extends TestSuite with IOSuiteUtils {
case io: IO[_] =>
val f = io.unsafeToFuture()
tc.tick(365.days)
assert(tc.state.tasks.isEmpty)
Future.fromTry(f.value.get)
case other => Future.successful(other)
})(new ExecutionContext {
Expand Down
34 changes: 34 additions & 0 deletions shared/src/test/scala/com/olegpy/stm/StoreTests.scala
@@ -0,0 +1,34 @@
package com.olegpy.stm

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import com.olegpy.stm.internal.Store
import utest._

object StoreTests extends TestSuite {
val tests = Tests {
"Store resolves conflicting updates" - {
val store = Store.forPlatform()
val key1, key2 = new Object
store.transact(store.current().update(key1, 0))
store.transact(store.current().update(key2, 0))
def increment(key: Object): Unit = store.transact {
val j = store.current()
j.update(key, j.read(key).asInstanceOf[Int] + 1)
}
val execs = 10000
Future.sequence { List.tabulate(execs * 2)(i => Future {
if (i % 2 == 0) increment(key1) else increment(key2)
}) }
.map { _ =>
val (r1, r2) = store.transact {
val j = store.current()
(j.read(key1), j.read(key2))
}
assert(r1 == execs)
assert(r2 == execs)
}
}
}
}
Expand Up @@ -3,6 +3,7 @@ package com.olegpy.stm.problems
import scala.concurrent.duration._
import scala.util.Random

import cats.effect.ExitCase.Canceled
import cats.effect.IO
import com.olegpy.stm.misc.TQueue
import com.olegpy.stm._
Expand All @@ -11,7 +12,7 @@ import cats.implicits._


object CigaretteSmokersProblem extends NondetIOSuite {
override def ioTimeout: FiniteDuration = 3.seconds
override def ioTimeout: FiniteDuration = 30.seconds

val tests = Tests {
"Cigarette smokers problem" - {
Expand All @@ -20,7 +21,7 @@ object CigaretteSmokersProblem extends NondetIOSuite {
table <- mkTable
deal <- new Dealer(table).dealRandom.replicateA(attempts).start
counter <- TRef.in[IO](0)
puff = (counter.update(_ + 1) >> counter.get).commit[IO] >> nap
puff = counter.update(_ + 1).commit[IO]// >> nap
smoke <- allIngredients.foldMapM {
new Smoker(_, table).buildACig(puff).foreverM[Unit].start
}
Expand All @@ -42,6 +43,7 @@ object CigaretteSmokersProblem extends NondetIOSuite {
class Table(queue: TQueue[Ingredient]) {
def put(ingredient: Ingredient): STM[Unit] = queue.enqueue(ingredient)
def takeThings: STM[Set[Ingredient]] = queue.dequeue.replicateA(2).map(_.toSet)
override def toString: String = s"Table($queue)"
}

def mkTable: IO[Table] = TQueue.boundedIn[IO, Ingredient](2).map(new Table(_))
Expand Down

0 comments on commit 5d89a64

Please sign in to comment.