Skip to content

Commit 083a635

Browse files
authored
Merge pull request #4424 from BalmungSan/add-atomic-map
Add `AtomicMap`
2 parents 6acfb7b + d81546f commit 083a635

File tree

7 files changed

+599
-84
lines changed

7 files changed

+599
-84
lines changed

build.sbt

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1149,7 +1149,21 @@ lazy val std = crossProject(JSPlatform, JVMPlatform, NativePlatform)
11491149
ProblemFilters.exclude[DirectMissingMethodProblem](
11501150
"cats.effect.std.Mutex#ConcurrentImpl.EmptyCell"),
11511151
ProblemFilters.exclude[DirectMissingMethodProblem](
1152-
"cats.effect.std.Mutex#ConcurrentImpl.LockQueueCell")
1152+
"cats.effect.std.Mutex#ConcurrentImpl.LockQueueCell"),
1153+
// #4424, refactored private classes
1154+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
1155+
"cats.effect.std.AtomicCell#AsyncImpl.this"),
1156+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
1157+
"cats.effect.std.AtomicCell#ConcurrentImpl.this"),
1158+
// #4424, false warnings in CommonImpl due to lightbend-labs/mima#211
1159+
ProblemFilters.exclude[DirectAbstractMethodProblem](
1160+
"cats.effect.std.AtomicCell.modify"),
1161+
ProblemFilters.exclude[DirectAbstractMethodProblem](
1162+
"cats.effect.std.AtomicCell.evalUpdate"),
1163+
ProblemFilters.exclude[DirectAbstractMethodProblem](
1164+
"cats.effect.std.AtomicCell.evalGetAndUpdate"),
1165+
ProblemFilters.exclude[DirectAbstractMethodProblem](
1166+
"cats.effect.std.AtomicCell.evalUpdateAndGet")
11531167
)
11541168
)
11551169
.jsSettings(

docs/std/atomic-cell.md

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@ title: Atomic Cell
55

66
A synchronized, concurrent, mutable reference.
77

8-
Provides safe concurrent access and modification of its contents, by ensuring only one fiber
9-
can operate on them at the time. Thus, all operations except `get` may semantically block the
10-
calling fiber.
11-
128
```scala mdoc:silent
139
abstract class AtomicCell[F[_], A] {
1410
def get: F[A]
@@ -20,34 +16,45 @@ abstract class AtomicCell[F[_], A] {
2016
}
2117
```
2218

19+
Provides safe concurrent access and modification of its contents, by ensuring only one fiber
20+
can operate on them at the time. Thus, all operations except `get` may semantically block the
21+
calling fiber.
22+
2323
## Using `AtomicCell`
2424

2525
The `AtomicCell` can be treated as a combination of `Mutex` and `Ref`:
26+
2627
```scala mdoc:reset:silent
27-
import cats.effect.{IO, Ref}
28-
import cats.effect.std.Mutex
28+
import cats.effect.IO
29+
import cats.effect.std.AtomicCell
2930

3031
trait State
31-
class Service(mtx: Mutex[IO], ref: Ref[IO, State]) {
32-
def modify(f: State => IO[State]): IO[Unit] =
33-
mtx.lock.surround {
34-
for {
35-
current <- ref.get
36-
next <- f(current)
37-
_ <- ref.set(next)
38-
} yield ()
39-
}
32+
33+
class Service(cell: AtomicCell[IO, State]) {
34+
def modify(f: State => IO[State]): IO[Unit] =
35+
cell.evalUpdate(f)
4036
}
4137
```
4238

43-
The following is the equivalent of the example above:
39+
### Example
40+
41+
Imagine a random data generator,
42+
that requires running some effectual operations _(e.g. checking a database)_
43+
to produce a new value.
44+
In that case, it may be better to block than to repeat the operation.
45+
4446
```scala mdoc:reset:silent
4547
import cats.effect.IO
4648
import cats.effect.std.AtomicCell
4749

48-
trait State
49-
class Service(cell: AtomicCell[IO, State]) {
50-
def modify(f: State => IO[State]): IO[Unit] =
51-
cell.evalUpdate(current => f(current))
50+
trait Data
51+
52+
class RandomDataGenerator(cell: AtomicCell[IO, Data]) {
53+
// Generates a new random value.
54+
def next: IO[Data] =
55+
cell.evalUpdateAndGet(generate)
56+
57+
private def generate(previous: Data): IO[Data] =
58+
???
5259
}
5360
```

docs/std/atomic-map.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
---
2+
id: atomic-map
3+
title: Atomic Map
4+
---
5+
6+
A total map from `K` to `AtomicCell[F, V]`.
7+
8+
```scala mdoc:silent
9+
import cats.effect.std.AtomicCell
10+
11+
trait AtomicMap[F[_], K, V] {
12+
/**
13+
* Access the AtomicCell for the given key.
14+
*/
15+
def apply(key: K): AtomicCell[F, V]
16+
}
17+
```
18+
19+
It is conceptually similar to a `AtomicCell[F, Map[K, V]]`, but with better ergonomics when
20+
working on a per key basis. Note, however, that it does not support atomic updates to
21+
multiple keys.
22+
23+
Additionally, it also provide less contention: since all operations are performed on
24+
individual key-value pairs, the pairs can be sharded by key. Thus, multiple concurrent
25+
updates may be executed independently to each other, as long as their keys belong to
26+
different shards.
27+
28+
## Using `AtomicMap`
29+
30+
You can think of a `AtomicMap` like a `MapRef` that supports effectual updates by locking the underlying `Ref`.
31+
32+
```scala mdoc:reset:silent
33+
import cats.effect.IO
34+
import cats.effect.std.AtomicMap
35+
36+
trait State
37+
trait Key
38+
39+
class Service(am: AtomicMap[IO, Key, State]) {
40+
def modify(key: Key)(f: State => IO[State]): IO[Unit] =
41+
am(key).evalUpdate(f)
42+
}
43+
```
44+
45+
### Example
46+
47+
Imagine a parking tower,
48+
where users have access to specific floors,
49+
and getting a parking space involves an effectual operation _(e.g. a database call)_.
50+
In that case, it may be better to block than repeat the operation,
51+
but without blocking operations on different floors.
52+
53+
```scala mdoc:reset:silent
54+
import cats.effect.IO
55+
import cats.effect.std.AtomicMap
56+
57+
trait Car
58+
trait Floor
59+
trait ParkingSpace
60+
61+
class ParkingTowerService(state: AtomicMap[IO, Floor, List[ParkingSpace]]) {
62+
// Tries to park the given Car in the solicited Floor.
63+
// Returns either the assigned ParkingSpace, or None if this Floor is full.
64+
def parkCarInFloor(floor: Floor, car: Car): IO[Option[ParkingSpace]] =
65+
state(key = floor).evalModify {
66+
case firstFreeParkingSpace :: remainingParkingSpaces =>
67+
markParkingSpaceAsUsed(parkingSpace = firstFreeParkingSpace, car).as(
68+
remainingParkingSpaces -> Some(firstFreeParkingSpace)
69+
)
70+
71+
case Nil =>
72+
IO.pure(List.empty -> None)
73+
}
74+
75+
private def markParkingSpaceAsUsed(parkingSpace: ParkingSpace, car: Car): IO[Unit] =
76+
???
77+
}
78+
```

std/shared/src/main/scala/cats/effect/std/AtomicCell.scala

Lines changed: 127 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -153,35 +153,29 @@ object AtomicCell {
153153
of(M.empty)(F)
154154
}
155155

156-
private[effect] def async[F[_], A](init: A)(implicit F: Async[F]): F[AtomicCell[F, A]] =
157-
Mutex.apply[F].map(mutex => new AsyncImpl(init, mutex))
158-
159-
private[effect] def concurrent[F[_], A](init: A)(
160-
implicit F: Concurrent[F]): F[AtomicCell[F, A]] =
161-
(Ref.of[F, A](init), Mutex.apply[F]).mapN { (ref, m) => new ConcurrentImpl(ref, m) }
156+
private[effect] def async[F[_], A](
157+
init: A
158+
)(
159+
implicit F: Async[F]
160+
): F[AtomicCell[F, A]] =
161+
Mutex.apply[F].map(mutex => new AsyncImpl(init, lock = mutex.lock))
162162

163-
private final class ConcurrentImpl[F[_], A](
164-
ref: Ref[F, A],
165-
mutex: Mutex[F]
163+
private[effect] def concurrent[F[_], A](
164+
init: A
166165
)(
167166
implicit F: Concurrent[F]
168-
) extends AtomicCell[F, A] {
169-
override def get: F[A] = ref.get
170-
171-
override def set(a: A): F[Unit] =
172-
mutex.lock.surround(ref.set(a))
167+
): F[AtomicCell[F, A]] =
168+
(Ref.of[F, A](init), Mutex.apply[F]).mapN { (ref, mutex) =>
169+
new ConcurrentImpl(ref, lock = mutex.lock)
170+
}
173171

172+
// Provides common implementations for derived methods that depend on F being an applicative.
173+
private[effect] sealed abstract class CommonImpl[F[_], A](
174+
implicit F: Applicative[F]
175+
) extends AtomicCell[F, A] {
174176
override def modify[B](f: A => (A, B)): F[B] =
175177
evalModify(a => F.pure(f(a)))
176178

177-
override def evalModify[B](f: A => F[(A, B)]): F[B] =
178-
mutex.lock.surround {
179-
ref.get.flatMap(f).flatMap {
180-
case (a, b) =>
181-
ref.set(a).as(b)
182-
}
183-
}
184-
185179
override def evalUpdate(f: A => F[A]): F[Unit] =
186180
evalModify(a => f(a).map(aa => (aa, ())))
187181

@@ -192,12 +186,33 @@ object AtomicCell {
192186
evalModify(a => f(a).map(aa => (aa, aa)))
193187
}
194188

195-
private final class AsyncImpl[F[_], A](
189+
private[effect] final class ConcurrentImpl[F[_], A](
190+
ref: Ref[F, A],
191+
lock: Resource[F, Unit]
192+
)(
193+
implicit F: Concurrent[F]
194+
) extends CommonImpl[F, A] {
195+
override def get: F[A] =
196+
ref.get
197+
198+
override def set(a: A): F[Unit] =
199+
lock.surround(ref.set(a))
200+
201+
override def evalModify[B](f: A => F[(A, B)]): F[B] =
202+
lock.surround {
203+
ref.get.flatMap(f).flatMap {
204+
case (a, b) =>
205+
ref.set(a).as(b)
206+
}
207+
}
208+
}
209+
210+
private[effect] final class AsyncImpl[F[_], A](
196211
init: A,
197-
mutex: Mutex[F]
212+
lock: Resource[F, Unit]
198213
)(
199214
implicit F: Async[F]
200-
) extends AtomicCell[F, A] {
215+
) extends CommonImpl[F, A] {
201216
@volatile private var cell: A = init
202217

203218
override def get: F[A] =
@@ -206,17 +221,14 @@ object AtomicCell {
206221
}
207222

208223
override def set(a: A): F[Unit] =
209-
mutex.lock.surround {
224+
lock.surround {
210225
F.delay {
211226
cell = a
212227
}
213228
}
214229

215-
override def modify[B](f: A => (A, B)): F[B] =
216-
evalModify(a => F.pure(f(a)))
217-
218230
override def evalModify[B](f: A => F[(A, B)]): F[B] =
219-
mutex.lock.surround {
231+
lock.surround {
220232
F.delay(cell).flatMap(f).flatMap {
221233
case (a, b) =>
222234
F.delay {
@@ -225,14 +237,93 @@ object AtomicCell {
225237
}
226238
}
227239
}
240+
}
228241

229-
override def evalUpdate(f: A => F[A]): F[Unit] =
230-
evalModify(a => f(a).map(aa => (aa, ())))
242+
/**
243+
* Allows seeing a `AtomicCell[F, Option[A]]` as a `AtomicCell[F, A]`. This is useful not only
244+
* for ergonomic reasons, but because some implementations may save space.
245+
*
246+
* Setting the `default` value is the same as storing a `None` in the underlying `AtomicCell`.
247+
*/
248+
def defaultedAtomicCell[F[_], A](
249+
atomicCell: AtomicCell[F, Option[A]],
250+
default: A
251+
)(
252+
implicit F: Applicative[F]
253+
): AtomicCell[F, A] =
254+
new DefaultedAtomicCell[F, A](atomicCell, default)
231255

232-
override def evalGetAndUpdate(f: A => F[A]): F[A] =
233-
evalModify(a => f(a).map(aa => (aa, a)))
256+
private[effect] final class DefaultedAtomicCell[F[_], A](
257+
atomicCell: AtomicCell[F, Option[A]],
258+
default: A
259+
)(
260+
implicit F: Applicative[F]
261+
) extends CommonImpl[F, A] {
262+
override def get: F[A] =
263+
atomicCell.get.map(_.getOrElse(default))
234264

235-
override def evalUpdateAndGet(f: A => F[A]): F[A] =
236-
evalModify(a => f(a).map(aa => (aa, aa)))
265+
override def set(a: A): F[Unit] =
266+
if (a == default) atomicCell.set(None) else atomicCell.set(Some(a))
267+
268+
override def evalModify[B](f: A => F[(A, B)]): F[B] =
269+
atomicCell.evalModify { opt =>
270+
val a = opt.getOrElse(default)
271+
f(a).map {
272+
case (result, b) =>
273+
if (result == default) (None, b) else (Some(result), b)
274+
}
275+
}
276+
}
277+
278+
implicit def atomicCellOptionSyntax[F[_], A](
279+
atomicCell: AtomicCell[F, Option[A]]
280+
)(
281+
implicit F: Applicative[F]
282+
): AtomicCellOptionOps[F, A] =
283+
new AtomicCellOptionOps(atomicCell)
284+
285+
final class AtomicCellOptionOps[F[_], A] private[effect] (
286+
atomicCell: AtomicCell[F, Option[A]]
287+
)(
288+
implicit F: Applicative[F]
289+
) {
290+
def getOrElse(default: A): F[A] =
291+
atomicCell.get.map(_.getOrElse(default))
292+
293+
def unset: F[Unit] =
294+
atomicCell.set(None)
295+
296+
def setValue(a: A): F[Unit] =
297+
atomicCell.set(Some(a))
298+
299+
def modifyValueIfSet[B](f: A => (A, B)): F[Option[B]] =
300+
evalModifyValueIfSet(a => F.pure(f(a)))
301+
302+
def evalModifyValueIfSet[B](f: A => F[(A, B)]): F[Option[B]] =
303+
atomicCell.evalModify {
304+
case None =>
305+
F.pure((None, None))
306+
307+
case Some(a) =>
308+
f(a).map {
309+
case (result, b) =>
310+
(Some(result), Some(b))
311+
}
312+
}
313+
314+
def updateValueIfSet(f: A => A): F[Unit] =
315+
evalUpdateValueIfSet(a => F.pure(f(a)))
316+
317+
def evalUpdateValueIfSet(f: A => F[A]): F[Unit] =
318+
atomicCell.evalUpdate {
319+
case None => F.pure(None)
320+
case Some(a) => f(a).map(Some.apply)
321+
}
322+
323+
def getAndSetValue(a: A): F[Option[A]] =
324+
atomicCell.getAndSet(Some(a))
325+
326+
def withDefaultValue(default: A): AtomicCell[F, A] =
327+
defaultedAtomicCell(atomicCell, default)
237328
}
238329
}

0 commit comments

Comments
 (0)