Skip to content

Commit

Permalink
feat: invariant instances for concurrent things
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-py committed May 4, 2019
1 parent 9fb8bb6 commit 7824e10
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
13 changes: 11 additions & 2 deletions shared/src/main/scala/com/olegpy/stm/concurrent/TDeferred.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package com.olegpy.stm.concurrent

import cats.{Functor, Invariant}
import cats.effect.{Concurrent, Sync}
import cats.effect.concurrent.TryableDeferred
import com.olegpy.stm.{STM, TRef}
import cats.implicits._


class TDeferred[A] (state: TRef[Option[A]]) extends TryableDeferred[STM, A] { outer =>
class TDeferred[A] (private val state: TRef[Option[A]]) extends TryableDeferred[STM, A] { outer =>
def tryGet: STM[Option[A]] = state.get
def get: STM[A] = tryGet.unNone
def complete(a: A): STM[Unit] = state.updateF {
case Some(_) => STM.abort(new IllegalStateException("Attempting to complete deferred twice"))
case None => STM.pure(Some(a))
}

// N.B: cannot use this.mapK as that doesn't return TryableDeferred
def in[F[_]: Concurrent]: TryableDeferred[F, A] = new TryableDeferred[F, A] {
def tryGet: F[Option[A]] = outer.tryGet.commit[F]
def get: F[A] = outer.get.commit[F]
Expand All @@ -23,4 +25,11 @@ class TDeferred[A] (state: TRef[Option[A]]) extends TryableDeferred[STM, A] { ou
object TDeferred {
def apply[A]: STM[TDeferred[A]] = TRef(Option.empty[A]).map(new TDeferred(_))
def in[F[_]: Sync, A]: F[TDeferred[A]] = STM.unsafeToSync(apply)

implicit val invariant: Invariant[TDeferred] = new Invariant[TDeferred] {
def imap[A, B](fa: TDeferred[A])(f: A => B)(g: B => A): TDeferred[B] = {
val fo = Functor[Option]
new TDeferred[B](fa.state.imap(fo.lift(f))(fo.lift(g)))
}
}
}
13 changes: 10 additions & 3 deletions shared/src/main/scala/com/olegpy/stm/concurrent/TMVar.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.olegpy.stm.concurrent

import cats.{Functor, Invariant}
import cats.effect.{Concurrent, Sync}
import cats.effect.concurrent.MVar
import com.olegpy.stm.{STM, TRef}
import cats.syntax.flatMap._
import cats.syntax.option._
import cats.implicits._

class TMVar[A] (state: TRef[Option[A]]) extends MVar[STM, A] {
class TMVar[A] (private val state: TRef[Option[A]]) extends MVar[STM, A] {
def isEmpty: STM[Boolean] = state.get.map(_.isEmpty)
def put(a: A): STM[Unit] = state.updOrRetry { case None => a.some }
def tryPut(a: A): STM[Boolean] = put(a).as(true) orElse STM.pure(false)
Expand All @@ -23,4 +23,11 @@ object TMVar {

def in[F[_]: Sync, A](initial: A): F[TMVar[A]] = STM.unsafeToSync(TMVar(initial))
def emptyIn[F[_]: Sync, A]: F[TMVar[A]] = STM.unsafeToSync(TMVar.empty)

implicit val invariantInstance: Invariant[TMVar] = new Invariant[TMVar] {
def imap[A, B](fa: TMVar[A])(f: A => B)(g: B => A): TMVar[B] = {
val fo = Functor[Option]
new TMVar[B](fa.state.imap(fo.lift(f))(fo.lift(g)))
}
}
}
13 changes: 10 additions & 3 deletions shared/src/main/scala/com/olegpy/stm/concurrent/TQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package com.olegpy.stm.concurrent

import scala.collection.immutable.Queue

import cats.Foldable
import cats.data.NonEmptyList
import cats.{Foldable, Invariant}
import cats.data.{Nested, NonEmptyList}
import cats.effect.Sync
import com.olegpy.stm.{STM, TRef}
import cats.syntax.all._
import cats.instances.option._

trait TQueue[A] {
def offer(a: A): STM[Boolean]
Expand All @@ -19,7 +20,6 @@ trait TQueue[A] {
def dequeueUpTo(n: Int): STM[NonEmptyList[A]] = {
dequeue.iterateUntilRetry.mapFilter(NonEmptyList.fromList)
}

}

object TQueue {
Expand Down Expand Up @@ -76,4 +76,11 @@ object TQueue {

def circularBufferIn[F[_]: Sync, A](max: Int): F[TQueue[A]] =
STM.unsafeToSync(circularBuffer(max))

implicit val invariant: Invariant[TQueue] = new Invariant[TQueue] {
def imap[A, B](fa: TQueue[A])(f: A => B)(g: B => A): TQueue[B] = new TQueue[B] {
def offer(a: B): STM[Boolean] = fa.offer(g(a))
def tryDequeue: STM[Option[B]] = Nested(fa.tryDequeue).map(f).value
}
}
}

0 comments on commit 7824e10

Please sign in to comment.