-
Notifications
You must be signed in to change notification settings - Fork 509
/
Async.scala
281 lines (251 loc) · 9.1 KB
/
Async.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
* Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cats
package effect
import simulacrum._
import cats.data._
import cats.effect.IO.{Delay, Pure, RaiseError}
import cats.effect.internals.{Callback, IORunLoop}
import scala.annotation.implicitNotFound
import scala.concurrent.ExecutionContext
import scala.util.Either
/**
* A monad that can describe asynchronous or synchronous computations
* that produce exactly one result.
*
* ==On Asynchrony==
*
* An asynchronous task represents logic that executes independent of
* the main program flow, or current callstack. It can be a task whose
* result gets computed on another thread, or on some other machine on
* the network.
*
* In terms of types, normally asynchronous processes are represented as:
* {{{
* (A => Unit) => Unit
* }}}
*
* This signature can be recognized in the "Observer pattern" described
* in the "Gang of Four", although it should be noted that without
* an `onComplete` event (like in the Rx Observable pattern) you can't
* detect completion in case this callback can be called zero or
* multiple times.
*
* Some abstractions allow for signaling an error condition
* (e.g. `MonadError` data types), so this would be a signature
* that's closer to Scala's `Future#onComplete`:
*
* {{{
* (Either[Throwable, A] => Unit) => Unit
* }}}
*
* And many times the abstractions built to deal with asynchronous tasks
* also provide a way to cancel such processes, to be used in race
* conditions in order to cleanup resources early:
*
* {{{
* (A => Unit) => Cancelable
* }}}
*
* This is approximately the signature of JavaScript's `setTimeout`,
* which will return a "task ID" that can be used to cancel it.
*
* N.B. this type class in particular is NOT describing cancelable
* async processes, see the [[Concurrent]] type class for that.
*
* ==Async Type class==
*
* This type class allows the modeling of data types that:
*
* 1. can start asynchronous processes
* 1. can emit one result on completion
* 1. can end in error
*
* N.B. on the "one result" signaling, this is not an ''exactly once''
* requirement. At this point streaming types can implement `Async`
* and such an ''exactly once'' requirement is only clear in [[Effect]].
*
* Therefore the signature exposed by the [[Async!.async async]]
* builder is this:
*
* {{{
* (Either[Throwable, A] => Unit) => Unit
* }}}
*
* N.B. such asynchronous processes are not cancelable.
* See the [[Concurrent]] alternative for that.
*/
@typeclass
@implicitNotFound("""Cannot find implicit value for Async[${F}].
Building this implicit value might depend on having an implicit
s.c.ExecutionContext in scope, a Scheduler or some equivalent type.""")
trait Async[F[_]] extends Sync[F] with LiftIO[F] {
/**
* Creates a simple, noncancelable `F[A]` instance that
* executes an asynchronous process on evaluation.
*
* The given function is being injected with a side-effectful
* callback for signaling the final result of an asynchronous
* process.
*
* @param k is a function that should be called with a
* callback for signaling the result once it is ready
*/
def async[A](k: (Either[Throwable, A] => Unit) => Unit): F[A]
/**
* Inherited from [[LiftIO]], defines a conversion from [[IO]]
* in terms of the `Async` type class.
*
* N.B. expressing this conversion in terms of `Async` and its
* capabilities means that the resulting `F` is not cancelable.
* [[Concurrent]] then overrides this with an implementation
* that is.
*
* To access this implementation as a standalone function, you can
* use [[Async$.liftIO Async.liftIO]] (on the object companion).
*/
override def liftIO[A](ioa: IO[A]): F[A] =
Async.liftIO(ioa)(this)
/**
* Returns a non-terminating `F[_]`, that never completes
* with a result, being equivalent to `async(_ => ())`
*/
def never[A]: F[A] = async(_ => ())
/**
* DEPRECATED — moved to [[Async$.shift]].
*
* The reason for the deprecation is that there's no potential
* for optimisations in implementing instances, so this operation
* can be a simple utility.
*
* It's also a lawless operation that might be better served
* with a concrete, non-polymorphic implementation.
*/
@deprecated("Moved to Async$.shift, will be removed in 1.0", "0.10")
private[effect] def shift(ec: ExecutionContext): F[Unit] = {
// $COVERAGE-OFF$
Async.shift(ec)(this)
// $COVERAGE-ON$
}
}
object Async {
/**
* Returns an non-terminating `F[_]`, that never completes
* with a result, being equivalent with `async(_ => ())`.
*/
@deprecated("Moved to Async[F]", "0.10")
def never[F[_], A](implicit F: Async[F]): F[A] =
F.never
/**
* Generic shift operation, defined for any `Async` data type.
*
* Shifts the bind continuation onto the specified thread pool.
* Analogous with [[IO.shift(ec* IO.shift]].
*/
def shift[F[_]](ec: ExecutionContext)(implicit F: Async[F]): F[Unit] =
F.async { cb =>
ec.execute(new Runnable {
def run(): Unit = cb(Callback.rightUnit)
})
}
/**
* Lifts any `IO` value into any data type implementing [[Async]].
*
* This is the default `Async.liftIO` implementation.
*/
def liftIO[F[_], A](io: IO[A])(implicit F: Async[F]): F[A] =
io match {
case Pure(a) => F.pure(a)
case RaiseError(e) => F.raiseError(e)
case Delay(thunk) => F.delay(thunk())
case _ =>
F.suspend {
IORunLoop.step(io) match {
case Pure(a) => F.pure(a)
case RaiseError(e) => F.raiseError(e)
case async => F.async(async.unsafeRunAsync)
}
}
}
/**
* [[Async]] instance built for `cats.data.EitherT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsEitherTAsync[F[_]: Async, L]: Async[EitherT[F, L, ?]] =
new EitherTAsync[F, L] { def F = Async[F] }
/**
* [[Async]] instance built for `cats.data.OptionT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsOptionTAsync[F[_]: Async]: Async[OptionT[F, ?]] =
new OptionTAsync[F] { def F = Async[F] }
/**
* [[Async]] instance built for `cats.data.StateT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsStateTAsync[F[_]: Async, S]: Async[StateT[F, S, ?]] =
new StateTAsync[F, S] { def F = Async[F] }
/**
* [[Async]] instance built for `cats.data.WriterT` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsWriterTAsync[F[_]: Async, L: Monoid]: Async[WriterT[F, L, ?]] =
new WriterTAsync[F, L] { def F = Async[F]; def L = Monoid[L] }
/**
* [[Async]] instance built for `cats.data.Kleisli` values initialized
* with any `F` data type that also implements `Async`.
*/
implicit def catsKleisliAsync[F[_]: Async, R]: Async[Kleisli[F, R, ?]] =
new KleisliAsync[F, R] { def F = Async[F]; }
private[effect] trait EitherTAsync[F[_], L] extends Async[EitherT[F, L, ?]]
with Sync.EitherTSync[F, L]
with LiftIO.EitherTLiftIO[F, L] {
override implicit protected def F: Async[F]
protected def FF = F
def async[A](k: (Either[Throwable, A] => Unit) => Unit): EitherT[F, L, A] =
EitherT.liftF(F.async(k))
}
private[effect] trait OptionTAsync[F[_]] extends Async[OptionT[F, ?]]
with Sync.OptionTSync[F]
with LiftIO.OptionTLiftIO[F] {
override protected implicit def F: Async[F]
protected def FF = F
def async[A](k: (Either[Throwable, A] => Unit) => Unit): OptionT[F, A] =
OptionT.liftF(F.async(k))
}
private[effect] trait StateTAsync[F[_], S] extends Async[StateT[F, S, ?]]
with Sync.StateTSync[F, S]
with LiftIO.StateTLiftIO[F, S] {
override protected implicit def F: Async[F]
protected def FA = F
def async[A](k: (Either[Throwable, A] => Unit) => Unit): StateT[F, S, A] =
StateT.liftF(F.async(k))
}
private[effect] trait WriterTAsync[F[_], L] extends Async[WriterT[F, L, ?]]
with Sync.WriterTSync[F, L]
with LiftIO.WriterTLiftIO[F, L] {
override protected implicit def F: Async[F]
protected def FA = F
def async[A](k: (Either[Throwable, A] => Unit) => Unit): WriterT[F, L, A] =
WriterT.liftF(F.async(k))(L, FA)
}
private[effect] trait KleisliAsync[F[_], R] extends Async[Kleisli[F, R, ?]]
with Sync.KleisliSync[F, R] {
override protected implicit def F: Async[F]
override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Kleisli[F, R, A] =
Kleisli.liftF(F.async(k))
}
}