-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Util.scala
153 lines (141 loc) · 6.01 KB
/
Util.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package nl.vroste.zio.kinesis.client
import zio._
import zio.stream.ZStream
import zio.Clock
import scala.annotation.nowarn
object Util {
implicit class ZStreamExtensions[-R, +E, +O](val stream: ZStream[R, E, O]) extends AnyVal {

  /**
   * Splits the stream into one substream per key, emitting a `(key, substream)` pair the first time a
   * key is encountered
   *
   * Alternative to ZStream's `groupBy`, which uses `distributedWithDynamic` and is not performant
   * enough, maybe because it breaks chunks. Here every upstream chunk is split per key with `groupBy`
   * and each part is handed to its substream as a whole chunk.
   *
   * @param getKey
   *   Computes the grouping key of an element
   * @param substreamChunkBuffer
   *   Number of chunks to buffer per substream
   * @return
   *   Stream of keys, each paired with the substream of elements that map to that key
   */
  final def groupByKey2[K](
    getKey: O => K,
    substreamChunkBuffer: Int = 32
  ): ZStream[R, E, (K, ZStream[Any, E, O])] =
    ZStream.unwrapScoped[R] {
      type GroupQueueValues = Exit[Option[E], Chunk[O]]
      type GroupQueue       = Queue[GroupQueueValues]

      for {
        newGroups   <- Queue.unbounded[Exit[Option[E], (K, GroupQueue)]]
        queuesByKey <- Ref.make(Map.empty[K, GroupQueue])
        feeder       = {
                         // First chunk for a key: the chunk is put on a fresh bounded queue before that
                         // queue is registered and announced downstream
                         def openGroup(key: K, values: Chunk[O]): ZIO[Any, Nothing, Unit] =
                           for {
                             groupQueue <- Queue.bounded[GroupQueueValues](substreamChunkBuffer)
                             _          <- groupQueue.offer(Exit.succeed(values))
                             _          <- queuesByKey.update(_ + (key -> groupQueue))
                             _          <- newGroups.offer(Exit.succeed((key, groupQueue)))
                           } yield ()

                         // Hands the chunk to the queue of its key, opening the group when needed
                         def route(key: K, values: Chunk[O]): ZIO[Any, Nothing, Unit] =
                           queuesByKey.get.flatMap { known =>
                             known.get(key) match {
                               case Some(groupQueue) => groupQueue.offer(Exit.succeed(values)).unit
                               case None             => openGroup(key, values)
                             }
                           }

                         // Emits nothing itself; it only drives the upstream and fills the group queues
                         stream.mapChunksZIO { incoming =>
                           val perKey = incoming.groupBy(getKey)
                           ZIO
                             .foreachDiscard(perKey) { case (key, values) =>
                               route(key, values).uninterruptible
                             }
                             .as(Chunk.empty)
                         }
                       }
        // When the scope closes, signal end-of-stream (`Exit.fail(None)`) to every opened substream
        _           <- ZIO.addFinalizer(
                         for {
                           opened <- queuesByKey.get
                           _      <- ZIO.foreachDiscard(opened.values)(
                                       _.offer(Exit.fail(None)).catchAllCause(_ => UIO.unit)
                                     )
                         } yield ()
                       )
      } yield {
        val groups =
          ZStream.fromQueueWithShutdown(newGroups).flattenExitOption.map { case (key, groupQueue) =>
            (key, ZStream.fromQueueWithShutdown(groupQueue).flattenExitOption.flattenChunks)
          }

        feeder mergeTerminateEither groups
      }
    }
}
/**
 * Schedule that backs off exponentially until the interval would exceed `max`, then keeps recurring
 * at `max`, optionally limited to a maximum number of recurrences
 *
 * The schedule outputs the current backoff interval together with the recurrence count.
 *
 * @param min
 *   Minimum backoff time
 * @param max
 *   Maximum backoff time. When this value is reached, subsequent intervals will be equal to this value.
 * @param factor
 *   Exponential factor. 2 means doubling, 1 is constant, < 1 means decreasing
 * @param maxRecurs
 *   Maximum retries. When this number is exceeded, the schedule will end. `None` means no limit
 * @tparam A
 *   Schedule input
 */
@nowarn("msg=a type was inferred to be `Any`")
def exponentialBackoff[A](
  min: Duration,
  max: Duration,
  factor: Double = 2.0,
  maxRecurs: Option[Int] = None
): Schedule[Clock, A, (Duration, Long)] = {
  // Exponential phase: runs for as long as the computed interval does not exceed `max`
  val growing = Schedule.exponential(min, factor).whileOutput(_ <= max)
  // Steady phase: fixed interval of `max`, reporting `max` as its output
  val capped = Schedule.fixed(max).as(max)
  // Recurrence limit, unbounded when no maximum was given
  val recurrenceLimit = maxRecurs.map(Schedule.recurs).getOrElse(Schedule.forever)

  (growing andThen capped) && recurrenceLimit
}
/**
 * Executes calls through a token bucket stream, ensuring a maximum rate of calls
 *
 * Allows for bursting: the throttle is configured with a burst allowance equal to `units`.
 *
 * Every call of the returned function is queued as a request. A background fiber, forked in the
 * surrounding scope, releases requests at the throttled rate; the call then runs `f` and yields its
 * result or failure.
 *
 * @param units
 *   Maximum number of calls per duration
 * @param duration
 *   Duration for nr of tokens
 * @param f
 *   Function to apply rate limiting to
 * @return
 *   The original function with rate limiting applied, as a scoped resource
 */
def throttledFunction[R, I, E, A](units: Int, duration: Duration)(
  f: I => ZIO[R, E, A]
): ZIO[Scope with Clock, Nothing, I => ZIO[R, E, A]] = {
  // `units / 2 * 2` rounds down to an even number, which is 0 for `units == 1`. Zero is not a usable
  // capacity for a bounded queue, so keep room for at least one pending request.
  val queueCapacity = math.max(1, units / 2 * 2)

  for {
    requestsQueue <- Queue.bounded[(IO[E, A], Promise[E, A])](queueCapacity)
    _             <- ZStream
                       .fromQueueWithShutdown(requestsQueue)
                       // The cost function receives a whole chunk and all requests waiting in the queue
                       // arrive as one chunk, so every request in it has to be charged one token
                       .throttleShape(units.toLong, duration, units.toLong)(_.size.toLong)
                       // The promise is completed with the effect itself, releasing the waiting caller
                       .mapZIO { case (effect, promise) => promise.completeWith(effect) }
                       .runDrain
                       .forkScoped
  } yield (input: I) =>
    for {
      env     <- ZIO.environment[R]
      promise <- Promise.make[E, A]
      _       <- requestsQueue.offer((f(input).provideEnvironment(env), promise))
      result  <- promise.await
    } yield result
}
/**
 * Variant of `throttledFunction` for functions of more than one argument
 *
 * @param units
 *   Maximum number of calls per duration
 * @param duration
 *   Duration for nr of tokens
 * @return
 *   Partially applied builder; apply it to a function of two or three arguments to obtain the rate
 *   limited function
 */
def throttledFunctionN(units: Int, duration: Duration): ThrottledFunctionPartial =
  ThrottledFunctionPartial(units = units, duration = duration)
/**
 * Rate limit settings waiting for the function they should be applied to
 *
 * Both `apply` variants tuple the arguments of `f`, delegate to `throttledFunction` and convert the
 * result back to a function of separate arguments.
 *
 * @param units
 *   Maximum number of calls per duration
 * @param duration
 *   Duration for nr of tokens
 */
final case class ThrottledFunctionPartial(units: Int, duration: Duration) {

  /** Applies rate limiting to a function of two arguments */
  def apply[R, I0, I1, E, A](f: (I0, I1) => ZIO[R, E, A]): ZIO[Scope with Clock, Nothing, (I0, I1) => ZIO[R, E, A]] =
    throttledFunction[R, (I0, I1), E, A](units, duration)(f.tupled)
      .map(throttled => (i0: I0, i1: I1) => throttled((i0, i1)))

  /** Applies rate limiting to a function of three arguments */
  def apply[R, I0, I1, I2, E, A](
    f: (I0, I1, I2) => ZIO[R, E, A]
  ): ZIO[Scope with Clock, Nothing, (I0, I1, I2) => ZIO[R, E, A]] =
    throttledFunction[R, (I0, I1, I2), E, A](units, duration)(f.tupled)
      .map(throttled => (i0: I0, i1: I1, i2: I2) => throttled((i0, i1, i2)))
}
/**
 * Runs `effect` over and over in a background fiber tied to the scope, starting each run either after
 * waiting for `period` or as soon as the returned trigger is invoked, whichever comes first
 *
 * After every run, manual or not, a new wait of `period` begins. Triggers fired while `effect` is
 * executing are discarded when that run completes.
 *
 * @param effect
 *   Operation to execute; it cannot fail
 * @param period
 *   Time to wait for a trigger before `effect` is executed anyway
 * @return
 *   ZIO that when executed, requests an immediate execution of `effect`
 */
def periodicAndTriggerableOperation[R, A](
  effect: ZIO[R, Nothing, A],
  period: Duration
): ZIO[Scope with R with Clock, Nothing, UIO[Unit]] =
  for {
    // Dropping queue of size 1: a trigger that arrives while one is already pending is dropped
    triggers    <- ZIO.acquireRelease(Queue.dropping[Unit](1))(_.shutdown)
    // Whichever finishes first starts the run: a manual trigger or the timer
    awaitSignal  = triggers.take raceFirst ZIO.sleep(period)
    // `takeAll` clears the triggers that were offered during the run
    cycle        = awaitSignal *> effect *> triggers.takeAll
    _           <- cycle.forever.forkScoped
  } yield triggers.offer(()).unit
}