{0 Streaming}
Streaming abstractions that combine, transform and reduce large amounts of
sequential data efficiently, in constant space and without leaking resources.
{1 Overview}
{e Streaming} uses composable stream producers (sources), consumers (sinks) and
transformers (flows). The central model that abstracts over them is a
{{!module:Streaming.Stream} [Stream]}.
The following features are provided:
- {b Constant memory usage}: large or infinite streams can be processed in
small, constant space. Buffering of the input is possible when needed.
- {b Excellent performance}: all models were designed with performance at the
core. See the {{:https://github.com/rizo/streams-bench} benchmarks} for a
detailed comparison with other libraries.
- {b Resource safety}: resources in effectful streaming pipelines are allocated
lazily and released as early as possible. Resources are guaranteed to
be terminated even when streams raise exceptions.
- {b Flexibility and loose coupling}: push-based and pull-based models are
implemented to allow efficient zipping, concatenation and implementation of
decoupled sources, sinks and flows.
- {b Streaming notation}: build streams and sinks using convenient
comprehension and applicative notations (see the examples {{:#using-stream-notation} below}).
{1 Modules}
{%html:
<table class="xodoc">
<tr>
<td><a href="Streaming/Stream/index.html"><b><code>Stream</code></b></a></td>
<td>
Push-based streams with excellent overall performance, safe and lazy
resource management and a multitude of operations. Streams can be built
with <a
href="#using-stream-notation">comprehensions</a>.
</td>
</tr>
<tr>
<td><a href="Streaming/Source/index.html"><b><code>Source</code></b></a></td>
<td>
Pull-based producers of values with good performance, safe and lazy
resource management and zipping operations.
</td>
</tr>
<tr>
<td><a href="Streaming/Sink/index.html"><b><code>Sink</code></b></a></td>
<td>
Consumers of values with excellent performance, safe and lazy
resource management and flexible <a
href="Streaming/Sink/index.html#combining-sinks">composition
operations</a>. Sinks are like first-class folds with early termination!
</td>
</tr>
<tr>
<td><a href="Streaming/Flow/index.html"><b><code>Flow</code></b></a></td>
<td>
Transformers of values that form composable streaming pipelines.
Excellent for defining source- and sink-independent value transformations.
</td>
</tr>
</table>
%}
For more information on each module consult the entrypoint module
{!module:Streaming}.
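The table above describes sinks as first-class folds with early termination. As
a rough, library-independent sketch of that idea, the model below uses only the
standard library. The library's own types are abstract, so the record type and
the names [sink], [feed_list], [sum] and [first] are hypothetical and are not
the representation used by Streaming.
{[
(* Conceptual model only; not the representation used by Streaming. *)
type ('a, 's, 'r) sink = {
  init : unit -> 's;      (* create the initial state *)
  push : 's -> 'a -> 's;  (* fold step *)
  full : 's -> bool;      (* early termination check *)
  stop : 's -> 'r;        (* produce the final result *)
}

(* Feed a list into a sink, stopping as soon as the sink reports it is full. *)
let feed_list sink xs =
  let rec go st xs =
    if sink.full st then st
    else
      match xs with
      | [] -> st
      | x :: rest -> go (sink.push st x) rest
  in
  sink.stop (go (sink.init ()) xs)

(* A sink that is never full: it consumes all of its input. *)
let sum =
  { init = (fun () -> 0);
    push = ( + );
    full = (fun _ -> false);
    stop = (fun total -> total) }

(* A sink that becomes full after the first element. *)
let first =
  { init = (fun () -> None);
    push = (fun _ x -> Some x);
    full = (function Some _ -> true | None -> false);
    stop = (fun st -> st) }
]}
In this model, [feed_list first] inspects only the first element of its input,
because [full] holds after a single [push].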
{1 Quickstart}
{2 Install using opam}
The library can be installed with opam: [opam install streaming]. You can run
[opam info streaming] to make sure that the library is installed.
{2 Install using esy}
Add ["@opam/streaming": "0.1"] to dependencies in your [package.json] file and
install the dependencies with [esy install]. Run [esy ls-modules] to make sure
that the library is installed for your project.
{2 Use with dune}
To start using {!module:Streaming} in your dune project add it to [libraries]
in the [dune] file.
{[
(executable
  (public_name myexe)
  (libraries streaming))
]}
Open the entrypoint module in your code to start using Streams:
{[
open Streaming
]}
{2 Use in the top-level}
Fire up {{:https://github.com/ocaml-community/utop} utop} or {{:https://github.com/dbuenzli/down} down} and run:
{[
# #require "streaming";;
# open Streaming;;
# Stream.(stdin |> stdout);;
We're streaming!<Enter>
We're streaming!
<Ctrl+d>
- : unit = ()
]}
That's it! Scroll down to see some examples or jump into the {{!module:Streaming} API documentation}.
{1 Examples}
{3 Read lines from [STDIN]}
{[
# Stream.stdin
  |> Stream.filter ((<>) "")
  |> Stream.map (fun line -> "You wrote: " ^ line)
  |> Stream.each print_endline
]}
{3 Using stream notation}
{[
# let items =
    let open Stream.Syntax in
    let* n = Stream.range 1 3 in
    let* c = Stream.of_list ['x'; 'y'] in
    yield (n, c)
# Stream.to_list items
- : (int * char) list = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')]
]}
{3 Using sink notation}
Compute the arithmetic mean in a single iteration of the input.
{[
# let mean =
    let open Sink.Syntax in
    let+ total = Sink.sum
    and+ count = Sink.len in
    total / count
# Stream.(iota 20 |> into mean)
- : int = 9
]}
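For intuition, computing both values in a single iteration corresponds to one
fold that carries the sum and the count together. The following is a
standard-library sketch of the same computation over a list; the name
[mean_of_list] is hypothetical and is not part of the library.
{[
(* One pass over the list: the accumulator holds both the total and the count. *)
let mean_of_list xs =
  let total, count =
    List.fold_left (fun (total, count) x -> (total + x, count + 1)) (0, 0) xs
  in
  (* Integer division; raises Division_by_zero on an empty list. *)
  total / count
]}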
{1 Resource handling}
Input and output resources involved in stream processing will have the
following two properties:
- Resources are acquired lazily and only if needed. For example,
{!val:Streaming.Stream.of_file} will only open the file for reading when the
stream is being consumed. Similarly, a sink for a file will only open the
file when elements are being written into it. Sinks are initialized before
sources, when the computation begins.
- Stream operations do not leak resources. The clean up functions for sources
and sinks are guaranteed to be called in the presence of early termination
(when not all input is consumed) and in case of exceptions in the streaming
pipeline.
The following examples demonstrate these two properties in practice.
{3 Examples}
{[
(* Create a source that must not be initialized. *)
# let bomb () =
    Source.make
      ~init:(fun () -> failwith "Boom!")
      ~pull:(fun () -> None)
      ()
val bomb : unit -> 'a source = <fun>
(* Feed it into a stream that terminates early. *)
# bomb ()
  |> Stream.from
  |> Stream.take 0
  |> Stream.to_list
- : 'a list = []
]}
And... nothing! As you can see our "bomb" was never detonated. This is because
in streams, sinks (in our case it's [to_list] combined with [take]) are
checked for "fullness" before sources are initialized.
In the next example let's look at how streams behave in the presence of exceptions.
{[
# let i'm_a_source_i_must_not_leak () =
    Source.make
      ~init:(fun () -> `Dangerous_input)
      ~pull:(fun st -> Some ("always blue", st))
      ~stop:(fun `Dangerous_input ->
        print_endline "Stopping input... Phew!")
      ()
val i'm_a_source_i_must_not_leak : unit -> string source = <fun>
# let i'm_a_sink_i_must_not_leak () =
    Sink.make
      ~init:(fun () -> `Dangerous_output)
      ~push:(fun `Dangerous_output x -> `Dangerous_output)
      ~stop:(fun `Dangerous_output ->
        print_endline "Stopping output... That was close!")
      ()
val i'm_a_sink_i_must_not_leak : unit -> ('a, unit) sink = <fun>
# i'm_a_source_i_must_not_leak ()
  |> Stream.from
  |> Stream.map (fun x -> failwith "Boom!")
  |> Stream.into (i'm_a_sink_i_must_not_leak ())
Stopping input... Phew!
Stopping output... That was close!
Exception: Failure "Boom!".
]}
Our termination functions ran just before the world exploded!
{3 Limitations}
The termination functions in streaming are always guaranteed to be called. What
is currently not well specified is the state they will be called with. It is
possible for a source, for example, to stream lines from multiple files, while
seamlessly opening and closing them as the input is read. In normal termination
conditions, streaming will correctly call all termination functions with
correct states.
The same is not true in situations when exceptions are raised. Currently, when
there is an exception, streaming will call the termination function on the
first instance of the state, even though it might have changed.
This is not a difficult problem to solve, but a correct implementation has a
high performance cost.
In the future, streaming might expose safer stream management functions to help
with these situations. For now, it is recommended that sources and sinks
implement their [stop] functions in a way that allows them to close all
allocated resources when given only the first state. This can be achieved by
aggregating the intermediate states or using refs to allow the initial state to
point to the latest state.
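As a hedged sketch of the second suggestion, the state below is a single
mutable record that is created once in [init] and returned unchanged by [pull],
so the first state and the latest state are the same value. The [state] type
and the [lines_of_files] helper are hypothetical; only [Source.make] with the
[~init], [~pull] and [~stop] arguments used in the examples above is assumed
from the library.
{[
open Streaming

(* Hypothetical state: one mutable record shared by every step. *)
type state = {
  mutable current : in_channel option;  (* channel that is open right now *)
  mutable pending : string list;        (* files that are still to be read *)
}

(* Hypothetical helper: stream the lines of several files, one after another. *)
let lines_of_files paths =
  let rec pull st =
    match st.current with
    | Some ic ->
      (match input_line ic with
       | line -> Some (line, st)
       | exception End_of_file ->
         (* Done with this file: close it and move on to the next one. *)
         close_in ic;
         st.current <- None;
         pull st)
    | None ->
      (match st.pending with
       | [] -> None
       | path :: rest ->
         st.current <- Some (open_in path);
         st.pending <- rest;
         pull st)
  in
  let stop st =
    (* Even if [stop] receives the first state, it is the same record, so it
       still knows which channel is currently open. *)
    match st.current with
    | Some ic -> close_in ic; st.current <- None
    | None -> ()
  in
  Source.make
    ~init:(fun () -> { current = None; pending = paths })
    ~pull
    ~stop
    ()
]}
The trade-off in this sketch is that the state is mutated in place, so the
source is only safe to consume once.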
{1 Questions}
{2 When should I use [streaming]?}
[streaming] is a general-purpose streaming library with abstractions meant to
be used as a drop-in replacement for concrete sequential data-structures such
as lists. It is always a good idea to use a streaming model if you need
sequential access to data. For very small collections that do not need to be
processed multiple times, using lists is adequate. In all other situations,
streams are significantly better in terms of performance and composition.
Streaming abstractions are an excellent choice for stateful producers and
consumers that require precise resource management. Consuming elements
from a file or a database handler with streams is significantly safer. All
models in [streaming] are lazy (they will only initialise resources when
needed) and support prompt termination (the resources will be terminated
immediately when they are no longer needed). This is guaranteed even when
streaming pipelines raise exceptions.
Finally, streaming encourages implementation of small decoupled sources, sinks
and flows that can be reused in a wide spectrum of situations.
{2 How fast are Streams compared to other streaming models?}
In short - fast ({{:https://github.com/rizo/streams-bench} see} for yourself).
The {!module:Streaming.Stream} module was designed to improve on the most
efficient iteration model currently available for OCaml, the so-called
"internal iterator" with the type [('a -> unit) -> unit]. This iterator is
related to the commonly available "iter" functions in the standard library.
Streams are a variant of internal iterators that adds support for resource
safety and early termination, and avoids the need for mutation and exceptions
in the combinators, while providing a similar performance profile.
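To make the comparison concrete, here is a minimal standard-library sketch of
an internal iterator. The names [iter], [of_list], [map], [filter], [take] and
[to_list] are illustrative and are not the library's implementation. Note how
[take] has to rely on a mutable counter and an exception to stop early, which
is the kind of machinery streams are meant to avoid.
{[
(* An internal iterator: the producer calls [k] for every element. *)
type 'a iter = ('a -> unit) -> unit

let of_list xs : 'a iter = fun k -> List.iter k xs

let map f (it : 'a iter) : 'b iter = fun k -> it (fun x -> k (f x))

let filter p (it : 'a iter) : 'a iter = fun k -> it (fun x -> if p x then k x)

(* Early termination needs mutation and an exception in this model. *)
let take n (it : 'a iter) : 'a iter = fun k ->
  let exception Stop in
  let seen = ref 0 in
  if n > 0 then
    try
      it (fun x ->
        k x;
        incr seen;
        if !seen >= n then raise Stop)
    with Stop -> ()

(* Collect the elements in the order they were produced. *)
let to_list (it : 'a iter) : 'a list =
  let acc = ref [] in
  it (fun x -> acc := x :: !acc);
  List.rev !acc
]}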
{2 What's the difference between Sources and Streams?}
Both sources and streams produce values. The main difference is flow control:
with sources, consumers of the elements are in charge of control; while with
streams, it's the producers who drive the computation. This means that sources
should be used for situations where the elements are requested on demand, while
streams are best suited for "reactive" inputs.
In general, streams offer better performance than sources for the most common
operations (including concatenation) and offer
{{:Streaming/Stream/index.html#adaptors} integration} with sinks and
flows. On the other hand, sources are easier to create, and support
{{:Streaming/Source/index.html#zipping-sources} zipping}.
It is recommended to use sources to define decoupled producers that can be
consumed with streams. Any source can become a stream, but the opposite is not
so easy.
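The difference in flow control can be illustrated with two small
standard-library models. These types are hypothetical simplifications, not the
library's representations: a pull-based producer is a function the consumer
calls to get the next element, while a push-based producer runs its own loop
and calls the consumer. Zipping is straightforward in the pull model because
the consumer can alternate between two producers.
{[
(* Pull: the consumer asks for the next element. *)
type 'a pull = unit -> 'a option

(* Push: the producer drives and calls the consumer for each element. *)
type 'a push = ('a -> unit) -> unit

let pull_of_list xs : 'a pull =
  let rest = ref xs in
  fun () ->
    match !rest with
    | [] -> None
    | x :: tl -> rest := tl; Some x

let push_of_list xs : 'a push = fun consume -> List.iter consume xs

(* Zipping two pull producers: ask each one in turn. *)
let zip (a : 'a pull) (b : 'b pull) : ('a * 'b) pull = fun () ->
  match a () with
  | None -> None
  | Some x ->
    (match b () with
     | None -> None
     | Some y -> Some (x, y))

(* Any pull producer can be turned into a push producer with a loop. *)
let push_of_pull (p : 'a pull) : 'a push = fun consume ->
  let rec loop () =
    match p () with
    | None -> ()
    | Some x -> consume x; loop ()
  in
  loop ()
]}
Going the other way, from [push] to [pull], has no equally simple definition
in this model, because a push producer does not return control between
elements.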
{2 Why are all streaming types in this library abstract?}
Even though the core types in streaming are very simple, stable and have many
nice properties, it is possible that the types might change in the future to
support new functionality such as backpressure and concurrency. To avoid
breaking changes, and until the library reaches the 1.0 milestone, the types are
going to be abstract.
If you have a use-case that requires access to the internal type, please
{{:https://github.com/odis-labs/streaming/issues/new} open an issue}.
{2 How can the net amount of entropy of the universe be massively decreased?}
{%html: <details style="cursor: pointer"><summary>You should ask AC.</summary> %}
{[
let answer () =
let x = "84726982693273833265833289698432737883857070736773697" ^
"8843268658465327079823265327769657873787170857632657883876982
What if there was no space and time?" in
let open Format in let (%^) = printf in
let open Streaming in let open Stream in
let (%)=fun f g x->f (g x) and
(|/-)=(%^)"\027[1m\027[41m";"\\|/-"in let (//) x = (|/-)
|> of_string
|> interpose '\b' |> append '\b'
|> cycle ~times:(Random.int 10)
|> append x
|> each (fun c -> "%c%!" %^ (Unix.sleepf 0.003; c)) in
from @@ (let pull i =
if i >= 114 then None
else Some (String.sub x i 2, i + 2)
in Source.make ~init:(fun () -> 0) ~pull ())
|> via (Flow.map (char_of_int%int_of_string))
|> into (Sink.each (//)); (%^) "\027[0m"
]}
{%html: </details> %}
{1 Troubleshooting}
- {b Incorrect [Stream] module} - if you forget to open [Streaming] you might
start using the [Stream] module from the standard library. This will result
in [Unbound value] and type mismatch errors.
- {b My stream pipeline never terminates} - this indicates that one of your
producers or transformers produces an infinite stream. Make sure that you are
limiting the stream input with operations such as
{!val:Streaming.Stream.take} or {!val:Streaming.Stream.take_while}.
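
As a small illustration of the second item, the source below never returns
[None] from [pull], so it is infinite; bounding it with [Stream.take] lets the
pipeline terminate. The names [naturals] and [first_three] are hypothetical;
[Source.make], [Stream.from], [Stream.take] and [Stream.to_list] are used in
the same way as in the resource handling examples.
{[
open Streaming

(* An infinite source of integers: [pull] always produces a next element. *)
let naturals () =
  Source.make
    ~init:(fun () -> 0)
    ~pull:(fun n -> Some (n, n + 1))
    ()

(* Without [Stream.take] this pipeline would never finish. *)
let first_three =
  naturals ()
  |> Stream.from
  |> Stream.take 3
  |> Stream.to_list
]}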
{1 Acknowledgements}
This library is based on ideas found in other libraries and research projects
such as: Haskell's {{:https://github.com/Gabriel439/Haskell-Pipes-Library}
Pipes} and {{:https://github.com/Gabriel439/Haskell-Foldl-Library} Foldl}
libraries, Scala's {{:https://zio.dev} ZIO Streams}, Clojure's
{{:https://clojure.org/reference/transducers} Transducers} and the
{{:http://okmij.org/ftp/Streams.html} Iteratees} streaming model by Oleg
Kiselyov.