Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ZIO#cached #1361

Merged
merged 3 commits into from
Aug 10, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions core-tests/jvm/src/test/scala/zio/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.specs2.execute.Result
import scala.collection.mutable
import scala.util.Try
import zio.Cause.{ die, fail, interrupt, Both }
import zio.duration._

class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntime with GenIO with ScalaCheck {
import Prop.forAll
Expand Down Expand Up @@ -45,6 +46,7 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
Check `absolve` method on IO[E, Either[E, A]] returns the same IO[E, Either[E, String]] as `IO.absolve` does. $testAbsolve
Check non-`memoize`d IO[E, A] returns new instances on repeated calls due to referential transparency. $testNonMemoizationRT
Check `memoize` method on IO[E, A] returns the same instance on repeated calls. $testMemoization
Check `cached` method on IO[E, A] returns new instances after duration. $testCached
Check `raceAll` method returns the same IO[E, A] as `IO.raceAll` does. $testRaceAll
Check `firstSuccessOf` method returns the same IO[E, A] as `IO.firstSuccessOf` does. $testfirstSuccessOf
Check `zipPar` method does not swallow exit causes of loser. $testZipParInterupt
Expand Down Expand Up @@ -305,6 +307,21 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
)
}

def testCached = {
def incrementAndGet(ref: Ref[Int]): UIO[Int] = ref.modify(n => (n + 1, n + 1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using update instead of modify, you won't have to repeat n + 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

unsafeRun {
for {
ref <- Ref.make(0)
cache <- incrementAndGet(ref).cached(100.milliseconds)
a <- cache
b <- cache
_ <- clock.sleep(100.milliseconds)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be flaky due to non-determinism. We could use with MockClock or add flaky combinator to test (it's defined in RTSSpec).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, trying to test this is actually what led me to identify the issues with the MockClock. I used the flaky combinator for now. If we get the MockClock improvements in first we can switch it to use that or otherwise we can do it subsequently.

c <- cache
d <- cache
} yield (a must_=== b) && (b must_!== c) && (c must_=== d)
}
}

def testRaceAll = {
val io = IO.effectTotal("supercalifragilisticexpialadocious")
val ios = List.empty[UIO[String]]
Expand Down
24 changes: 24 additions & 0 deletions core/shared/src/main/scala/zio/ZIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,30 @@ sealed trait ZIO[-R, +E, +A] extends Serializable { self =>
}
)(use)

/**
* Returns an effect that, if evaluated, will return the cached result of
* this effect. Cached results will expire after `timeToLive` duration.
*/
final def cached(timeToLive: Duration): ZIO[R with Clock, Nothing, IO[E, A]] = {

def get(cache: RefM[Option[Promise[E, A]]]): ZIO[R with Clock, E, A] =
cache.update {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use updateSome and only handle case None.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great suggestion!

case Some(p) =>
ZIO.succeed(Some(p))
case None =>
for {
p <- Promise.make[E, A]
_ <- self.to(p)
_ <- p.await.delay(timeToLive).flatMap(_ => cache.set(None)).fork
} yield Some(p)
}.flatMap(_.get.await)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! There are only two ways to improve this:

  • Make the ref a try state (empty, pending, or full), so two fibers don't try to fill it at the same time
  • Deal with interruption, i.e. ensure interruption of a get does not prevent the cache from ever being filled

The first would necessitate the second, I think.

In any case, looks good to me, 👍 to ship!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it to deal with interruption by using uninterruptibleMask so only the waiting on the promise is interruptible. Regarding your first point, I thought I had addressed this by using RefM. My thinking was that if two fibers called get at the same time the first would create the promise, fork the fiber to fill it, and take a reference to that promise, and then the second would wait for the promise creation / forking and take a copy of the same promise. So the empty / pending / full states would correspond to none, some pending promise, and some completed promise. Does this work or am I messing something up here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, you did. I forgot you were using RefM. Effects will be synchronized appropriately.


for {
r <- ZIO.environment[R with Clock]
cache <- RefM.make[Option[Promise[E, A]]](None)
} yield get(cache).provide(r)
}

/**
* Recovers from all errors.
*
Expand Down