-
Notifications
You must be signed in to change notification settings - Fork 514
/
Deferred.scala
265 lines (231 loc) · 8.22 KB
/
Deferred.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/*
* Copyright 2020-2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cats
package effect
package kernel
import cats.effect.kernel.Deferred.TransformedDeferred
import cats.syntax.all._
import scala.annotation.tailrec
import scala.collection.immutable.LongMap
import java.util.concurrent.atomic.AtomicReference
/**
* A purely functional synchronization primitive which represents a single value which may not
* yet be available.
*
* When created, a `Deferred` is empty. It can then be completed exactly once, and never be made
* empty again.
*
* `get` on an empty `Deferred` will block until the `Deferred` is completed. `get` on a
* completed `Deferred` will always immediately return its content.
*
* `complete(a)` on an empty `Deferred` will set it to `a`, notify any and all readers currently
* blocked on a call to `get`, and return true. `complete(a)` on a `Deferred` that has already
* been completed will not modify its content, and return false.
*
* Albeit simple, `Deferred` can be used in conjunction with [[Ref]] to build complex concurrent
* behaviour and data structures like queues and semaphores.
*
* Finally, the blocking mentioned above is semantic only, no actual threads are blocked by the
* implementation.
*/
abstract class Deferred[F[_], A] extends DeferredSource[F, A] with DeferredSink[F, A] {
/**
* Modify the context `F` using transformation `f`.
*/
def mapK[G[_]](f: F ~> G): Deferred[G, A] =
new TransformedDeferred(this, f)
}
object Deferred {
/**
* Creates an unset Deferred. Every time you bind the resulting `F`, a new Deferred is
* created. If you want to share one, pass it as an argument and `flatMap` once.
*/
def apply[F[_], A](implicit F: GenConcurrent[F, _]): F[Deferred[F, A]] =
F.deferred[A]
/**
* Like `apply` but returns the newly allocated Deferred directly instead of wrapping it in
* `F.delay`. This method is considered unsafe because it is not referentially transparent --
* it allocates mutable state. In general, you should prefer `apply` and use `flatMap` to get
* state sharing.
*/
def unsafe[F[_]: Async, A]: Deferred[F, A] = new AsyncDeferred[F, A]
/**
* Like [[apply]] but initializes state using another effect constructor
*/
def in[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[Deferred[G, A]] =
F.delay(unsafe[G, A])
sealed abstract private class State[A]
private object State {
final case class Set[A](a: A) extends State[A]
final case class Unset[A](readers: LongMap[A => Unit], nextId: Long) extends State[A]
val initialId = 1L
val dummyId = 0L
}
final class AsyncDeferred[F[_], A](implicit F: Async[F]) extends Deferred[F, A] {
// shared mutable state
private[this] val ref = new AtomicReference[State[A]](
State.Unset(LongMap.empty, State.initialId)
)
def get: F[A] = {
// side-effectful
def addReader(awakeReader: A => Unit): Long = {
@tailrec
def loop(): Long =
ref.get match {
case State.Set(a) =>
awakeReader(a)
State.dummyId // never used
case s @ State.Unset(readers, nextId) =>
val updated = State.Unset(
readers + (nextId -> awakeReader),
nextId + 1
)
if (!ref.compareAndSet(s, updated)) loop()
else nextId
}
loop()
}
// side-effectful
def deleteReader(id: Long): Unit = {
@tailrec
def loop(): Unit =
ref.get match {
case State.Set(_) => ()
case s @ State.Unset(readers, _) =>
val updated = s.copy(readers = readers - id)
if (!ref.compareAndSet(s, updated)) loop()
else ()
}
loop()
}
F.defer {
ref.get match {
case State.Set(a) =>
F.pure(a)
case State.Unset(_, _) =>
F.async[A] { cb =>
val resume = (a: A) => cb(Right(a))
F.delay(addReader(awakeReader = resume)).map { id =>
// if canceled
F.delay(deleteReader(id)).some
}
}
}
}
}
def tryGet: F[Option[A]] =
F.delay {
ref.get match {
case State.Set(a) => Some(a)
case State.Unset(_, _) => None
}
}
def complete(a: A): F[Boolean] = {
def notifyReaders(readers: LongMap[A => Unit]): F[Unit] = {
// LongMap iterators return values in unsigned key order,
// which corresponds to the arrival order of readers since
// insertion is governed by a monotonically increasing id
val cursor = readers.valuesIterator
var acc = F.unit
while (cursor.hasNext) {
val next = cursor.next()
val task = F.delay(next(a))
acc = acc >> task
}
acc
}
// side-effectful (even though it returns F[Unit])
@tailrec
def loop(): F[Boolean] =
ref.get match {
case State.Set(_) =>
F.pure(false)
case s @ State.Unset(readers, _) =>
val updated = State.Set(a)
if (!ref.compareAndSet(s, updated)) loop()
else {
val notify = if (readers.isEmpty) F.unit else notifyReaders(readers)
notify.as(true)
}
}
F.uncancelable(_ => F.defer(loop()))
}
}
implicit def catsInvariantForDeferred[F[_]: Functor]: Invariant[Deferred[F, *]] =
new Invariant[Deferred[F, *]] {
override def imap[A, B](fa: Deferred[F, A])(f: A => B)(g: B => A): Deferred[F, B] =
new Deferred[F, B] {
override def get: F[B] =
fa.get.map(f)
override def complete(b: B): F[Boolean] =
fa.complete(g(b))
override def tryGet: F[Option[B]] =
fa.tryGet.map(_.map(f))
}
}
final private[kernel] class TransformedDeferred[F[_], G[_], A](
underlying: Deferred[F, A],
trans: F ~> G)
extends Deferred[G, A] {
override def get: G[A] = trans(underlying.get)
override def tryGet: G[Option[A]] = trans(underlying.tryGet)
override def complete(a: A): G[Boolean] = trans(underlying.complete(a))
}
}
trait DeferredSource[F[_], A] extends Serializable {
/**
* Obtains the value of the `Deferred`, or waits until it has been completed. The returned
* value may be canceled.
*/
def get: F[A]
/**
* Obtains the current value of the `Deferred`, or None if it hasn't completed.
*/
def tryGet: F[Option[A]]
}
object DeferredSource {
implicit def catsFunctorForDeferredSource[F[_]: Functor]: Functor[DeferredSource[F, *]] =
new Functor[DeferredSource[F, *]] {
override def map[A, B](fa: DeferredSource[F, A])(f: A => B): DeferredSource[F, B] =
new DeferredSource[F, B] {
override def get: F[B] =
fa.get.map(f)
override def tryGet: F[Option[B]] =
fa.tryGet.map(_.map(f))
}
}
}
trait DeferredSink[F[_], A] extends Serializable {
/**
* If this `Deferred` is empty, sets the current value to `a`, and notifies any and all
* readers currently blocked on a `get`. Returns true.
*
* If this `Deferred` has already been completed, returns false.
*
* Satisfies: `Deferred[F, A].flatMap(r => r.complete(a) *> r.get) == a.pure[F]`
*/
def complete(a: A): F[Boolean]
}
object DeferredSink {
implicit def catsContravariantForDeferredSink[F[_]]: Contravariant[DeferredSink[F, *]] =
new Contravariant[DeferredSink[F, *]] {
override def contramap[A, B](fa: DeferredSink[F, A])(f: B => A): DeferredSink[F, B] =
new DeferredSink[F, B] {
override def complete(b: B): F[Boolean] =
fa.complete(f(b))
}
}
}