-
Notifications
You must be signed in to change notification settings - Fork 597
/
syntax.scala
100 lines (91 loc) · 4.08 KB
/
syntax.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package fs2
package interop
package flow
import cats.effect.kernel.{Async, Resource}
import java.util.concurrent.Flow.{Publisher, Subscriber}
object syntax {
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {
/** Creates a [[Stream]] from an [[Publisher]].
*
* @example {{{
* scala> import cats.effect.IO
* scala> import fs2.Stream
* scala> import fs2.interop.flow.syntax._
* scala> import java.util.concurrent.Flow.Publisher
* scala>
* scala> def getThirdPartyPublisher(): Publisher[Int] = ???
* scala>
* scala> // Interop with the third party library.
* scala> Stream.eval(IO.delay(getThirdPartyPublisher())).flatMap { publisher =>
* | publisher.toStream[IO](chunkSize = 16)
* | }
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
* The stream will not emit new element until,
* either the `Chunk` is filled or the publisher finishes.
*/
def toStream[F[_]](chunkSize: Int)(implicit F: Async[F]): Stream[F, A] =
flow.fromPublisher(publisher, chunkSize)
}
implicit final class StreamOps[F[_], A](private val stream: Stream[F, A]) extends AnyVal {
/** Creates a [[Publisher]] from a [[Stream]].
*
* The stream is only ran when elements are requested.
* Closing the [[Resource]] means gracefully shutting down all active subscriptions.
* Thus, no more elements will be published.
*
* @note This Publisher can be reused for multiple Subscribers,
* each subscription will re-run the [[Stream]] from the beginning.
*
* @see [[subscribe]] for a simpler version that only requires a [[Subscriber]].
*/
def toPublisher(implicit F: Async[F]): Resource[F, Publisher[A]] =
flow.toPublisher(stream)
/** Subscribes the provided [[Subscriber]] to this stream.
*
* The returned program will run until
* all the stream elements were consumed.
* Cancelling this program will gracefully shutdown the subscription.
*
* @param subscriber the [[Subscriber]] that will receive the elements of the stream.
*/
def subscribe(subscriber: Subscriber[A])(implicit F: Async[F]): F[Unit] =
flow.subscribeStream(stream, subscriber)
}
final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[A](
publisher: Publisher[A],
chunkSize: Int
)(implicit
F: Async[F]
): Stream[F, A] =
fromPublisher[F, A](chunkSize) { subscriber =>
F.delay(publisher.subscribe(subscriber))
}
}
}