<pre class="metadata">
Title: Streams Standard
Group: WHATWG
H1: Streams
Shortname: streams
Status: LS
Editor: Domenic Denicola, Google https://www.google.com/, d@domenic.me, https://domenic.me/
Abstract: This specification provides APIs for creating, composing, and consuming streams of data.
Abstract: These streams are designed to map efficiently to low-level I/O primitives, and allow easy
Abstract: composition with built-in backpressure and queuing. On top of streams, the web platform can
Abstract: build higher-level abstractions, such as filesystem or socket APIs, while at the same time
Abstract: users can use the supplied tools to build their own streams which integrate well with those
Abstract: of the web platform.
Logo: https://resources.whatwg.org/logo-streams.svg
!Participate: <a href="https://github.com/whatwg/streams/issues/new">File an issue</a> (<a href="https://github.com/whatwg/streams/issues?state=open">open issues</a>)
!Participate: <a href="http://wiki.whatwg.org/wiki/IRC">IRC: #whatwg on Freenode</a>
!Version History: <a href="https://github.com/whatwg/streams/commits">https://github.com/whatwg/streams/commits</a>
!Version History: <a href="https://twitter.com/streamsstandard">@streamsstandard</a>
</pre>
<style>
ol > li { margin: 0; }
.note + .example { margin-top: 1em; }
/* Self-link styles */
.heading {
  position: relative;
}
a.self-link {
  position: absolute;
  top: 0;
  left: -2.5em;
  width: 2em;
  height: 2em;
  text-align: center;
  border: none;
  transition: opacity .2s;
  opacity: .5;
}
a.self-link:hover {
  opacity: 1;
  text-decoration: none;
}
.heading > a.self-link {
  font-size: 83%;
  color: inherit;
}
li > a.self-link {
  left: -3.5em;
}
dfn > a.self-link {
  top: auto;
  left: auto;
  opacity: 0;
  width: 1.5em;
  height: 1.5em;
  background: gray;
  color: white;
  font-style: normal;
  transition: opacity .2s, background-color .2s, color .2s;
}
dfn:hover > a.self-link {
  opacity: 1;
}
dfn > a.self-link:hover {
  color: black;
  background-color: #B1C99D;
}
a.self-link::before {
  content: "¶";
}
.heading > a.self-link::before {
  content: "§";
}
dfn > a.self-link::before {
  content: "#";
}
/* Hide editor visually, but leave it for scraping tools */
.head .editor {
  display: none;
}
</style>
<h2 id="intro">Introduction</h2>
<em>This section is non-normative.</em>
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed
in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of
APIs for creating and interfacing with such streaming data, embodied in <a>readable streams</a>,
<a>writable streams</a>, and <a>transform streams</a>.
This standard provides the base stream primitives which other parts of the web platform can use to expose their
streaming data. For example, [[FETCH]] could expose request bodies as a writable stream, or response bodies as a
readable stream. More generally, the platform is full of streaming abstractions waiting to be expressed as streams:
multimedia streams, file streams, interprocess communication, and more benefit from being able to process data
incrementally instead of buffering it all into memory and processing it in one go. By providing the foundation for
these streams to be exposed to developers, the Streams Standard enables use cases like:
<ul>
<li> Video effects: piping a readable video stream through a transform stream that applies effects in real time.
<li> Decompression: piping a file stream through a transform stream that selectively decompresses files from a
<kbd>.tgz</kbd> archive, turning them into <code>img</code> elements as the user scrolls through an image gallery.
<li> Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data,
and then through another transform that translates bitmaps into PNGs. If installed inside the <code>fetch</code>
hook of a service worker [[SERVICE-WORKERS]], this would allow developers to transparently polyfill new image
formats.
</ul>
The APIs described here provide a unifying abstraction for all such streams, encouraging an ecosystem to grow around
these shared and composable interfaces. At the same time, they have been carefully designed to map efficiently to
low-level I/O concerns, and to encapsulate the trickier issues (such as <a>backpressure</a>) that come along for the
ride.
<h2 id="model">Model</h2>
A <dfn>chunk</dfn> is a single piece of data that is written to or read from a stream. It can be of any type; streams
can even contain chunks of different types. A chunk will often not be the most atomic unit of data for a given stream;
for example a binary stream might contain chunks consisting of 16 KiB <code>ArrayBuffer</code>s, instead of single
bytes.
<h3 id="rs-model">Readable Streams</h3>
A <dfn>readable stream</dfn> represents a source of data, from which you can read. In other words, data comes
<em>out</em> of a readable stream.
Although a readable stream can be created with arbitrary behavior, most readable streams wrap a lower-level I/O source,
called the <dfn>underlying source</dfn>. There are two types of underlying source: push sources and pull sources.
<dfn title="push source">Push sources</dfn> push data at you, whether or not you are listening for it. They may also
provide a mechanism for pausing and resuming the flow of data. An example push source is a TCP socket, where data is
constantly being pushed from the OS level, at a rate that can be controlled by changing the TCP window size.
<dfn title="pull source">Pull sources</dfn> require you to request data from them. The data may be available
synchronously, e.g. if it is held by the operating system's in-memory buffers, or asynchronously, e.g. if it has to be
read from disk. An example pull source is a file handle, where you seek to specific locations and read specific amounts.
Readable streams are designed to wrap both types of sources behind a single, unified interface.
<a>Chunks</a> are enqueued into the stream by the stream's creator, who usually derives them from the <a>underlying
source</a>. They can then be read one at a time via the stream's public interface.
Code that reads from a readable stream using its public interface is known as a <dfn>consumer</dfn>.
<h3 id="ws-model">Writable Streams</h3>
A <dfn>writable stream</dfn> represents a destination for data, into which you can write. In other words, data goes
<em>in</em> to a writable stream.
Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the
<dfn>underlying sink</dfn>. Writable streams work to abstract away some of the complexity of the underlying sink, by
queuing subsequent writes and only delivering them to the underlying sink one by one.
<a>Chunks</a> are enqueued into the stream via its public interface, and are passed one at a time to the stream's
creator. In turn, the creator will usually forward them to the <a>underlying sink</a>.
Code that writes into a writable stream using its public interface is known as a <dfn>producer</dfn>.
<h3 id="ts-model">Transform Streams</h3>
A <dfn>transform stream</dfn> consists of a pair of streams: a writable stream, and a readable stream.
In a manner specific to the transform stream in question, writes to the writable side result in new data being made
available for reading from the readable side.
Some examples of transform streams include:
<ul>
<li>A text decoder, to which bytes are written and from which strings are read;</li>
<li>A GZIP compressor, to which uncompressed bytes are written and from which compressed bytes are read;</li>
<li>A video decoder, to which encoded bytes are written and from which uncompressed video frames are read.</li>
</ul>
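<div class="example">
As a rough illustration of this pairing, the following minimal sketch models a transform as a plain object whose
writable and readable sides share one queue. It is not an API defined by this specification: the function name
<code>makeUpperCaseTransform</code> and the <code>write()</code> and <code>read()</code> method shapes are assumptions
made only for the sketch.

```javascript
// Hypothetical helper: models a transform stream as a { writable, readable } pair.
// The write()/read() method shapes are assumptions for illustration only.
function makeUpperCaseTransform() {
  const queue = []; // chunks that have been transformed but not yet read

  return {
    writable: {
      // Writing to the writable side transforms the chunk and makes the
      // result available on the readable side.
      write(chunk) {
        queue.push(String(chunk).toUpperCase());
      }
    },
    readable: {
      // Reading returns the oldest transformed chunk, or undefined if none.
      read() {
        return queue.shift();
      }
    }
  };
}

const upperCaseTransform = makeUpperCaseTransform();
upperCaseTransform.writable.write("hello");
upperCaseTransform.writable.write("streams");
const firstTransformed = upperCaseTransform.readable.read();  // "HELLO"
const secondTransformed = upperCaseTransform.readable.read(); // "STREAMS"
```

The transformation itself (upper-casing) is arbitrary; the point is only that data written to one side becomes readable
from the other.
</div>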
<h3 id="pipe-chains">Pipe Chains and Backpressure</h3>
Streams are primarily used by <dfn>piping</dfn> them to each other. A readable stream can be piped directly to a
writable stream, or it can be piped through one or more transform streams first.
A set of streams piped together in this way is referred to as a <dfn>pipe chain</dfn>. In a pipe chain, the
<dfn>original source</dfn> is the <a>underlying source</a> of the first readable stream in the chain; the
<dfn>ultimate sink</dfn> is the <a>underlying sink</a> of the final writable stream in the chain.
Once a pipe chain is constructed, it can be used to propagate signals regarding how fast data should flow through
it. If any step in the chain cannot yet accept data, it propagates a signal backwards through the pipe chain, until
eventually the original source is told to stop producing data so fast. This process of normalizing data flow from the
original source according to how fast the chain can process data is called <dfn>backpressure</dfn>.
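<div class="example">
The effect of backpressure can be pictured with a small synchronous simulation. This is only a sketch under stated
assumptions, not an algorithm from this specification: the names <code>simulatePipeChain</code>,
<code>highWaterMark</code>, and <code>ticksPerDrain</code> are hypothetical, the source tries to produce one chunk per
tick, and the sink accepts one chunk every <code>ticksPerDrain</code> ticks.

```javascript
// A synchronous simulation, not the specification's algorithm. All names here
// are hypothetical. Assumes highWaterMark >= 1 and ticksPerDrain >= 1.
function simulatePipeChain(totalChunks, highWaterMark, ticksPerDrain) {
  const queue = [];
  let produced = 0;
  let consumed = 0;
  let pausedTicks = 0;
  let maxQueueLength = 0;

  for (let tick = 1; consumed < totalChunks; tick++) {
    if (produced < totalChunks) {
      if (queue.length < highWaterMark) {
        // The chain can accept data, so the source produces a chunk.
        queue.push(produced++);
      } else {
        // The chain is full: the backpressure signal pauses the source.
        pausedTicks++;
      }
    }
    maxQueueLength = Math.max(maxQueueLength, queue.length);

    // The slow sink takes one chunk every ticksPerDrain ticks.
    if (tick % ticksPerDrain === 0 && queue.length > 0) {
      queue.shift();
      consumed++;
    }
  }
  return { consumed, pausedTicks, maxQueueLength };
}

const pipeSimulation = simulatePipeChain(10, 3, 2);
```

In this run the source is paused on some ticks and the queue never grows past the chosen limit, which is the outcome
backpressure is meant to achieve.
</div>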
<h3 id="queuing-strategies">Internal Queues and Queuing Strategies</h3>
Both readable and writable streams maintain <dfn>internal queues</dfn>, which they use for similar purposes. In the
case of a readable stream, the internal queue contains <a>chunks</a> that have been enqueued by the <a>underlying
source</a>, but not yet read by the consumer. In the case of a writable stream, the internal queue contains
<a>chunks</a> which have been written to the stream by the producer, but not yet processed and acknowledged by the
<a>underlying sink</a>.
A <dfn>queuing strategy</dfn> is a pair of methods that help determine whether a stream should apply
<a>backpressure</a> based on the state of its <a>internal queue</a>. The queuing strategy assigns a size to each
<a>chunk</a>; later, the stream implementation asks the queuing strategy whether the stream should apply backpressure,
based on the total size of all chunks in the stream's internal queue.
<div class="example">
A simple example of a queuing strategy would be one that assigns a size of one to each chunk, and applies
backpressure whenever three or more chunks are in the internal queue. This would mean that up to three chunks could
be enqueued in a readable stream, or three chunks could be written to a writable stream, before they send a
backpressure signal.
</div>
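<div class="example">
The strategy just described could be sketched as an object with two methods. The method names <code>size</code> and
<code>shouldApplyBackpressure</code>, and the helper <code>totalSize</code>, are assumptions made for this sketch; the
text above only establishes that a strategy is a pair of methods.

```javascript
// Hypothetical method names: the surrounding text only says that a queuing
// strategy is "a pair of methods".
const countingStrategy = {
  // Every chunk counts as size one, whatever it contains.
  size(chunk) {
    return 1;
  },
  // Apply backpressure once three or more chunks are queued.
  shouldApplyBackpressure(totalQueueSize) {
    return totalQueueSize >= 3;
  }
};

// Sums the strategy-assigned sizes of all chunks currently in a queue.
function totalSize(strategy, queue) {
  return queue.reduce((sum, chunk) => sum + strategy.size(chunk), 0);
}

const backpressureAtTwo =
  countingStrategy.shouldApplyBackpressure(totalSize(countingStrategy, ["a", "b"]));
const backpressureAtThree =
  countingStrategy.shouldApplyBackpressure(totalSize(countingStrategy, ["a", "b", "c"]));
```

A strategy for binary data might instead return each chunk's byte length from <code>size</code>, so that the limit is
expressed in bytes rather than in chunks.
</div>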
<h2 id="rs">Readable Streams</h2>
<h3 id="rs-intro">Introduction to Readable Streams</h3>
<em>This section is non-normative.</em>
The readable stream API allows wrapping both pull and push sources into a single <code>ReadableStream</code>
abstraction. To accomplish this, the API uses the
<a href="http://domenic.me/2014/02/13/the-revealing-constructor-pattern/">revealing constructor pattern</a>. The
constructor of a given stream instance is supplied with two functions, <code>start</code> and <code>pull</code>, each of
which is given the parameters <code>(enqueue, close, error)</code>, representing capabilities tied to the internals of
the stream. By mediating all access to the internal state machine through these three functions, the stream's internal
state and bookkeeping can be kept private, allowing nobody but the original producer of the stream to insert data into
it.
<div class="example">
The following function creates readable streams that wrap web sockets [[HTML]], which are push sources that do not
support backpressure signals.
<pre><code class="lang-javascript">
function makeReadableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";
  return new ReadableStream({
    start(enqueue, close, error) {
      // When adapting a push source, usually most of the work happens in start.
      ws.onmessage = event => enqueue(event.data);
      // A web socket reports the end of the connection via its "close" event.
      ws.onclose = close;
      ws.onerror = error;
    },
    cancel() {
      ws.close();
    }
  });
}
</code></pre>
We can then use this function to create readable streams for web sockets, and pipe those streams to arbitrary
writable streams:
<pre><code class="lang-javascript">
var webSocketStream = makeReadableWebSocketStream("ws://example.com:80/");
webSocketStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
</code></pre>
</div>
<div class="example">
The following function wraps a push source, represented by a hypothetical "raw socket" interface, which triggers
events for data, end, and error (much like a web socket), but also provides the ability to pause and resume the flow
of data. Thus, this example shows how to apply backpressure to <a>underlying sources</a> that support it.
<pre><code class="lang-javascript">
function makeSocketStream(host, port) {
  const rawSocket = createRawSocketObject(host, port);
  return new ReadableStream({
    start(enqueue, close, error) {
      rawSocket.ondata = event => {
        if (!enqueue(event.data)) {
          // If enqueue returns false, the internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          rawSocket.readStop();
        }
      };
      rawSocket.onend = close;
      rawSocket.onerror = error;
    },
    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream's consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      rawSocket.readStart();
    },
    cancel() {
      rawSocket.readStop();
    }
  });
}
</code></pre>
We can then use this function to create readable streams for such "raw sockets" in the same way we do for web
sockets. This time, however, when we pipe to a destination that cannot accept data as fast as the socket is producing
it, a backpressure signal will be sent to the raw socket.
</div>
<div class="example">
The following function wraps a pull source, represented by a "raw file handle," which provides methods for opening,
reading from, and closing itself. These methods can call their callbacks either synchronously or asynchronously—a
<a href="http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony">Zalgo-releasing</a> horror which we can
hide from our users by wrapping them in a readable stream.
<pre><code class="lang-javascript">
function makeReadableFileStream(filename) {
  const fileHandle = createRawFileHandle(filename, "r");
  return new ReadableStream({
    start() {
      return new Promise((resolve, reject) => {
        fileHandle.open(err => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        });
      });
    },
    pull(enqueue, close, error) {
      // When adapting a pull source, usually most of the work happens in pull.
      fileHandle.read((err, isDone, chunk) => {
        if (err) {
          // If trying to read data results in an error, report that.
          error(err);
        } else if (isDone) {
          // If there's no more data to read, be sure to close the underlying
          // source, ensuring that it succeeds before reporting success.
          fileHandle.close(err => {
            if (err) {
              error(err);
            } else {
              close();
            }
          });
        } else {
          // If data was read successfully, enqueue it into the internal queue.
          enqueue(chunk);
        }
      });
    },
    cancel() {
      fileHandle.close();
    }
  });
}
</code></pre>
We can then create and use readable streams for files just as we could before for sockets.
</div>
<div class="example">
Although readable streams will usually be used by piping them to a writable stream, you can also "pump" them
directly, alternating between calling the <code>read()</code> method and waiting on the <code>ready</code> promise
according to the current value of the <code>state</code> property. For example, this function writes the contents of
a readable stream to the console as fast as they are available.
<pre><code class="lang-javascript">
function streamToConsole(readableStream) {
  pump();
  function pump() {
    while (readableStream.state === "readable") {
      console.log(readableStream.read());
    }
    if (readableStream.state === "closed") {
      console.log("--- all done!");
    } else {
      // If we're in an error state, the returned promise will be rejected with
      // that error, so no need to handle "waiting" vs. "errored" separately.
      readableStream.ready.then(pump, e => console.error(e));
    }
  }
}
</code></pre>
</div>
<h3 id="rs-state-diagram">The Readable Stream State Diagram</h3>
<em>This section is non-normative.</em>
As evidenced by the above explanations, readable streams have a fairly complex internal state machine, which is
responsible for keeping track of the internal queue, and initiating appropriate actions in response to calls to a
stream's methods. This can be roughly summarized in the following diagram.
<figure>
<img src="readable-stream.svg" width="670" alt="The readable stream state machine diagram." />
<figcaption>
<dl>
<dt><span style="font-style: normal; font-weight: normal; font-family: monospace;">monospace</span></dt>
<dd>Methods of the stream</dd>
<dt><span style="font-style: normal; font-weight: bold;">bold</span></dt>
<dd>Constructor parameters</dd>
<dt><span style="font-style: italic; font-weight: normal;">italic</span></dt>
<dd>Capabilities given to constructor parameters</dd>
</dl>
</figcaption>
</figure>
<h3 id="rs-class">Class <code>ReadableStream</code></h3>
<h4 id="rs-class-definition">Class Definition </h4>
<em>This section is non-normative.</em>
If one were to write the <code>ReadableStream</code> class in something close to the syntax of [[!ECMASCRIPT]], it
would look like
<pre><code class="lang-javascript">
class ReadableStream {
  constructor({
    start = (enqueue, close, error) => {},
    pull = (enqueue, close, error) => {},
    cancel = (reason) => {},
    strategy = %DefaultReadableStreamStrategy%
  } = {})
  get closed()
  get ready()
  get state()
  cancel(reason)
  pipeThrough({ writable, readable }, options)
  pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
  read()
}
</code></pre>
<h4 id="rs-internal-slots">Internal Slots </h4>
Instances of <code>ReadableStream</code> are created with the internal slots described in the following table:
<table>
<thead>
<tr>
<th>Internal Slot</th>
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>\[[close]]
<td>A <a>Readable Stream Close Function</a> created with the ability to close this stream
</tr>
<tr>
<td>\[[closedPromise]]
<td>A promise that becomes fulfilled when the stream becomes <code>"closed"</code>; returned by the
<code>closed</code> getter
</tr>
<tr>
<td>\[[draining]]
<td>A boolean flag indicating whether the stream has been closed, but still has chunks in its internal queue that
have not yet been read
</tr>
<tr>
<td>\[[enqueue]]
<td>A <a>Readable Stream Enqueue Function</a> created with the ability to enqueue a passed <a>chunk</a> in this
stream
</tr>
<tr>
<td>\[[error]]
<td>A <a>Readable Stream Error Function</a> created with the ability to move this stream to an
<code>"errored"</code> state
</tr>
<tr>
<td>\[[onCancel]]
<td>A function passed to the constructor, meant to clean up and release access to the <a>underlying source</a>
</tr>
<tr>
<td>\[[onPull]]
<td>A function passed to the constructor, meant to pull more data from the <a>underlying source</a>
</tr>
<tr>
<td>\[[pulling]]
<td>A boolean flag indicating whether data is currently being pulled from the <a>underlying source</a>
</tr>
<tr>
<td>\[[queue]]
<td>A List representing the stream's internal queue of <a>chunks</a>
</tr>
<tr>
<td>\[[started]]
<td>A boolean flag indicating whether the <a>underlying source</a> has finished starting
</tr>
<tr>
<td>\[[state]]
<td>A string containing the stream's current state; returned by the <code>state</code> getter
</tr>
<tr>
<td>\[[storedError]]
<td>A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate
on the stream while in the <code>"errored"</code> state
</tr>
<tr>
<td>\[[strategy]]
<td>An object containing the stream's <a>queuing strategy</a>
</tr>
<tr>
<td>\[[readyPromise]]
<td>A promise that becomes fulfilled when the stream becomes <code>"readable"</code>, and is replaced with a new
pending promise when the stream becomes <code>"waiting"</code>; returned by the <code>ready</code> getter
</tr>
</table>
<h4 id="rs-constructor">new ReadableStream({ start, pull, cancel, strategy } = {})</h4>
<div class="note">
The constructor is passed several functions, all optional, which govern how the constructed stream instance behaves:
<ul>
<li> <var>start</var>(<var>enqueue</var>, <var>close</var>, <var>error</var>) is called immediately, and is
typically used to adapt a <a>push source</a> by setting up relevant event listeners, or to acquire access to a
<a>pull source</a>. If this process is asynchronous, it can return a promise to signal success or failure.
<li> <var>pull</var>(<var>enqueue</var>, <var>close</var>, <var>error</var>) is called when the stream's internal
queue of chunks is depleted, and the consumer has signaled that they wish to consume more data. Once it is
called, it will not be called again until the passed <var>enqueue</var> callback is called.
<li> <var>cancel</var>(<var>reason</var>) is called when the consumer signals that they are no longer interested
in the stream. It should perform any actions necessary to release access to the <a>underlying source</a>. If this
process is asynchronous, it can return a promise to signal success or failure.
</ul>
Both <var>start</var> and <var>pull</var> are given the ability to manipulate the stream's internal queue and state
via the passed <var>enqueue</var>, <var>close</var>, and <var>error</var> callbacks. This is an example of the
<a href="http://domenic.me/2014/02/13/the-revealing-constructor-pattern/">revealing constructor pattern</a>.
</div>
<ol>
<li> If <var>start</var> is <b>undefined</b>, set <var>start</var> to a no-op function.
<li> If IsCallable(<var>start</var>) is <b>false</b>, throw a <b>TypeError</b> exception.
<li> If <var>pull</var> is <b>undefined</b>, set <var>pull</var> to a no-op function.
<li> If IsCallable(<var>pull</var>) is <b>false</b>, throw a <b>TypeError</b> exception.
<li> If <var>cancel</var> is <b>undefined</b>, set <var>cancel</var> to a no-op function.
<li> If IsCallable(<var>cancel</var>) is <b>false</b>, throw a <b>TypeError</b> exception.
<li> If <var>strategy</var> is <b>undefined</b>, set <var>strategy</var> to %DefaultReadableStreamStrategy%.
<li> Set <b>this</b>@\[[onPull]] to <var>pull</var>.
<li> Set <b>this</b>@\[[onCancel]] to <var>cancel</var>.
<li> Set <b>this</b>@\[[strategy]] to <var>strategy</var>.
<li> Set <b>this</b>@\[[readyPromise]] and <b>this</b>@\[[closedPromise]] to new promises.
<li> Set <b>this</b>@\[[queue]] to a new empty List.
<li> Set <b>this</b>@\[[state]] to <code>"waiting"</code>.
<li> Set <b>this</b>@\[[started]], <b>this</b>@\[[draining]], and <b>this</b>@\[[pulling]] to <b>false</b>.
<li> Set <b>this</b>@\[[enqueue]] to CreateReadableStreamEnqueueFunction(<b>this</b>).
<li> Set <b>this</b>@\[[close]] to CreateReadableStreamCloseFunction(<b>this</b>).
<li> Set <b>this</b>@\[[error]] to CreateReadableStreamErrorFunction(<b>this</b>).
<li> Let <var>startResult</var> be the result of calling <var>start</var>(<b>this</b>@\[[enqueue]],
<b>this</b>@\[[close]], <b>this</b>@\[[error]]).
<li> ReturnIfAbrupt(<var>startResult</var>).
<li> Resolve <var>startResult</var> as a promise:
<ol>
<li> Upon fulfillment,
<ol>
<li> Set <b>this</b>@\[[started]] to <b>true</b>.
<li> Return CallReadableStreamPull(<b>this</b>).
</ol>
<li> Upon rejection with reason <var>r</var>, call-with-rethrow <b>this</b>@\[[error]](<var>r</var>).
</ol>
</ol>
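<div class="example">
The defaulting and IsCallable checks at the start of these steps can be sketched in ordinary JavaScript as follows.
This is a minimal sketch and not the reference implementation: the helper name
<code>normalizeSourceCallbacks</code> is hypothetical, <code>typeof value === "function"</code> is used as a stand-in
for IsCallable, and the helper does not create a stream or call <var>start</var>.

```javascript
// Hypothetical helper mirroring only the defaulting and IsCallable checks of
// the constructor; it does not create a stream.
function normalizeSourceCallbacks({ start, pull, cancel } = {}) {
  const callbacks = { start, pull, cancel };
  // Keys are visited in insertion order: start, then pull, then cancel.
  for (const name of Object.keys(callbacks)) {
    if (callbacks[name] === undefined) {
      callbacks[name] = () => {}; // default to a no-op function
    }
    if (typeof callbacks[name] !== "function") {
      throw new TypeError(name + " must be a function");
    }
  }
  return callbacks;
}

const defaultedCallbacks = normalizeSourceCallbacks();

let nonFunctionRejected = false;
try {
  normalizeSourceCallbacks({ pull: 42 });
} catch (e) {
  nonFunctionRejected = e instanceof TypeError;
}
```

Because a missing callback is replaced before it is checked, only values that are present but not callable cause the
exception.
</div>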
<h4 id="rs-prototype">Properties of the <code>ReadableStream</code> Prototype </h4>
<h5 id="rs-closed">get closed </h5>
<div class="note">
The <code>closed</code> getter returns a promise that will be fulfilled when the stream becomes closed, or rejected
if it ever errors.
</div>
<ol>
<li> Return <b>this</b>@\[[closedPromise]].
</ol>
<h5 id="rs-wait">get ready</h5>
<div class="note">
The <code>ready</code> getter returns a promise that will be fulfilled either when the stream's internal queue becomes
nonempty, or the stream becomes closed. (The promise will be rejected if the stream errors.)
</div>
<ol>
<li> Return <b>this</b>@\[[readyPromise]].
</ol>
<h5 id="rs-state">get state </h5>
<div class="note">
The <code>state</code> getter returns the state of the stream, which will be one of the following:
<dl>
<dt><code>"waiting"</code>
<dd>The stream's internal queue is empty; use <code>.ready</code> to be notified of any changes.
<dt><code>"readable"</code>
<dd>The stream's internal queue has <a>chunks</a> available; call <code>.read()</code> to retrieve the next one.
<dt><code>"closed"</code>
<dd>All data has been successfully read from the stream, and no more is available.
<dt><code>"errored"</code>
<dd>An error occurred interacting with the <a>underlying source</a>, and so the stream is now dead.
</dl>
</div>
<ol>
<li> Return <b>this</b>@\[[state]].
</ol>
<h5 id="rs-cancel">cancel(reason) </h5>
<div class="note">
The <code>cancel</code> method signals a loss of interest in the stream by a consumer. Calling it will immediately
move the stream to a <code>"closed"</code> state, throwing away any queued data, as well as executing any
cancellation mechanism of the <a>underlying source</a>.
</div>
<ol>
<li> If <b>this</b>@\[[state]] is <code>"closed"</code>, return a new promise resolved with <b>undefined</b>.
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, return a new promise rejected with <b>this</b>@\[[storedError]].
<li> If <b>this</b>@\[[state]] is <code>"waiting"</code>, resolve <b>this</b>@\[[readyPromise]] with <b>undefined</b>.
<li> Let <b>this</b>@\[[queue]] be a new empty List.
<li> Set <b>this</b>@\[[state]] to <code>"closed"</code>.
<li> Resolve <b>this</b>@\[[closedPromise]] with <b>undefined</b>.
<li> Let <var>sourceCancelPromise</var> be the result of promise-calling <b>this</b>@\[[onCancel]](<var>reason</var>).
<li> Return the result of transforming <var>sourceCancelPromise</var> by a fulfillment handler that returns <b>undefined</b>.
</ol>
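<div class="example">
The control flow of these steps can be sketched against a plain-object stand-in for a stream. The function name
<code>cancelStream</code> and the object's field names are assumptions made for the sketch, and resolution of the ready
and closed promises is left out to keep it short.

```javascript
// "stream" is a plain-object stand-in with state, queue, storedError, and
// onCancel fields; this is an assumption, not the real class. Resolving the
// ready and closed promises is omitted.
function cancelStream(stream, reason) {
  if (stream.state === "closed") {
    return Promise.resolve(undefined);
  }
  if (stream.state === "errored") {
    return Promise.reject(stream.storedError);
  }

  // Throw away any queued chunks and close immediately.
  stream.queue = [];
  stream.state = "closed";

  // "Promise-calling": a synchronous throw becomes a rejected promise.
  let sourceCancelPromise;
  try {
    sourceCancelPromise = Promise.resolve(stream.onCancel(reason));
  } catch (e) {
    sourceCancelPromise = Promise.reject(e);
  }

  // Callers see undefined on success, whatever onCancel returned.
  return sourceCancelPromise.then(() => undefined);
}

const cancelReasons = [];
const cancelModel = {
  state: "readable",
  queue: ["a", "b"],
  storedError: undefined,
  onCancel(reason) {
    cancelReasons.push(reason);
    return "ignored by the caller";
  }
};
const cancelPromise = cancelStream(cancelModel, "no longer needed");
```

Note that the stream is already in the closed state by the time the underlying source's cancellation logic runs.
</div>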
<h5 id="rs-pipe-through">pipeThrough({ writable, readable }, options)</h5>
<div class="note">
The <code>pipeThrough</code> method provides a convenient, chainable way of <a>piping</a> a <a>readable stream</a>
through a <a>transform stream</a> (or any other <code>{ writable, readable }</code> pair). It simply pipes the stream
into the writable side of the supplied pair, and returns the readable side for further use.
</div>
<div class="example">
A typical example of constructing a <a>pipe chain</a> using <code>pipeThrough</code> would look like
<pre><code class="lang-javascript">
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
</code></pre>
</div>
<ol>
<li> If Type(<var>writable</var>) is not Object, then throw a <b>TypeError</b> exception.
<li> If Type(<var>readable</var>) is not Object, then throw a <b>TypeError</b> exception.
<li> Call-with-rethrow Invoke(<b>this</b>, <code>"pipeTo"</code>, (<var>writable</var>, <var>options</var>)).
<li> Return <var>readable</var>.
</ol>
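<div class="example">
Written as a free function, the steps above amount to roughly the following. This is a sketch under stated assumptions:
<code>source</code> stands for the <b>this</b> value, the stub objects are hypothetical placeholders for real streams,
and the <code>isObject</code> check approximates the Type(<var>x</var>) is Object test.

```javascript
// A free-function rendering of the steps; "source" is any object with a
// pipeTo(dest, options) method.
function pipeThrough(source, { writable, readable }, options) {
  const isObject = value =>
    (typeof value === "object" && value !== null) || typeof value === "function";

  if (!isObject(writable)) throw new TypeError("writable must be an object");
  if (!isObject(readable)) throw new TypeError("readable must be an object");

  // Whatever pipeTo returns is discarded; only the readable side is returned.
  source.pipeTo(writable, options);
  return readable;
}

// Stub objects standing in for real streams.
const pipeToCalls = [];
const stubSource = {
  pipeTo(dest, options) {
    pipeToCalls.push({ dest, options });
  }
};
const stubTransform = { writable: { side: "writable" }, readable: { side: "readable" } };
const returnedSide = pipeThrough(stubSource, stubTransform, { preventClose: true });
```

Returning the readable side is what allows calls to be chained.
</div>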
<h5 id="rs-pipe-to">pipeTo(dest, options) </h5>
The <code>pipeTo</code> method is one of the more complex methods, and is undergoing some revision and edge-case
bulletproofing before we write it up in prose.
For now, please consider the reference implementation normative:
<a href="https://github.com/whatwg/streams/blob/master/reference-implementation/lib/readable-stream.js">reference-implementation/lib/readable-stream.js</a>,
look for the <code>pipeTo</code> method.
<h5 id="rs-read">read() </h5>
<div class="note">
The <code>read</code> method will return the next <a>chunk</a> from the stream's internal queue, if available. If
this causes the queue to become empty, more data will be pulled from the <a>underlying source</a>.
</div>
<ol>
<li> If <b>this</b>@\[[state]] is <code>"waiting"</code> or <code>"closed"</code>, throw a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, throw <b>this</b>@\[[storedError]].
<li> Assert: <b>this</b>@\[[state]] is <code>"readable"</code>.
<li> Assert: <b>this</b>@\[[queue]] is not empty.
<li> Let <var>chunk</var> be DequeueValue(<b>this</b>@\[[queue]]).
<li> If <b>this</b>@\[[queue]] is now empty,
<ol>
<li> If <b>this</b>@\[[draining]] is <b>true</b>,
<ol>
<li> Set <b>this</b>@\[[state]] to <code>"closed"</code>.
<li> Resolve <b>this</b>@\[[closedPromise]] with <b>undefined</b>.
</ol>
<li> If <b>this</b>@\[[draining]] is <b>false</b>,
<ol>
<li> Set <b>this</b>@\[[state]] to <code>"waiting"</code>.
<li> Let <b>this</b>@\[[readyPromise]] be a new promise.
</ol>
</ol>
<li> Call-with-rethrow CallReadableStreamPull(<b>this</b>).
<li> Return <var>chunk</var>.
</ol>
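<div class="example">
The state transitions in the steps above can be sketched with a plain object whose properties mirror the internal
slots. This is an illustrative model only: <code>readSketch</code>, <code>closedResolved</code>, and
<code>resolveReady</code> are hypothetical names, and the queue is simplified to an array of chunks.

```javascript
// Illustrative model of the read() steps; not the reference implementation.
function readSketch(stream, callPull = () => {}) {
  if (stream.state === "waiting" || stream.state === "closed") {
    throw new TypeError("no chunk is available to read");
  }
  if (stream.state === "errored") throw stream.storedError;

  const chunk = stream.queue.shift(); // stands in for DequeueValue
  if (stream.queue.length === 0) {
    if (stream.draining) {
      // The source already asked to close; the last chunk has now been read.
      stream.state = "closed";
      stream.closedResolved = true; // stands in for resolving [[closedPromise]]
    } else {
      stream.state = "waiting";
      stream.readyPromise = new Promise(resolve => { stream.resolveReady = resolve; });
    }
  }
  callPull(stream); // stands in for CallReadableStreamPull
  return chunk;
}

const drainingStream = { state: "readable", queue: ["a", "b"], draining: true };
const firstChunk = readSketch(drainingStream);  // queue still holds "b"
const secondChunk = readSketch(drainingStream); // queue is empty, so the stream closes
```
</div>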
<h3 id="rs-abstract-ops">Readable Stream Abstract Operations </h3>
<h4 id="call-readable-stream-pull">CallReadableStreamPull ( stream )</h4>
<ol>
<li> If <var>stream</var>@\[[pulling]] is <b>true</b> or <var>stream</var>@\[[draining]] is <b>true</b> or
<var>stream</var>@\[[started]] is <b>false</b> or <var>stream</var>@\[[state]] is <code>"closed"</code> or
<var>stream</var>@\[[state]] is <code>"errored"</code>, return <b>undefined</b>.
<li> Let <var>shouldApplyBackpressure</var> be ShouldReadableStreamApplyBackpressure(<var>stream</var>).
<li> If <var>shouldApplyBackpressure</var> is <b>true</b>, return <b>undefined</b>.
<li> Set <var>stream</var>@\[[pulling]] to <b>true</b>.
<li> Let <var>pullResult</var> be the result of calling <var>stream</var>@\[[onPull]](<var>stream</var>@\[[enqueue]],
<var>stream</var>@\[[close]], <var>stream</var>@\[[error]]).
<li> If <var>pullResult</var> is an abrupt completion,
<ol>
<li> Call-with-rethrow <var>stream</var>@\[[error]](<var>pullResult</var>.\[[value]]).
<li> Return <var>pullResult</var>.
</ol>
<li> Otherwise, return <b>undefined</b>.
</ol>
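<div class="example">
A minimal sketch of the guard-then-pull logic above, again using a plain object in place of the internal slots.
<code>callPullSketch</code> is a hypothetical name, and the <code>shouldApplyBackpressure</code> property is an
assumed stand-in for the ShouldReadableStreamApplyBackpressure abstract operation.

```javascript
// Illustrative model of CallReadableStreamPull; not the reference implementation.
function callPullSketch(stream) {
  if (stream.pulling || stream.draining || !stream.started ||
      stream.state === "closed" || stream.state === "errored") {
    return undefined;
  }
  if (stream.shouldApplyBackpressure()) return undefined;

  stream.pulling = true;
  try {
    stream.onPull(stream.enqueue, stream.close, stream.error);
  } catch (e) {
    stream.error(e); // error the stream, then rethrow to the caller
    throw e;
  }
  return undefined;
}

const pullLog = [];
const pullModel = {
  pulling: false, draining: false, started: true, state: "waiting",
  shouldApplyBackpressure: () => false,
  onPull() { pullLog.push("pull"); },
  enqueue() {}, close() {}, error() {}
};
callPullSketch(pullModel); // invokes onPull and marks the stream as pulling
callPullSketch(pullModel); // does nothing: a pull is already in progress
```
</div>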
<h4 id="create-readable-stream-close-function">CreateReadableStreamCloseFunction ( stream )</h4>
<ol>
<li> Return a new <a>Readable Stream Close Function</a> closing over <var>stream</var>.
</ol>
A <dfn>Readable Stream Close Function</dfn> is a built-in anonymous function of zero arguments, closing over a variable
<var>stream</var>, that performs the following steps:
<ol>
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>,
<ol>
<li> Resolve <var>stream</var>@\[[readyPromise]] with <b>undefined</b>.
<li> Resolve <var>stream</var>@\[[closedPromise]] with <b>undefined</b>.
<li> Set <var>stream</var>@\[[state]] to <code>"closed"</code>.
</ol>
<li> If <var>stream</var>@\[[state]] is <code>"readable"</code>,
<ol>
<li> Set <var>stream</var>@\[[draining]] to <b>true</b>.
</ol>
</ol>
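<div class="example">
The two branches above differ in whether chunks are still queued. A minimal sketch, assuming a plain object model in
which the hypothetical <code>resolveReady</code> and <code>resolveClosed</code> callbacks stand in for resolving the
corresponding promises:

```javascript
// Illustrative model of a Readable Stream Close Function; not the reference implementation.
function createCloseFunctionSketch(stream) {
  return () => {
    if (stream.state === "waiting") {
      // Nothing is queued, so the stream can close immediately.
      stream.resolveReady();
      stream.resolveClosed();
      stream.state = "closed";
    } else if (stream.state === "readable") {
      // Chunks remain; read() finishes the close once the queue is drained.
      stream.draining = true;
    }
  };
}

const closeSignals = [];
const waitingModel = {
  state: "waiting", draining: false,
  resolveReady: () => closeSignals.push("ready"),
  resolveClosed: () => closeSignals.push("closed")
};
createCloseFunctionSketch(waitingModel)();

const readableModel = { state: "readable", draining: false };
createCloseFunctionSketch(readableModel)();
```
</div>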
<h4 id="create-readable-stream-enqueue-function">CreateReadableStreamEnqueueFunction ( stream )</h4>
<ol>
<li> Return a new <a>Readable Stream Enqueue Function</a> closing over <var>stream</var>.
</ol>
A <dfn>Readable Stream Enqueue Function</dfn> is a built-in anonymous function of one argument <var>chunk</var>,
closing over a variable <var>stream</var>, that performs the following steps:
<ol>
<li> If <var>stream</var>@\[[state]] is <code>"errored"</code>, throw <var>stream</var>@\[[storedError]].
<li> If <var>stream</var>@\[[state]] is <code>"closed"</code>, throw a <b>TypeError</b> exception.
<li> If <var>stream</var>@\[[draining]] is <b>true</b>, throw a <b>TypeError</b> exception.
<li> Let <var>chunkSize</var> be Invoke(<var>stream</var>@\[[strategy]], <code>"size"</code>, (<var>chunk</var>)).
<li> If <var>chunkSize</var> is an abrupt completion,
<ol>
<li> Call-with-rethrow <var>stream</var>@\[[error]](<var>chunkSize</var>.\[[value]]).
<li> Return <var>chunkSize</var>.
</ol>
<li> Call-with-rethrow EnqueueValueWithSize(<var>stream</var>@\[[queue]], <var>chunk</var>,
<var>chunkSize</var>.\[[value]]).
<li> Set <var>stream</var>@\[[pulling]] to <b>false</b>.
<li> Let <var>shouldApplyBackpressure</var> be ShouldReadableStreamApplyBackpressure(<var>stream</var>).
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>,
<ol>
<li> Set <var>stream</var>@\[[state]] to <code>"readable"</code>.
<li> Resolve <var>stream</var>@\[[readyPromise]] with <b>undefined</b>.
</ol>
<li> If <var>shouldApplyBackpressure</var>.\[[value]] is <b>true</b>, return <b>false</b>.
<li> Return <b>true</b>.
</ol>
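<div class="example">
A minimal sketch of the steps above, showing how the return value tells the <a>underlying source</a> whether it may
keep enqueuing. The plain object model, <code>createEnqueueFunctionSketch</code>, and <code>resolveReady</code> are
hypothetical, and the size validation performed by EnqueueValueWithSize is omitted.

```javascript
// Illustrative model of a Readable Stream Enqueue Function; not the reference implementation.
function createEnqueueFunctionSketch(stream) {
  return chunk => {
    if (stream.state === "errored") throw stream.storedError;
    if (stream.state === "closed") throw new TypeError("stream is closed");
    if (stream.draining) throw new TypeError("stream is draining");

    let chunkSize;
    try {
      chunkSize = stream.strategy.size(chunk);
    } catch (e) {
      stream.error(e); // a failing strategy errors the stream, then rethrows
      throw e;
    }
    stream.queue.push({ value: chunk, size: chunkSize });
    stream.pulling = false;

    const totalSize = stream.queue.reduce((sum, entry) => sum + entry.size, 0);
    const applyBackpressure = Boolean(stream.strategy.shouldApplyBackpressure(totalSize));

    if (stream.state === "waiting") {
      stream.state = "readable";
      stream.resolveReady();
    }
    return !applyBackpressure; // false asks the source to stop enqueuing for now
  };
}

const enqueueModel = {
  state: "waiting", draining: false, pulling: true, queue: [],
  strategy: { size: () => 1, shouldApplyBackpressure: total => total > 1 },
  resolveReady() {}, error() {}
};
const enqueueSketch = createEnqueueFunctionSketch(enqueueModel);
const acceptedFirst = enqueueSketch("a");  // total size 1: no backpressure yet
const acceptedSecond = enqueueSketch("b"); // total size 2: backpressure applies
```
</div>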
<h4 id="create-readable-stream-error-function">CreateReadableStreamErrorFunction ( stream )</h4>
<ol>
<li> Return a new <a>Readable Stream Error Function</a> closing over <var>stream</var>.
</ol>
A <dfn>Readable Stream Error Function</dfn> is a built-in anonymous function of one argument <var>e</var>, closing over
a variable <var>stream</var>, that performs the following steps:
<ol>
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>,
<ol>
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> Reject <var>stream</var>@\[[readyPromise]] with <var>e</var>.
<li> Reject <var>stream</var>@\[[closedPromise]] with <var>e</var>.
</ol>
<li> If <var>stream</var>@\[[state]] is <code>"readable"</code>,
<ol>
<li> Let <var>stream</var>@\[[queue]] be a new empty List.
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> Let <var>stream</var>@\[[readyPromise]] be a new promise rejected with <var>e</var>.
<li> Reject <var>stream</var>@\[[closedPromise]] with <var>e</var>.
</ol>
</ol>
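<div class="example">
A minimal sketch of the steps above. Note that erroring a <code>"readable"</code> stream discards any queued chunks,
and that a stream which is already closed or errored is left untouched. The plain object model and the
<code>rejectReady</code> and <code>rejectClosed</code> callbacks are hypothetical stand-ins for the internal slots.

```javascript
// Illustrative model of a Readable Stream Error Function; not the reference implementation.
function createErrorFunctionSketch(stream) {
  return e => {
    if (stream.state === "waiting") {
      stream.state = "errored";
      stream.storedError = e;
      stream.rejectReady(e);
      stream.rejectClosed(e);
    } else if (stream.state === "readable") {
      stream.queue = []; // queued chunks are dropped
      stream.state = "errored";
      stream.storedError = e;
      stream.readyPromise = Promise.reject(e);
      stream.readyPromise.catch(() => {}); // avoid an unhandled rejection in this sketch
      stream.rejectClosed(e);
    }
  };
}

const closedRejections = [];
const erroringModel = {
  state: "readable", queue: ["a", "b"], storedError: undefined,
  rejectReady() {}, rejectClosed: e => closedRejections.push(e)
};
const errorSketch = createErrorFunctionSketch(erroringModel);
const firstFailure = new Error("source failed");
errorSketch(firstFailure);
errorSketch(new Error("ignored")); // no effect: the stream is already errored
```
</div>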
<h4 id="should-readable-stream-apply-backpressure">ShouldReadableStreamApplyBackpressure ( stream )</h4>
<ol>
<li> Let <var>queueSize</var> be GetTotalQueueSize(<var>stream</var>@\[[queue]]).
<li> ReturnIfAbrupt(<var>queueSize</var>).
<li> Let <var>shouldApplyBackpressure</var> be ToBoolean(Invoke(<var>stream</var>@\[[strategy]],
<code>"shouldApplyBackpressure"</code>, (<var>queueSize</var>))).
<li> If <var>shouldApplyBackpressure</var> is an abrupt completion,
<ol>
<li> Call-with-rethrow <var>stream</var>@\[[error]](<var>shouldApplyBackpressure</var>.\[[value]]).
<li> Return <var>shouldApplyBackpressure</var>.
</ol>
<li> Otherwise, return <var>shouldApplyBackpressure</var>.\[[value]].
</ol>
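<div class="example">
The operation above simply totals the queue and defers the decision to the <a>queuing strategy</a>. A minimal sketch,
assuming queue entries of the form <code>{ value, size }</code> and a hypothetical strategy that applies backpressure
once more than five units are queued:

```javascript
// Illustrative model of ShouldReadableStreamApplyBackpressure; not the reference implementation.
function shouldApplyBackpressureSketch(stream) {
  // Stands in for GetTotalQueueSize.
  const queueSize = stream.queue.reduce((sum, entry) => sum + entry.size, 0);
  try {
    return Boolean(stream.strategy.shouldApplyBackpressure(queueSize));
  } catch (e) {
    stream.error(e); // a failing strategy errors the stream, then rethrows
    throw e;
  }
}

const strategyModel = {
  queue: [{ value: "abc", size: 3 }],
  strategy: { shouldApplyBackpressure: total => total > 5 }, // hypothetical threshold
  error() {}
};
const beforeSecondChunk = shouldApplyBackpressureSketch(strategyModel); // total is 3
strategyModel.queue.push({ value: "defg", size: 4 });
const afterSecondChunk = shouldApplyBackpressureSketch(strategyModel);  // total is 7
```
</div>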
<h3 id="default-rs-strategy">%DefaultReadableStreamStrategy%</h3>
%DefaultReadableStreamStrategy% is a well-known intrinsic object representing the default <a>queuing strategy</a> for
<a>readable streams</a>. It has two methods.
<div class="note">
The existence of an independent object for the default readable stream queuing strategy is not actually observable.
Thus, implementations could implement this default strategy by other means, e.g. by incorporating the default logic
into the readable stream algorithms themselves.
</div>
<h4 id="default-rs-strategy-should-apply-backpressure">shouldApplyBackpressure(queueSize)</h4>
<ol>
<li> Assert: <var>queueSize</var> is a non-<b>NaN</b> number.
<li> Return <var>queueSize</var> > 1.
</ol>
<h4 id="default-rs-strategy-size">size()</h4>
<ol>
<li> Return 1.
</ol>
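<div class="example">
Taken together, the two methods above mean that every chunk counts as one unit and backpressure applies once more
than one chunk is queued. A minimal sketch as an ordinary object; the name
<code>defaultReadableStreamStrategySketch</code> is hypothetical, and the assertion on <var>queueSize</var> is left
out since it describes an invariant rather than observable behavior:

```javascript
// Illustrative equivalent of the default strategy; not the reference implementation.
const defaultReadableStreamStrategySketch = {
  shouldApplyBackpressure(queueSize) {
    return queueSize > 1;
  },
  size() {
    return 1; // every chunk counts as one unit, whatever its contents
  }
};

// Total size of a queue holding one chunk, then two chunks.
const oneChunk = defaultReadableStreamStrategySketch.size("a");
const twoChunks = oneChunk + defaultReadableStreamStrategySketch.size("b");
const pressureAtOne = defaultReadableStreamStrategySketch.shouldApplyBackpressure(oneChunk);
const pressureAtTwo = defaultReadableStreamStrategySketch.shouldApplyBackpressure(twoChunks);
```
</div>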
<h2 id="ws">Writable Streams</h2>
<h3 id="ws-intro">Introduction to Writable Streams</h3>
<em>This section is non-normative.</em>
The writable stream API allows wrapping of <a>underlying sinks</a> into an object on which two fundamental operations
can be performed: <a>chunks</a> can be written to the stream, and the stream can be closed.
The writable stream implementation is designed to encapsulate the potential complexity of the <a>underlying sink</a>
from users of the stream API. In particular, users of a stream object can write chunks to the stream at any pace,
without regard for whether previous writes have completed or succeeded. It is the job of the stream implementation to
ensure that writes are forwarded to the <a>underlying sink</a> in order, and only after successful completion of
previous writes. This allows seamless use of the writable stream even in cases such as piping a fast readable file
stream to a slower writable network socket stream, which cannot acknowledge the incoming data at the same rate it
becomes available.
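<div class="example">
The ordering guarantee described above can be illustrated without any stream machinery at all. The following is a
minimal sketch, assuming a hypothetical <code>makeOrderedWriter</code> helper that chains each write onto the
previous one; it is not how the writable stream is specified, only an illustration of the guarantee it provides.

```javascript
// Illustrative only: forwards writes to a sink one at a time, in call order.
function makeOrderedWriter(sinkWrite) {
  let tail = Promise.resolve();
  return chunk => {
    // Each write starts only after the previous one has succeeded; once a
    // write fails, later writes are never forwarded to the sink.
    tail = tail.then(() => sinkWrite(chunk));
    return tail;
  };
}

// A hypothetical sink whose first write is the slowest to complete.
const sinkOrder = [];
const slowSink = chunk => new Promise(resolve => {
  setTimeout(() => { sinkOrder.push(chunk); resolve(); }, chunk === 1 ? 20 : 0);
});

const orderedWrite = makeOrderedWriter(slowSink);
orderedWrite(1);
orderedWrite(2);
const allWritten = orderedWrite(3); // settles after all three writes, in order
```

Callers can issue all three writes at once without waiting, yet the sink never sees a chunk before the previous one
has completed.
</div>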
<div class="example">
The following function wraps a web socket [[HTML]] as the <a>underlying sink</a> of a new writable stream. Web
sockets do not provide any way to tell when a given chunk of data has been successfully sent, so this writable stream
has no ability to communicate backpressure signals to any users: it will always be in the <code>"writable"</code>
state.
<pre><code class="lang-javascript">
function makeWritableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  return new WritableStream({
    start(error) {
      ws.onerror = error;
      return new Promise(resolve => ws.onopen = resolve);
    },
    write(chunk) {
      ws.send(chunk);
      // Return immediately, since the web socket gives us no way to tell
      // when the write completes.
    },
    close() {
      return new Promise((resolve, reject) => {
        ws.onclose = resolve;
        ws.close();
      });
    }
  });
}
</code></pre>
</div>
<div class="example">
The following function wraps an <a>underlying sink</a>, represented as a hypothetical "raw file handle," which
provides methods for opening, writing to, and closing itself. Notably, the raw file handle's <code>write</code> method
calls back to signal when writes are complete, which allows the stream to correctly communicate backpressure signals
to any users by setting its state to <code>"waiting"</code> instead of <code>"writable"</code> when the queue gets too
full. All of the raw file handle's methods can call their callbacks either synchronously or asynchronously, a
<a href="http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony">Zalgo-releasing</a> horror which we can
hide from our users by wrapping them in a writable stream.
<pre><code class="lang-javascript">
function makeWritableFileStream(filename) {
  const fileHandle = createRawFileHandle(filename, "w");
  return new WritableStream({
    start() {
      return new Promise((resolve, reject) => {
        fileHandle.open(err => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        });
      });
    },
    write(chunk) {
      return new Promise((resolve, reject) => {
        fileHandle.write(chunk, writeErr => {
          if (writeErr) {
            // If trying to write results in an error, (attempt to) close the
            // underlying file handle; we're not going to write any more.
            fileHandle.close(closeErr => {
              if (closeErr) {
                // If *closing* errors, pass along that error to the stream.
                reject(closeErr);
              } else {
                // Otherwise, if closing succeeds, pass along the write error.
                reject(writeErr);
              }
            });
          } else {
            // If there's no error, then signal that this write completed.
            resolve();
          }
        });
      });
    },
    close() {
      return new Promise((resolve, reject) => {
        fileHandle.close(err => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        });
      });
    }
  });
}
</code></pre>
We can then use this function to create a writable stream for a file, and then pipe a readable stream to it:
<pre><code class="lang-javascript">
const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
readableStream.pipeTo(fileStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
</code></pre>
Note that if a particular call to <code>fileHandle.write</code> takes a longer time, its callback will be called
later, and the promise returned by the <code>write</code> method will settle later. In the meantime, additional writes
can be queued up, which are stored in the stream's internal queue.
The accumulation of this queue can move the stream into a <code>"waiting"</code> state, which is a signal to users
of the stream that they should back off and stop writing if possible.
</div>
<div class="example">
Although writable streams will usually be used by piping to them from a readable stream, you can also write to them
directly. Since they queue any incoming writes, and take care internally to forward them to the <a>underlying sink</a>
in sequence, you can indiscriminately write to a writable stream without much ceremony:
<pre><code class="lang-javascript">
function writeArrayToStream(array, writableStream) {
  array.forEach(chunk => writableStream.write(chunk));
  return writableStream.close();
}

writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("All done!"))
  .catch(e => console.error("Error with the stream: " + e));
</code></pre>
</div>
<h3 id="ws-state-diagram">The Writable Stream State Diagram</h3>
<em>This section is non-normative.</em>
TODO
<h3 id="ws-class">Class <code>WritableStream</code></h3>
<h4 id="ws-class-definition">Class Definition </h4>
<em>This section is non-normative.</em>
If one were to write the <code>WritableStream</code> class in something close to the syntax of [[!ECMASCRIPT]], it