Skip to content

Commit

Permalink
Prototype Safe Semaphore (#1523)
Browse files Browse the repository at this point in the history
* implement permits

* make TRef state private

* prototype safe semaphore

* implement permits

* make TRef state private

* #1219 - add doUntilEquals and doWhileEquals methods (#1634)

* Update zio-interop-reactivestreams to 1.0.3.1-RC1 (#1665)

* Add ZStream#cross, crossWith, <*>, <*, *>, <&>, zipLeft, zipRight, <&,  &> (#1510)

* Add ZStrem#zip left and right operators and alias for all zip operators

* formatted code to make scalafmtCheck pass.

* change for <*> variants with flatmap and <&> with zipWith.

* adding crossWith and cross functions to ZStream.

* merge conflict change.

* formatted naming conventions.

* Update the encoding of ZSink (#1560)

* prototype safe semaphore

* cleanup

* uodate documentation

* address review comments

* cleanup

* add back and deprecate methods

* cleanup

* make method private
  • Loading branch information
adamgfraser committed Nov 7, 2019
1 parent 3e49282 commit 0c32182
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 114 deletions.
32 changes: 16 additions & 16 deletions core-tests/shared/src/test/scala/zio/FiberSpec.scala
Expand Up @@ -32,26 +32,26 @@ object FiberSpec
},
testM("`orElse`") {
for {
fiberRef <- FiberRef.make(initial)
semaphore <- Semaphore.make(2)
_ <- semaphore.acquireN(2)
child1 <- (fiberRef.set("child1") *> semaphore.release).fork
child2 <- (fiberRef.set("child2") *> semaphore.release).fork
_ <- semaphore.acquireN(2)
_ <- child1.orElse(child2).inheritFiberRefs
value <- fiberRef.get
fiberRef <- FiberRef.make(initial)
latch1 <- Promise.make[Nothing, Unit]
latch2 <- Promise.make[Nothing, Unit]
child1 <- (fiberRef.set("child1") *> latch1.succeed(())).fork
child2 <- (fiberRef.set("child2") *> latch2.succeed(())).fork
_ <- latch1.await *> latch2.await
_ <- child1.orElse(child2).inheritFiberRefs
value <- fiberRef.get
} yield assert(value, equalTo("child1"))
},
testM("`zip`") {
for {
fiberRef <- FiberRef.make(initial)
semaphore <- Semaphore.make(2)
_ <- semaphore.acquireN(2)
child1 <- (fiberRef.set("child1") *> semaphore.release).fork
child2 <- (fiberRef.set("child2") *> semaphore.release).fork
_ <- semaphore.acquireN(2)
_ <- child1.zip(child2).inheritFiberRefs
value <- fiberRef.get
fiberRef <- FiberRef.make(initial)
latch1 <- Promise.make[Nothing, Unit]
latch2 <- Promise.make[Nothing, Unit]
child1 <- (fiberRef.set("child1") *> latch1.succeed(())).fork
child2 <- (fiberRef.set("child2") *> latch2.succeed(())).fork
_ <- latch1.await *> latch2.await
_ <- child1.zip(child2).inheritFiberRefs
value <- fiberRef.get
} yield assert(value, equalTo("child1"))
}
),
Expand Down
95 changes: 38 additions & 57 deletions core-tests/shared/src/test/scala/zio/SemaphoreSpec.scala
Expand Up @@ -3,30 +3,10 @@

package zio

import zio.duration._
import zio.test.Assertion._
import zio.test._
import zio.test.environment.TestClock
import SemaphoreSpecData._

object SemaphoreSpecData {
def offsettingReleasesAcquires(
acquires: (Semaphore, Vector[Long]) => UIO[Unit],
releases: (Semaphore, Vector[Long]) => UIO[Unit]
) = {
val permits = Vector(1L, 0L, 20L, 4L, 0L, 5L, 2L, 1L, 1L, 3L)

for {
semaphore <- Semaphore.make(0L)
acquiresFiber <- acquires(semaphore, permits).fork
releasesFiber <- releases(semaphore, permits).fork
_ <- acquiresFiber.join
_ <- releasesFiber.join
count <- semaphore.available
} yield assert(count, equalTo(0L))
}
}

object SemaphoreSpec
extends ZIOBaseSpec(
suite("SemaphoreSpec")(
Expand All @@ -35,71 +15,72 @@ object SemaphoreSpec
val n = 20L
for {
semaphore <- Semaphore.make(n)
available <- IO.foreach((0L until n).toList)(_ => semaphore.acquire) *> semaphore.available
} yield assert(available, equalTo(0L))
available <- IO.foreach((0L until n).toList)(_ => semaphore.withPermit(semaphore.available))
} yield assert(available, forall(equalTo(19L)))
},
testM("`acquire` permits in parallel") {
val n = 20L
for {
semaphore <- Semaphore.make(n)
available <- IO.foreachPar((0L until n).toList)(_ => semaphore.acquire) *> semaphore.available
} yield assert(available, equalTo(0L))
available <- IO.foreachPar((0L until n).toList)(_ => semaphore.withPermit(semaphore.available))
} yield assert(available, forall(isLessThan(20L)))
},
testM("`acquireN`s can be parallel with `releaseN`s") {
offsettingReleasesAcquires(
(s, permits) => IO.foreach(permits)(s.acquireN).unit,
(s, permits) => IO.foreach(permits.reverse)(s.releaseN).unit
offsettingWithPermits(
(s, permits) => IO.foreach(permits)(s.withPermits(_)(IO.unit)).unit
)
},
testM("individual `acquireN`s can be parallel with individual `releaseN`s") {
offsettingReleasesAcquires(
(s, permits) => IO.foreachPar(permits)(amount => s.acquireN(amount)).unit,
(s, permits) => IO.foreachPar(permits.reverse)(amount => s.releaseN(amount)).unit
offsettingWithPermits(
(s, permits) => IO.foreachPar(permits)(s.withPermits(_)(IO.unit)).unit
)
},
testM("semaphores and fibers play ball together") {
val n = 1L
for {
s <- Semaphore.make(n).tap(_.acquire)
_ <- s.release.fork
_ <- s.acquire
s <- Semaphore.make(n)
_ <- s.withPermit(IO.unit).fork
_ <- s.withPermit(IO.unit)
} yield assertCompletes
},
/**
* Ported from @mpilquist work in Cats Effect (https://github.com/typelevel/cats-effect/pull/403)
*/
testM("`acquire` doesn't leak permits upon cancellation") {
testM("`withPermit` doesn't leak permits upon failure") {
val n = 1L
for {
s <- Semaphore.make(n)
acquireFork <- s.acquireN(2).timeout(1.milli).fork
_ <- TestClock.adjust(1.milli) *> acquireFork.join
permitsFork <- (s.release *> clock.sleep(10.millis) *> s.available).fork
permits <- TestClock.adjust(10.millis) *> permitsFork.join
} yield assert(permits, equalTo(2L))
s <- Semaphore.make(n)
_ <- s.withPermit(IO.fail("fail")).either
permits <- s.available
} yield assert(permits, equalTo(1L))
},
/**
* Ported from @mpilquist work in Cats Effect (https://github.com/typelevel/cats-effect/pull/403)
*/
testM("`withPermit` does not leak fibers or permits upon cancellation") {
val n = 0L
val n = 1L
for {
s <- Semaphore.make(n)
acquireFork <- s.withPermit(s.release).timeout(1.milli).fork
_ <- TestClock.adjust(1.milli) *> acquireFork.join
permitsFork <- (s.release *> clock.sleep(10.millis) *> s.available).fork
permits <- TestClock.adjust(10.millis) *> permitsFork.join
s <- Semaphore.make(n)
fiber <- s.withPermit(IO.never).fork
_ <- fiber.interrupt
permits <- s.available
} yield assert(permits, equalTo(1L))
},
testM("`withPermitManaged` does not leak fibers or permits upon cancellation") {
for {
s <- Semaphore.make(0)
acquireFork <- s.withPermitManaged.use(_ => s.release).timeout(1.millisecond).fork
_ <- TestClock.adjust(1.milli) *> acquireFork.join
permitsFork <- (s.release *> clock.sleep(10.milliseconds) *> s.available).fork
permits <- TestClock.adjust(10.millis) *> permitsFork.join
s <- Semaphore.make(1L)
fiber <- s.withPermitManaged.use(_ => IO.never).fork
_ <- fiber.interrupt
permits <- s.available
} yield assert(permits, equalTo(1L))
}
)
)
)

object SemaphoreSpecData {
def offsettingWithPermits(withPermits: (Semaphore, Vector[Long]) => UIO[Unit]) = {
val permits = Vector(1L, 0L, 20L, 4L, 0L, 5L, 2L, 1L, 1L, 3L)

for {
semaphore <- Semaphore.make(20L)
fiber <- withPermits(semaphore, permits).fork
_ <- fiber.join
count <- semaphore.available
} yield assert(count, equalTo(20L))
}
}
58 changes: 36 additions & 22 deletions core/shared/src/main/scala/zio/Semaphore.scala
Expand Up @@ -41,6 +41,7 @@ final class Semaphore private (private val state: Ref[State]) extends Serializab
* If a permit is not available, the fiber invoking this method will be
* suspended until a permit is available.
*/
@deprecated("use withPermit", "1.0.0")
final def acquire: UIO[Unit] = acquireN(1)

/**
Expand All @@ -51,6 +52,7 @@ final class Semaphore private (private val state: Ref[State]) extends Serializab
*
* Ported from @mpilquist work in Cats Effect (https://github.com/typelevel/cats-effect/pull/403)
*/
@deprecated("use withPermits", "1.0.0")
final def acquireN(n: Long): UIO[Unit] =
assertNonNegative(n) *> IO.bracketExit(prepare(n))(cleanup)(_.awaitAcquire)

Expand All @@ -65,6 +67,7 @@ final class Semaphore private (private val state: Ref[State]) extends Serializab
/**
* Releases a single permit.
*/
@deprecated("use withPermit", "1.0.0")
final def release: UIO[Unit] = releaseN(1)

/**
Expand All @@ -74,26 +77,9 @@ final class Semaphore private (private val state: Ref[State]) extends Serializab
* they will be woken up (in FIFO order) if this action releases enough
* of them.
*/
final def releaseN(toRelease: Long): UIO[Unit] = {

@tailrec def loop(n: Long, state: State, acc: UIO[Unit]): (UIO[Unit], State) = state match {
case Right(m) => acc -> Right(n + m)
case Left(q) =>
q.dequeueOption match {
case None => acc -> Right(n)
case Some(((p, m), q)) =>
if (n > m)
loop(n - m, Left(q), acc <* p.succeed(()))
else if (n == m)
(acc <* p.succeed(())) -> Left(q)
else
acc -> Left((p -> (m - n)) +: q)
}
}

IO.flatten(assertNonNegative(toRelease) *> state.modify(loop(toRelease, _, IO.unit))).uninterruptible

}
@deprecated("use withPermits", "1.0.0")
final def releaseN(toRelease: Long): UIO[Unit] =
releaseN0(toRelease)

/**
* Acquires a permit, executes the action and releases the permit right after.
Expand Down Expand Up @@ -132,7 +118,7 @@ final class Semaphore private (private val state: Ref[State]) extends Serializab
def restore(p: Promise[Nothing, Unit], n: Long): UIO[Unit] =
IO.flatten(state.modify {
case Left(q) =>
q.find(_._1 == p).fold(releaseN(n) -> Left(q))(x => releaseN(n - x._2) -> Left(q.filter(_._1 != p)))
q.find(_._1 == p).fold(releaseN0(n) -> Left(q))(x => releaseN0(n - x._2) -> Left(q.filter(_._1 != p)))
case Right(m) => IO.unit -> Right(m + n)
})

Expand All @@ -141,13 +127,41 @@ final class Semaphore private (private val state: Ref[State]) extends Serializab
else
Promise.make[Nothing, Unit].flatMap { p =>
state.modify {
case Right(m) if m >= n => Acquisition(IO.unit, releaseN(n)) -> Right(m - n)
case Right(m) if m >= n => Acquisition(IO.unit, releaseN0(n)) -> Right(m - n)
case Right(m) => Acquisition(p.await, restore(p, n)) -> Left(IQueue(p -> (n - m)))
case Left(q) => Acquisition(p.await, restore(p, n)) -> Left(q.enqueue(p -> n))
}
}
}

/**
* Releases a specified number of permits.
*
* If fibers are currently suspended until enough permits are available,
* they will be woken up (in FIFO order) if this action releases enough
* of them.
*/
final private def releaseN0(toRelease: Long): UIO[Unit] = {

@tailrec def loop(n: Long, state: State, acc: UIO[Unit]): (UIO[Unit], State) = state match {
case Right(m) => acc -> Right(n + m)
case Left(q) =>
q.dequeueOption match {
case None => acc -> Right(n)
case Some(((p, m), q)) =>
if (n > m)
loop(n - m, Left(q), acc <* p.succeed(()))
else if (n == m)
(acc <* p.succeed(())) -> Left(q)
else
acc -> Left((p -> (m - n)) +: q)
}
}

IO.flatten(assertNonNegative(toRelease) *> state.modify(loop(toRelease, _, IO.unit))).uninterruptible

}

}

object Semaphore extends Serializable {
Expand Down
22 changes: 5 additions & 17 deletions docs/datatypes/semaphore.md
Expand Up @@ -3,14 +3,14 @@ id: datatypes_semaphore
title: "Semaphore"
---

A `Semaphore` datatype which allows synchronization between fibers with `acquire` and `release` operations.
A `Semaphore` datatype which allows synchronization between fibers with the `withPermit` operation, which safely acquires and releases a permit.
`Semaphore` is based on `Ref[A]` datatype.

## Operations

For example a synchronization of asynchronous tasks can
be done via acquiring and releasing a semaphore with given number of permits it can spend.
When the `acquire` operation cannot be performed, due to insufficient `permits` value in the semaphore, such task
When the acquire operation cannot be performed, due to insufficient `permits` value in the semaphore, such task
is placed in internal suspended fibers queue and will be awaken when `permits` value is sufficient:

```scala mdoc:silent
Expand All @@ -26,9 +26,7 @@ val task = for {
} yield ()

val semTask = (sem: Semaphore) => for {
_ <- sem.acquire
_ <- task
_ <- sem.release
_ <- sem.withPermit(task)
} yield ()

val semTaskSeq = (sem: Semaphore) => (1 to 3).map(_ => semTask(sem))
Expand All @@ -49,18 +47,8 @@ we can acquire and release any value, regarding semaphore's permits:

```scala mdoc:silent
val semTaskN = (sem: Semaphore) => for {
_ <- sem.acquireN(5)
_ <- task
_ <- sem.releaseN(5)
_ <- sem.withPermits(5)(task)
} yield ()
```

When acquiring and performing task is followed by equivalent release
then entire action can be done with `withPermit`
(or corresponding counting version `withPermits`):

```scala mdoc:silent
val permitTask = (sem: Semaphore) => for {
_ <- sem.withPermit(task)
} yield ()
```
The guarantee of `withPermit` (and its corresponding counting version `withPermits`) is that acquisition will be followed by equivalent release, regardless of whether the task succeeds, fails, or is interrupted.
4 changes: 2 additions & 2 deletions streams-tests/jvm/src/test/scala/zio/stream/StreamSpec.scala
Expand Up @@ -768,7 +768,7 @@ object StreamSpec
_ <- Stream(1, 2, 3, 4)
.flatMapParSwitch(1) { i =>
if (i > 3) Stream.bracket(UIO.unit)(_ => lastExecuted.set(true)).flatMap(_ => Stream.empty)
else Stream.bracket(semaphore.acquire)(_ => semaphore.release).flatMap(_ => Stream.never)
else Stream.managed(semaphore.withPermitManaged).flatMap(_ => Stream.never)
}
.runDrain
result <- semaphore.withPermit(lastExecuted.get)
Expand All @@ -782,7 +782,7 @@ object StreamSpec
.flatMapParSwitch(4) { i =>
if (i > 8)
Stream.bracket(UIO.unit)(_ => lastExecuted.update(_ + 1)).flatMap(_ => Stream.empty)
else Stream.bracket(semaphore.acquire)(_ => semaphore.release).flatMap(_ => Stream.never)
else Stream.managed(semaphore.withPermitManaged).flatMap(_ => Stream.never)
}
.runDrain
result <- semaphore.withPermits(4)(lastExecuted.get)
Expand Down

0 comments on commit 0c32182

Please sign in to comment.