-
Notifications
You must be signed in to change notification settings - Fork 595
/
Pull.scala
252 lines (215 loc) · 10.1 KB
/
Pull.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package fs2
import cats.{Eval => _, _}
import cats.effect._
import cats.syntax.all._
import fs2.internal._
import fs2.internal.FreeC.{Eval, Result}
/** A `p: Pull[F,O,R]` reads values from one or more streams, returns a
* result of type `R`, and produces a `Stream[F,O]` when calling `p.stream`.
*
* Any resources acquired by `p` are freed following the call to `stream`.
*
* Laws:
*
* `Pull` forms a monad in `R` with `pure` and `flatMap`:
* - `pure >=> f == f`
* - `f >=> pure == f`
* - `(f >=> g) >=> h == f >=> (g >=> h)`
* where `f >=> g` is defined as `a => a flatMap f flatMap g`
*
* `raiseError` is caught by `handleErrorWith`:
* - `handleErrorWith(raiseError(e))(f) == f(e)`
*/
final class Pull[+F[_], +O, +R] private[fs2] (private val free: FreeC[F, O, R]) extends AnyVal {
private[fs2] def get: FreeC[F, O, R] = free
/** Alias for `_.map(_ => o2)`. */
def as[R2](r2: R2): Pull[F, O, R2] = map(_ => r2)
/** Returns a pull with the result wrapped in `Right`, or an error wrapped in `Left` if the pull has failed. */
def attempt: Pull[F, O, Either[Throwable, R]] =
new Pull(free.map(r => Right(r)).handleErrorWith(t => Result.Pure(Left(t))))
/** Interpret this `Pull` to produce a `Stream`, introducing a scope.
*
* May only be called on pulls which return a `Unit` result type. Use `p.void.stream` to explicitly
* ignore the result type of the pull.
*/
def stream(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
new Stream(FreeC.scope(free.asInstanceOf[FreeC[F, O, Unit]]))
}
/** Interpret this `Pull` to produce a `Stream` without introducing a scope.
*
* Only use this if you know a scope is not needed. Scope introduction is generally harmless and the risk
* of not introducing a scope is a memory leak in streams that otherwise would execute in constant memory.
*
* May only be called on pulls which return a `Unit` result type. Use `p.void.stream` to explicitly
* ignore the result type of the pull.
*/
def streamNoScope(implicit ev: R <:< Unit): Stream[F, O] = {
val _ = ev
new Stream(free.asInstanceOf[FreeC[F, O, Unit]])
}
/** Applies the resource of this pull to `f` and returns the result. */
def flatMap[F2[x] >: F[x], O2 >: O, R2](f: R => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
new Pull(free.flatMap(r => f(r).free))
/** Alias for `flatMap(_ => p2)`. */
def >>[F2[x] >: F[x], O2 >: O, R2](p2: => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
flatMap(_ => p2)
/** Lifts this pull to the specified effect type. */
def covary[F2[x] >: F[x]]: Pull[F2, O, R] = this
/** Lifts this pull to the specified effect type, output type, and resource type. */
def covaryAll[F2[x] >: F[x], O2 >: O, R2 >: R]: Pull[F2, O2, R2] = this
/** Lifts this pull to the specified output type. */
def covaryOutput[O2 >: O]: Pull[F, O2, R] = this
/** Lifts this pull to the specified resource type. */
def covaryResource[R2 >: R]: Pull[F, O, R2] = this
/** Applies the resource of this pull to `f` and returns the result in a new `Pull`. */
def map[R2](f: R => R2): Pull[F, O, R2] = new Pull(free.map(f))
/** Applies the outputs of this pull to `f` and returns the result in a new `Pull`. */
def mapOutput[O2](f: O => O2): Pull[F, O2, R] = new Pull(free.mapOutput(f))
/** Run `p2` after `this`, regardless of errors during `this`, then reraise any errors encountered during `this`. */
def onComplete[F2[x] >: F[x], O2 >: O, R2 >: R](p2: => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
handleErrorWith(e => p2 >> new Pull(Result.Fail(e))) >> p2
/** If `this` terminates with `Pull.raiseError(e)`, invoke `h(e)`. */
def handleErrorWith[F2[x] >: F[x], O2 >: O, R2 >: R](
h: Throwable => Pull[F2, O2, R2]
): Pull[F2, O2, R2] =
new Pull(free.handleErrorWith(e => h(e).free))
/** Discards the result type of this pull. */
def void: Pull[F, O, Unit] = as(())
}
object Pull extends PullLowPriority {
/** Like [[eval]] but if the effectful value fails, the exception is returned in a `Left`
* instead of failing the pull.
*/
def attemptEval[F[_], R](fr: F[R]): Pull[F, INothing, Either[Throwable, R]] =
new Pull(
Eval[F, R](fr)
.map(r => Right(r): Either[Throwable, R])
.handleErrorWith(t => Result.Pure[Either[Throwable, R]](Left(t)))
)
/** The completed `Pull`. Reads and outputs nothing. */
val done: Pull[Pure, INothing, Unit] =
new Pull(Result.unit)
/** Evaluates the supplied effectful value and returns the result as the resource of the returned pull. */
def eval[F[_], R](fr: F[R]): Pull[F, INothing, R] =
new Pull(Eval[F, R](fr))
/** Extends the scope of the currently open resources to the specified stream, preventing them
* from being finalized until after `s` completes execution, even if the returned pull is converted
* to a stream, compiled, and evaluated before `s` is compiled and evaluated.
*/
def extendScopeTo[F[_], O](
s: Stream[F, O]
)(implicit F: MonadError[F, Throwable]): Pull[F, INothing, Stream[F, O]] =
for {
scope <- Pull.getScope[F]
lease <- Pull.eval(scope.leaseOrError)
} yield s.onFinalize(lease.cancel.redeemWith(F.raiseError(_), _ => F.unit))
/** Repeatedly uses the output of the pull as input for the next step of the pull.
* Halts when a step terminates with `None` or `Pull.raiseError`.
*/
def loop[F[_], O, R](f: R => Pull[F, O, Option[R]]): R => Pull[F, O, Option[R]] =
r => f(r).flatMap(_.map(loop(f)).getOrElse(Pull.pure(None)))
/** Outputs a single value. */
def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit] =
new Pull(FreeC.output1[O](o))
/** Outputs a chunk of values. */
def output[F[x] >: Pure[x], O](os: Chunk[O]): Pull[F, O, Unit] =
if (os.isEmpty) Pull.done else new Pull(FreeC.Output[O](os))
/** Pull that outputs nothing and has result of `r`. */
def pure[F[x] >: Pure[x], R](r: R): Pull[F, INothing, R] =
new Pull(Result.Pure(r))
/** Reads and outputs nothing, and fails with the given error.
*
* The `F` type must be explicitly provided (e.g., via `raiseError[IO]` or `raiseError[Fallible]`).
*/
def raiseError[F[_]: RaiseThrowable](err: Throwable): Pull[F, INothing, INothing] =
new Pull(Result.Fail(err))
final class PartiallyAppliedFromEither[F[_]] {
def apply[A](either: Either[Throwable, A])(implicit ev: RaiseThrowable[F]): Pull[F, A, Unit] =
either.fold(Pull.raiseError[F], Pull.output1)
}
/** Lifts an Either[Throwable, A] to an effectful Pull[F, A, Unit].
*
* @example {{{
* scala> import cats.effect.IO, scala.util.Try
* scala> Pull.fromEither[IO](Right(42)).stream.compile.toList.unsafeRunSync()
* res0: List[Int] = List(42)
* scala> Try(Pull.fromEither[IO](Left(new RuntimeException)).stream.compile.toList.unsafeRunSync())
* res1: Try[List[INothing]] = Failure(java.lang.RuntimeException)
* }}}
*/
def fromEither[F[x]] = new PartiallyAppliedFromEither[F]
/** Gets the current scope, allowing manual leasing or interruption.
* This is a low-level method and generally should not be used by user code.
*/
def getScope[F[_]]: Pull[F, INothing, Scope[F]] =
new Pull(FreeC.GetScope[F]())
/** Returns a pull that evaluates the supplied by-name each time the pull is used,
* allowing use of a mutable value in pull computations.
*/
def suspend[F[x] >: Pure[x], O, R](p: => Pull[F, O, R]): Pull[F, O, R] =
new Pull(FreeC.suspend(p.free))
/** `Sync` instance for `Pull`. */
implicit def syncInstance[F[_], O](implicit
ev: ApplicativeError[F, Throwable]
): Sync[Pull[F, O, *]] = {
val _ = ev
new PullSyncInstance[F, O]
}
/** `FunctionK` instance for `F ~> Pull[F, INothing, *]`
*
* @example {{{
* scala> import cats.Id
* scala> Pull.functionKInstance[Id](42).flatMap(Pull.output1).stream.compile.toList
* res0: cats.Id[List[Int]] = List(42)
* }}}
*/
implicit def functionKInstance[F[_]]: F ~> Pull[F, INothing, *] =
new (F ~> Pull[F, INothing, *]) {
def apply[X](fx: F[X]) = Pull.eval(fx)
}
}
private[fs2] trait PullLowPriority {
implicit def monadInstance[F[_], O]: Monad[Pull[F, O, *]] =
new PullSyncInstance[F, O]
}
private[fs2] class PullSyncInstance[F[_], O] extends Sync[Pull[F, O, *]] {
def pure[A](a: A): Pull[F, O, A] = Pull.pure(a)
def flatMap[A, B](p: Pull[F, O, A])(f: A => Pull[F, O, B]): Pull[F, O, B] =
p.flatMap(f)
override def tailRecM[A, B](a: A)(f: A => Pull[F, O, Either[A, B]]): Pull[F, O, B] =
f(a).flatMap {
case Left(a) => tailRecM(a)(f)
case Right(b) => Pull.pure(b)
}
def raiseError[A](e: Throwable) = new Pull(Result.Fail(e))
def handleErrorWith[A](fa: Pull[F, O, A])(h: Throwable => Pull[F, O, A]) =
fa.handleErrorWith(h)
def suspend[R](p: => Pull[F, O, R]) = Pull.suspend(p)
def bracketCase[A, B](acquire: Pull[F, O, A])(
use: A => Pull[F, O, B]
)(release: (A, ExitCase[Throwable]) => Pull[F, O, Unit]): Pull[F, O, B] =
new Pull(
FreeC.bracketCase(acquire.get, (a: A) => use(a).get, (a: A, c) => release(a, c).get)
)
}