Skip to content

Commit

Permalink
feat: STM.iterateUntilRetry
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-py committed May 4, 2019
1 parent a1a6006 commit 9fb8bb6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
10 changes: 3 additions & 7 deletions shared/src/main/scala/com/olegpy/stm/concurrent/TQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.olegpy.stm.concurrent
import scala.collection.immutable.Queue

import cats.Foldable
import cats.data.{Chain, NonEmptyChain}
import cats.data.NonEmptyList
import cats.effect.Sync
import com.olegpy.stm.{STM, TRef}
import cats.syntax.all._
Expand All @@ -16,12 +16,8 @@ trait TQueue[A] {
def enqueueAll[F[_]: Foldable](fa: F[A]): STM[Unit] = fa.traverse_(enqueue)

def dequeue: STM[A] = tryDequeue.unNone
def dequeueUpTo(n: Int): STM[NonEmptyChain[A]] = {
def loop(as: Chain[A]): STM[Chain[A]] = tryDequeue.flatMap {
case Some(a) => loop(as :+ a)
case None => STM.pure(as)
}
loop(Chain.empty).mapFilter(NonEmptyChain.fromChain)
def dequeueUpTo(n: Int): STM[NonEmptyList[A]] = {
dequeue.iterateUntilRetry.mapFilter(NonEmptyList.fromList)
}

}
Expand Down
9 changes: 9 additions & 0 deletions shared/src/main/scala/com/olegpy/stm/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package object stm {
type Of[+A] <: Base with Tag

def pure[A](a: A): STM[A] = wrap(IO.pure(a))
def suspend[A](stm: => STM[A]): STM[A] = wrap(IO.suspend(expose[A](stm)))

val unit: STM[Unit] = wrap(IO.unit)
val retry: STM[Nothing] = delay { throw Retry(wrap(null)) }
def check(c: Boolean): STM[Unit] = retry.whenA(c)
Expand Down Expand Up @@ -45,6 +47,13 @@ package object stm {

def unNone[B](implicit ev: A <:< Option[B]): STM[B] =
functorFilter.mapFilter(self)(ev)

def iterateUntilRetry: STM[List[A]] = STM.suspend {
val b = List.newBuilder[A]
def loop: STM[List[A]] =
(self.map(b += _) >> loop).orElse(STM.pure(b.result()))
loop
}
}

implicit val monad: Monad[STM] with Defer[STM] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.olegpy.stm.results._

import java.io.{PrintWriter, StringWriter}

object SyntaxTests extends TestSuite with BaseIOSuite {
object APITests extends TestSuite with BaseIOSuite {
val tests = Tests {
"STM.atomically" - {
STM.atomically[IO](STM.pure(number))
Expand Down Expand Up @@ -65,5 +65,12 @@ object SyntaxTests extends TestSuite with BaseIOSuite {
val fk = STM.atomicallyK[IO]
fk(STM.pure(number)).map(_ ==> number)
}

"STM#iterateUntilRetry" - {
for {
r <- TRef.in[IO](4)
x <- r.modify(x => (x - 1, x)).filter(_ > 0).iterateUntilRetry.commit[IO]
} yield x ==> List(4, 3, 2, 1)
}
}
}

0 comments on commit 9fb8bb6

Please sign in to comment.