/
scramjet.d.ts
1519 lines (1356 loc) · 59.5 KB
/
scramjet.d.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Scramjet main exports expose all the stream classes and a number of methods.
*
* All scramjet streams allow writing, reading or transform modes - currently
* exclusively (meaning you can't have two at once). Any of the scramjet streams
* can be constructed with the following options passed to mimic node.js standard streams:
*
* * `async promiseTransform(chunk)` - transform method that resolves with a single output chunk
* * `async promiseWrite(chunk)` - write method that resolves when chunk is written
* * `async promiseRead(count)` - read method that resolves with an array of chunks when called
*
* See {@link https://nodejs.org/api/stream.html#stream_api_for_stream_implementers node.js API for stream implementers for details}
*
* The object exposes the following classes:
*
* * `DataStream` {@see DataStream} - the basic object stream of any type
* * `StringStream` {@see StringStream} - a stream of strings
* * `BufferStream` {@see BufferStream} - a stream of buffers
* * `MultiStream` {@see MultiStream} - a group of streams
* * `NumberStream` {@see NumberStream} - a stream of numbers
* * `WindowStream` {@see WindowStream} - a stream of windows of objects
*
* The general concept of Scramjet streams is facilitating node's TransformStream mechanism so that you don't need
* to create a number of streams and create the pipeline, but use the concept of chaining instead. When you call `parse`
* method for instance, scramjet creates a new stream, pipes it to the callee and forwards errors.
*
* What's worth mentioning - scramjet tries to limit the number of created transform streams and pushes the transforms
* one after another into the same stream class therefore a code `stream.map(transform1).map(transform2).filter(transform3)`
* will only operate on a single transform stream that evaluates all three transforms one after another.
*/
declare module 'scramjet' {
/**
 * Creates a DataStream that's piped from the passed readable.
 *
 * Accepts the same kinds of input as the static `DataStream.from`.
 * @param input argument to be turned into new stream
 * @param options options for creation of a new stream or the target stream
 * @param ...args additional arguments for the stream - will be passed to the function or generator
 */
function from(input: any[] | Iterable<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
/**
 * Builds a DataStream out of an Array, one chunk per element.
 * @param array the chunks that the stream will emit
 * @param options the read stream options
 */
function fromArray(array: Array<any>, options?: ScramjetOptions): DataStream;
/**
 * Wraps a scramjet transform module in a safe wrapper.
 * See [Modules documentation](modules.md) for more info.
 * @param transform the transform callback (a `UseCallback`) to be wrapped
 * @param options module creation options (see `CreateModuleOptions`)
 * @param ...initialArgs additional arguments for the module
 */
function createTransformModule(transform: UseCallback, options?: CreateModuleOptions, ...initialArgs: any): Function;
/**
 * Creates a safe wrapper for scramjet read module. See [Modules documentation](modules.md) for more info.
 * @param anything the module source - accepts the same kinds of input as `from`
 * @param options module creation options (see `CreateModuleOptions`)
 * @param ...initialArgs additional arguments for the module
 */
function createReadModule(anything: any[] | Iterable<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: CreateModuleOptions, ...initialArgs: any): Function;
/**
 * Options for createModule
 */
interface CreateModuleOptions {
    /**
     * defines what class should the module assume - this is the stream class
     * itself (e.g. `DataStream` or a subclass), not an instance of it
     */
    StreamClass: typeof DataStream;
}
/**
 * The set of error classes that Scramjet exposes.
 */
interface ScramjetErrors {
    /**
     * the class used for stream errors
     */
    StreamError: Function;
}
/**
 * Error classes exposed by Scramjet (undocumented).
 */
var errors: ScramjetErrors;
/**
 * Definition of a single mixin for a specific Scramjet class. Should contain any number of stream methods.
 */
interface StreamMixin {
    /**
     * optional constructor that will be called in the stream constructor (this has to be an own property!)
     */
    constructor?: Function;
    /**
     * any number of stream methods to be added to the class
     */
    [method: string]: Function;
}
/**
 * Describes a Scramjet plugin: one mixin per extensible stream class.
 */
interface ScramjetPlugin {
    /**
     * constructor and properties mixed into the BufferStream prototype.
     */
    BufferStream: StreamMixin;
    /**
     * constructor and properties mixed into the DataStream prototype.
     */
    DataStream: StreamMixin;
    /**
     * constructor and properties mixed into the MultiStream prototype.
     */
    MultiStream: StreamMixin;
    /**
     * constructor and properties mixed into the StringStream prototype.
     */
    StringStream: StreamMixin;
}
/**
 * Installs a plugin, plugging its methods into any of the Scramjet classes.
 * @param mixin the plugin object to install
 */
function plugin(mixin: ScramjetPlugin): ScramjetPlugin;
/**
 * Retrieves a given version of the API (this may be important for future use).
 * @param version the required API version (currently only `1` is available)
 */
function API(version: Number): ScramjetPlugin;
/**
 * DataStream is the primary stream type for Scramjet. When you parse your
 * stream, just pipe it and you can then perform calculations on the data
 * objects streamed through your flow.
 *
 * Use as:
 *
 * ```javascript
 * const { DataStream } = require('scramjet');
 *
 * await (DataStream.from(aStream) // create a DataStream
 *     .map(findInFiles) // read some data asynchronously
 *     .map(sendToAPI) // send the data somewhere
 *     .run()); // wait until end
 * ```
 */
class DataStream {
    /**
     * Create the DataStream.
     * @param opts Stream options passed to superclass
     */
    constructor(opts?: DataStreamOptions);
    /**
     * Returns a DataStream from pretty much anything sensibly possible.
     *
     * Depending on type:
     * * `self` will return self immediately
     * * `Readable` stream will get piped to the current stream with errors forwarded
     * * `Array` will get iterated and all items will be pushed to the returned stream.
     * The stream will also be ended in such case.
     * * `GeneratorFunction` will get executed to return the iterator which will be used as source for items
     * * `AsyncGeneratorFunction` will also work as above (including generators) in node v10.
     * * `Iterable`s iterator will be used as a source for streams
     *
     * You can also pass a `Function` or `AsyncFunction` that will be executed and its outcome will be
     * passed again to `from` and piped to the initially returned stream. Any additional arguments will be
     * passed as arguments to the function.
     *
     * If a `String` is passed, scramjet will attempt to resolve it as a module and use the outcome
     * as an argument to `from` as in the Function case described above. For more information see {@link modules.md}
     *
     * A simple example from a generator:
     *
     * ```javascript
     * DataStream
     *     .from(function* () {
     *         let x = 0;
     *         while(x < 100) yield x++;
     *     })
     *     .each(console.log)
     *     // 0
     *     // 1...
     *     // 99
     * ```
     * @param input argument to be turned into new stream
     * @param options options for creation of a new stream or the target stream
     * @param ...args additional arguments for the stream - will be passed to the function or generator
     */
    static from(input: any[] | Iterable<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, options?: DataStreamOptions | Writable, ...args: any): DataStream;
    /**
     * Transforms stream objects into new ones, just like Array.prototype.map
     * does.
     *
     * Map takes a Function argument that operates on every element of the
     * stream. If the function returns a Promise or is an AsyncFunction then the
     * stream will await the outcome of the operation before pushing the data forwards.
     *
     * A simple example that turns stream of urls into stream of responses
     *
     * ```javascript
     * stream.map(async url => fetch(url));
     * ```
     *
     * Multiple subsequent map operations (as well as filter, do, each and other simple ops)
     * will be merged together into a single operation to improve performance. Such behaviour
     * can be suppressed by chaining `.tap()` after `.map()`.
     * @param func The function that creates the new object
     * @param ClassType The class to be mapped to.
     */
    map(func: MapCallback, ClassType?: Function): this;
    /**
     * Filters object based on the function outcome, just like Array.prototype.filter.
     *
     * Filter takes a Function argument which should be a Function or an AsyncFunction that
     * will be called on each stream item. If the outcome of the operation is `falsy` (`0`, `''`,
     * `false`, `null` or `undefined`) the item will be filtered from subsequent operations
     * and will not be pushed to the output of the stream. Otherwise the item will not be affected.
     *
     * A simple example that filters out non-2xx responses from a stream
     *
     * ```javascript
     * stream.filter(({statusCode}) => statusCode >= 200 && statusCode < 300);
     * ```
     * @param func The function that filters the object
     */
    filter(func: FilterCallback): this;
    /**
     * Reduces the stream into a given accumulator
     *
     * Works similarly to Array.prototype.reduce, so whatever you return in the
     * former operation will be the first operand to the latter. The result is a
     * promise that's resolved with the return value of the last transform executed.
     *
     * A simple example that sums values from a stream
     *
     * ```javascript
     * stream.reduce((accumulator, {value}) => accumulator + value);
     * ```
     *
     * This method is serial - meaning that any processing on an entry will
     * occur only after the previous entry is fully processed. This does mean
     * it's much slower than parallel functions.
     * @param func The into object will be passed as the first argument, the data object from the stream as the second.
     * @param into Any object passed initially to the transform function
     */
    reduce(func: ReduceCallback, into: Object): Promise<any>;
    /**
     * Perform an asynchronous operation without changing or resuming the stream.
     *
     * In essence the stream will use the call to keep the backpressure, but the resolving value
     * has no impact on the streamed data (except for possible mutation of the chunk itself)
     * @param func the async function
     */
    do(func: DoCallback): this;
    /**
     * Processes a number of functions in parallel, returns a stream of arrays of results.
     *
     * This method is to allow running multiple asynchronous operations and receive all the
     * results at once, just like Promise.all behaves.
     *
     * Keep in mind that if one of your methods rejects, this behaves just like Promise.all
     * you won't be able to receive partial results.
     * @param functions list of async functions to run
     */
    all(functions: any[]): this;
    /**
     * Processes a number of functions in parallel, returns the first resolved.
     *
     * This method is to allow running multiple asynchronous operations awaiting just the
     * result of the quickest to execute, just like Promise.race behaves.
     *
     * Keep in mind that if one of your methods rejects, it will only raise an error if that
     * was the first method to reject.
     * @param functions list of async functions to run
     */
    race(functions: any[]): this;
    /**
     * Allows processing items without keeping order
     *
     * This method is useful if you are not concerned about the order in which the
     * chunks are being pushed out of the operation. The `maxParallel` option is
     * still used to limit the number of parallel operations that are happening
     * simultaneously.
     * @param func the async function that will be unordered
     */
    unorder(func: MapCallback): void;
    /**
     * Allows own implementation of stream chaining.
     *
     * The async Function is called on every chunk and should implement writes in its own way. The
     * resolution will be awaited for flow control. The passed `into` argument is passed as the first
     * argument to every call.
     *
     * It returns the DataStream passed as the second argument.
     * @param func the method that processes incoming chunks
     * @param into the DataStream derived class
     * @returns the `into` stream
     */
    into<T extends DataStream>(func: IntoCallback, into: T): T;
    /**
     * Calls the passed method in place with the stream as first argument, returns result.
     *
     * The main intention of this method is to run scramjet modules - transforms that allow complex transforms of
     * streams. These modules can also be run with [scramjet-cli](https://github.com/signicode/scramjet-cli) directly
     * from the command line.
     * @param func if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain. Alternatively this can be a relative path to a scramjet-module. Lastly it can be a Transform stream.
     * @param ...parameters any additional parameters to be passed to the module
     */
    use(func: AsyncGeneratorFunction | GeneratorFunction | UseCallback | String | Readable, ...parameters: any): this;
    /**
     * Consumes all stream items doing nothing. Resolves when the stream is ended.
     *
     * This is very convenient if you're looking to use up the stream in operations that work on each entry like `map`. This uncorks the stream
     * and allows all preceding operations to be run at any speed.
     *
     * All the data of the current stream will be discarded.
     *
     * The function returns a promise that is resolved when the stream ends.
     */
    run(): Promise<any>;
    /**
     * Creates a pipeline of streams and returns a scramjet stream.
     *
     * This is similar to node.js stream pipeline method, but also takes scramjet modules
     * as possibilities in an array of transforms. It may be used to run a series of non-scramjet
     * transform streams.
     *
     * The first argument is anything streamable and will be sanitized by {@link DataStream..from}.
     *
     * Each following argument will be understood as a transform and can be any of:
     * * AsyncFunction or Function - will be executed by {@link DataStream..use}
     * * A transform stream that will be piped to the preceding stream
     * @param readable the initial readable argument that is streamable by scramjet.from
     * @param ...transforms Transform functions (as in {@link DataStream..use}) or Transform streams (any number of these as consecutive arguments)
     * @returns a new DataStream instance of the resulting pipeline
     */
    static pipeline(readable: any[] | Iterable<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...transforms: Array<AsyncFunction | Function | Transform>): DataStream;
    /**
     * Stops merging transform Functions at the current place in the command chain.
     */
    tap(): this;
    /**
     * Reads a chunk from the stream and resolves the promise when read.
     */
    whenRead(): Promise<any>;
    /**
     * Writes a chunk to the stream and returns a Promise resolved when more chunks can be written.
     * @param chunk a chunk to write
     * @param more more chunks to write
     */
    whenWrote(chunk: any, ...more: any[]): Promise<any>;
    /**
     * Resolves when stream ends - rejects on uncaught error
     */
    whenEnd(): Promise<any>;
    /**
     * Returns a promise that resolves when the stream is drained
     */
    whenDrained(): Promise<any>;
    /**
     * Returns a promise that resolves (!) when the stream errors
     */
    whenError(): Promise<any>;
    /**
     * Allows resetting stream options.
     *
     * It's much easier to use this in chain than constructing new stream:
     *
     * ```javascript
     * stream.map(myMapper).filter(myFilter).setOptions({maxParallel: 2})
     * ```
     * @param options the stream options to set
     */
    setOptions(options: DataStreamOptions): this;
    /**
     * Duplicate the stream
     *
     * Creates a duplicate stream instance and passes it to the Function.
     * @param func The duplicate stream will be passed as first argument.
     */
    tee(func: TeeCallback | Writable): this;
    /**
     * Performs an operation on every chunk, without changing the stream
     *
     * This is a shorthand for ```stream.on("data", func)``` but with flow control.
     * Warning: this resumes the stream!
     * @param func a Function called for each chunk.
     */
    each(func: MapCallback): this;
    /**
     * Reads the stream while the function outcome is truthy.
     *
     * Stops reading and emits end as soon as it finds the first chunk that evaluates
     * to false. If you're processing a file until a certain point or you just need to
     * confirm existence of some data, you can use it to end the stream before reaching end.
     *
     * Keep in mind that whatever you piped to the stream will still need to be handled.
     * @param func The condition check
     */
    while(func: FilterCallback): this;
    /**
     * Reads the stream until the function outcome is truthy.
     *
     * Works opposite of while.
     * @param func The condition check
     */
    until(func: FilterCallback): this;
    /**
     * Provides a way to catch errors in chained streams.
     *
     * The handler will be called as asynchronous
     * - if it resolves then the error will be muted.
     * - if it rejects then the error will be passed to the next handler
     *
     * If no handlers will resolve the error, an `error` event will be emitted
     * @param callback Error handler (async function)
     */
    catch(callback: Function): this;
    /**
     * Executes all error handlers and if none resolves, then emits an error.
     *
     * The returned promise will always be resolved even if there are no successful handlers.
     * @param err The thrown error
     */
    raise(err: Error): Promise<any>;
    /**
     * Override of node.js Readable pipe.
     *
     * Except for calling overridden method it proxies errors to piped stream.
     * @param to Writable stream to write to
     * @param options options for the underlying pipe call
     */
    pipe(to: Writable, options?: WritableOptions): Writable;
    /**
     * Creates a BufferStream.
     *
     * The passed serializer must return a buffer.
     * @param serializer A method that converts chunks to buffers
     */
    bufferify(serializer: MapCallback): BufferStream;
    /**
     * Creates a StringStream.
     *
     * The passed serializer must return a string. If no serializer is passed chunks
     * toString method will be used.
     * @param serializer A method that converts chunks to strings
     */
    stringify(serializer?: MapCallback): StringStream;
    /**
     * Create a DataStream from an Array
     * @param array list of chunks
     * @param options the read stream options
     */
    static fromArray(array: any[], options?: ScramjetOptions): DataStream;
    /**
     * Create a DataStream from an Iterator
     *
     * Doesn't end the stream until it reaches end of the iterator.
     * @param iterator the iterator object
     * @param options the read stream options
     */
    static fromIterator(iterator: Iterator<any>, options?: ScramjetOptions): DataStream;
    /**
     * Aggregates the stream into a single Array
     *
     * In fact it's just a shorthand for reducing the stream into an Array.
     * @param initial Array to begin with (defaults to an empty array).
     * @returns a promise resolving to the array of all chunks
     */
    toArray(initial?: any[]): Promise<any[]>;
    /**
     * Returns an async generator
     */
    toGenerator(): Generator<Promise<any>>;
    /**
     * Pulls in any readable stream, resolves when the pulled stream ends.
     *
     * You can also pass anything that can be passed to `DataStream.from`.
     *
     * Does not preserve order, does not end this stream.
     * @param pullable the stream (or anything accepted by `DataStream.from`) to pull from
     * @param ...args any additional args
     * @returns resolved when incoming stream ends, rejects on incoming error
     */
    pull(pullable: any[] | Iterable<any> | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | Function | String | Readable, ...args: any): Promise<any>;
    /**
     * Shifts the first n items from the stream and pushes out the remaining ones.
     * @param count The number of items to shift.
     * @param func Function that receives an array of shifted items
     */
    shift(count: Number, func: ShiftCallback): this;
    /**
     * Allows previewing some of the streams data without removing them from the stream.
     *
     * Important: Peek does not resume the flow.
     * @param count The number of items to view before
     * @param func Function called before other streams
     */
    peek(count: Number, func: ShiftCallback): this;
    /**
     * Slices out a part of the stream to the passed Function.
     *
     * Returns a stream consisting of an array of items with `0` to `start`
     * omitted and `length` items after `start` included. Works similarly to
     * Array.prototype.slice.
     *
     * Takes count from the moment it's called. Any previous items will not be
     * taken into account.
     * @param start omit this number of entries.
     * @param length get this number of entries to the resulting stream
     */
    slice(start?: Number, length?: Number): this;
    /**
     * Transforms stream objects by assigning the properties from the returned
     * data along with data from original ones.
     *
     * The original objects are unaltered.
     * @param func The function that returns new object properties or just the new properties
     */
    assign(func: MapCallback | Object): this;
    /**
     * Called only before the stream ends without passing any items
     * @param callback Function called when stream ends
     */
    empty(callback: Function): this;
    /**
     * Pushes any data at call time (essentially at the beginning of the stream)
     *
     * This is a synchronous only function.
     * @param ...item list of items to unshift (you can pass more items)
     */
    unshift(...item: any): this;
    /**
     * Pushes any data at end of stream
     * @param item list of items to push at end
     */
    endWith(item: any): this;
    /**
     * Accumulates data into the object.
     *
     * Works very similarly to reduce, but result of previous operations have
     * no influence over the accumulator in the next one.
     *
     * Method works in parallel.
     * @param func The accumulation function
     * @param into Accumulator object
     */
    accumulate(func: AccumulateCallback, into: any): Promise<any>;
    /**
     * Consumes the stream by running each Function
     * @deprecated use {@link DataStream#each} instead
     * @param func the consument
     * @param ...args additional args will be passed to generators
     */
    consume(func: ConsumeCallback | AsyncGeneratorFunction | GeneratorFunction, ...args: any): Promise<any>;
    /**
     * Reduces the stream into the given object, returning it immediately.
     *
     * The main difference to reduce is that only the first object will be
     * returned at once (however the method will be called with the previous
     * entry).
     * If the object is an instance of EventEmitter then it will propagate the
     * error from the previous stream.
     *
     * This method is serial - meaning that any processing on an entry will
     * occur only after the previous entry is fully processed. This does mean
     * it's much slower than parallel functions.
     * @param func The into object will be passed as the first argument, the data object from the stream as the second.
     * @param into Any object passed initially to the transform function
     */
    reduceNow(func: ReduceCallback, into: any | EventEmitter): any;
    /**
     * Remaps the stream into a new stream.
     *
     * This means that every item may emit as many other items as we like.
     * @param func A Function that is called on every chunk
     * @param ClassType Optional DataStream subclass to be constructed
     */
    remap(func: RemapCallback, ClassType?: Function): this;
    /**
     * Takes any method that returns any iterable and flattens the result.
     *
     * The passed Function must return an iterable (otherwise an error will be emitted). The resulting stream will
     * consist of all the items of the returned iterables, one iterable after another.
     * @param func A Function that is called on every chunk
     * @param ClassType Optional DataStream subclass to be constructed
     */
    flatMap(func: FlatMapCallback, ClassType?: Function): this;
    /**
     * A shorthand for streams of arrays or iterables to flatten them.
     *
     * More efficient equivalent of: `.flatMap(i => i);`
     * Works on streams of async iterables too.
     */
    flatten(): DataStream;
    /**
     * Returns a new stream that will append the passed streams to the callee
     * @param ...streams Streams to be injected into the current stream
     */
    concat(...streams: stream.Readable[]): this;
    /**
     * Method will put the passed object between items. It can also be a function call or generator / iterator.
     *
     * If a generator or iterator is passed, when the iteration is done no items will be interweaved.
     * Generator receives
     * @param item An object that should be interweaved between stream items
     */
    join(item: any | AsyncGeneratorFunction | GeneratorFunction | JoinCallback): this;
    /**
     * Keep a buffer of n-chunks for use with {@see DataStream..rewind}
     * @param count Number of objects or -1 for all the stream
     */
    keep(count?: number): this;
    /**
     * Rewinds the buffered chunks the specified length backwards. Requires a prior call to {@see DataStream..keep}
     * @param count Number of objects or -1 for all the buffer
     */
    rewind(count?: number): this;
    /**
     * Returns a stream that stacks up incoming items always feeding out the newest items first.
     * It returns the older items when read
     *
     * When the stack length exceeds the given `count` the given `drop` function is awaited
     * and used for flow control.
     *
     * By default the drop function ignores and quietly disposes of items not read before overflow.
     * @param count the stack length above which `drop` is awaited
     * @param drop function awaited on overflow (by default overflowing items are quietly disposed of)
     */
    stack(count?: number, drop?: Function): this;
    /**
     * Distributes processing into multiple sub-processes or threads if you like.
     * @todo Currently order is not kept.
     * @todo Example test breaks travis-ci build
     * @param affinity A Function that affixes the item to specific output stream which must exist in the object for each chunk, must return a string. A number may be passed to identify how many round-robin threads to start up. Defaults to Round Robin to twice the number of CPU threads (pass `undefined` for the default).
     * @param clusterFunc stream transforms similar to {@see DataStream#use method}
     * @param options Options
     */
    distribute(affinity: AffinityCallback | Number | undefined, clusterFunc: ClusterCallback, options?: DataStreamOptions): this;
    /**
     * Separates stream into a hash of streams. Does not create new streams!
     * @param streams the object hash of streams. Keys must be the outputs of the affinity function
     * @param affinity the Function that affixes the item to specific streams which must exist in the object for each chunk.
     */
    separateInto(streams: { [key: string]: DataStream }, affinity: AffinityCallback): this;
    /**
     * Separates execution to multiple streams using the hashes returned by the passed Function.
     *
     * Calls the given Function for a hash, then makes sure all items with the same hash are processed within a single
     * stream. Thanks to that streams can be distributed to multiple threads.
     * @param affinity the affinity function
     * @param createOptions options to use to create the separated streams
     * @param ClassType optional stream class to construct the separated streams with
     */
    separate(affinity: AffinityCallback, createOptions?: DataStreamOptions, ClassType?: Function): MultiStream;
    /**
     * Delegates work to a specified worker.
     * @param delegateFunc A function to be run in the sub-thread.
     * @param worker the WorkerStream to delegate the work to
     * @param plugins optional list of plugins
     */
    delegate(delegateFunc: DelegateCallback, worker: WorkerStream, plugins?: any[]): this;
    /**
     * Limit the rate of the stream to a given number of chunks per second or given timeframe.
     * @param cps Chunks per timeframe, the default timeframe is 1000 ms.
     * @param options Options for the limiter controlling the timeframe and time source. Both must work on same units.
     */
    rate(cps: Number, options?: RateOptions): this;
    /**
     * Aggregates chunks in arrays of the given number of items.
     *
     * This can be used for micro-batch processing.
     * @param count How many items to aggregate
     */
    batch(count: Number): this;
    /**
     * Aggregates chunks to arrays not delaying output by more than the given number of ms.
     * @param ms Maximum amount of milliseconds
     * @param count Maximum number of items in batch (otherwise no limit)
     */
    timeBatch(ms: Number, count?: Number): this;
    /**
     * Performs the Nagle's algorithm on the data. In essence it waits until we receive some more data and releases them
     * in bulk.
     * @todo needs more work, for now it's simply waiting some time, not checking the queues.
     * @param size maximum number of items to wait for
     * @param ms milliseconds to wait for more data
     */
    nagle(size?: number, ms?: number): this;
    /**
     * Returns a WindowStream of the specified length
     * @param length the window length
     * @returns a stream of arrays
     */
    window(length: Number): WindowStream;
    /**
     * Transforms the stream to a streamed JSON array.
     * @param enclosure Any iterable object of two items (beginning and end)
     */
    toJSONArray(enclosure?: Iterable<any>): StringStream;
    /**
     * Transforms the stream to a streamed JSON object.
     * @param entryCallback async function returning an entry (array of [key, value])
     * @param enclosure Any iterable object of two items (beginning and end)
     */
    toJSONObject(entryCallback?: MapCallback, enclosure?: Iterable<any>): StringStream;
    /**
     * Returns a StringStream containing JSON per item with optional end line
     * @param endline whether to add endlines (boolean or string as delimiter)
     */
    JSONStringify(endline?: Boolean | String): StringStream;
    /**
     * Stringifies CSV to DataString using 'papaparse' module.
     * @param options options for the papaparse.unparse module.
     */
    CSVStringify(options?: Object): StringStream;
    /**
     * Executes a given sub-process with arguments and pipes the current stream into it while returning the output as another DataStream.
     *
     * Pipes the current stream into the sub-processes stdin.
     * The data is serialized and deserialized as JSON lines by default. You
     * can provide your own alternative methods in the ExecOptions object.
     *
     * Note: if you're piping both stderr and stdout (options.stream=3) keep in mind that chunks may get mixed up!
     * @param command command to execute
     * @param options options to be passed to `spawn` and defining serialization.
     * @param args additional arguments (will overwrite to SpawnOptions args even if not given)
     * @returns the output of the sub-process as a stream
     */
    exec(command: String, options?: ExecDataOptions, args?: String): DataStream;
    /**
     * Injects a ```debugger``` statement when called.
     * @param func if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain
     */
    debug(func?: Function): DataStream;
    /**
     * Creates a BufferStream.
     *
     * The passed serializer must return a buffer.
     * @param serializer A method that converts chunks to buffers
     */
    toBufferStream(serializer: MapCallback): BufferStream;
    /**
     * Creates a StringStream.
     *
     * The passed serializer must return a string. If no serializer is passed chunks
     * toString method will be used.
     * @param serializer A method that converts chunks to strings
     */
    toStringStream(serializer?: MapCallback): StringStream;
}
/**
 * Callback transforming a single chunk (used e.g. by `map` and `each`).
 *
 * @param chunk the chunk to be mapped
 * @returns the mapped object, or a promise resolving to it
 */
type MapCallback = (chunk: any) => Promise<any> | any;
/**
* Decides whether a chunk stays in the filtered stream.
*
* Returns a primitive `boolean` (or a promise of one). A boxed `Boolean` object
* is intentionally not accepted, because `new Boolean(false)` is truthy.
*
* @typeParam T type of the incoming chunk (defaults to `any`)
* @param chunk the chunk to be filtered or not
* @returns information if the object should remain in the filtered stream.
*/
type FilterCallback<T = any> = (chunk: T) => Promise<boolean> | boolean;
/**
* Folds one stream chunk into an accumulator, either synchronously or asynchronously.
*
* @typeParam A type of the accumulator (defaults to `any`)
* @typeParam T type of the stream chunk (defaults to `any`)
* @param accumulator the accumulator - the object initially passed or returned
* by the previous reduce operation
* @param chunk the stream chunk.
* @returns the next accumulator value, or a promise resolving to it
*/
type ReduceCallback<A = any, T = any> = (accumulator: A, chunk: T) => Promise<A> | A;
/**
* Callback invoked for a chunk; its return value is typed `void`, so any
* result is ignored by the type.
*
* @typeParam T type of the stream chunk (defaults to `any`)
* @param chunk source stream chunk
*/
type DoCallback<T = any> = (chunk: T) => void;
/**
* Callback receiving the target passed to `into` together with a source chunk.
*
* @typeParam I type of the `into` target (defaults to `any`)
* @typeParam T type of the source stream chunk (defaults to `any`)
* @param into stream passed to the into method
* @param chunk source stream chunk
*/
type IntoCallback<I = any, T = any> = (into: I, chunk: T) => any;
/**
* A function applied to a stream together with any extra parameters.
*
* @param stream the stream the callback is applied to
* @param parameters any additional parameters passed to the callback
* @returns the resulting stream
*/
type UseCallback = (stream: DataStream, ...parameters: any[]) => DataStream;
/**
* Receives the teed stream; any return value is ignored.
*
* @param teed The teed stream
*/
type TeeCallback = (teed: DataStream) => void;
/**
* Transform async callback. The passed transform should return a new chunk, unless
* the output should be filtered - if so, the transform should return `undefined`.
*
* Additionally the function can reject with `DataStream.filter` - the result will be
* filtered and no other transforms will be run on the chunk.
*
* The chunk may be a Buffer, a string, or any other value, so it is typed `any`.
* `encoding` is a primitive `string`, so callbacks annotated with `encoding: string`
* are accepted under `strictFunctionTypes`.
* @param chunk the stream chunk
* @param encoding encoding of the chunk
* @returns the result (or a promise of it); undefined will be treated as filtered out.
*/
type ScramjetTransformCallback = (chunk: any, encoding: string) => any;
/**
* Write async callback. Await your async write and resolve.
*
* Either a plain function or an `async` one may be supplied. The chunk may be a
* Buffer, a string, or any other value.
* @param chunk the stream chunk
* @param encoding encoding of the chunk
*/
type ScramjetWriteCallback = (chunk: any, encoding: string) => void | Promise<void>;
/**
* Read async callback. Simply await your async operations and return the result as array.
*
* Because the callback may be `async`, the return type admits a promise of the
* array as well as the array itself.
* @param count the number of chunks that should be read ("this is more like a set of guideline than actual rules").
* @returns the read chunks, or a promise resolving to them.
*/
type ScramjetReadCallback = (count: number) => any[] | Promise<any[]>;
/**
* Standard options for scramjet streams.
*
* Defines async transforms or read/write methods for a stream.
*/
interface DataStreamOptions {
/**
* an async function returning the next read item
*/
promiseRead?: ScramjetReadCallback;
/**
* an async function writing the next written item
*/
promiseWrite?: ScramjetWriteCallback;
/**
* an async function returning a transformed chunk
*/
promiseTransform?: ScramjetTransformCallback;
/**
* an async function run before transform stream ends to push last chunks from the buffer
*/
promiseFlush?: ScramjetReadCallback;
/**
* an async function run before the transform
*/
beforeTransform?: ScramjetTransformCallback;
/**
* an async function run after the transform
*/
afterTransform?: ScramjetTransformCallback;
/**
* the number of transforms done in parallel
*/
maxParallel?: number;
/**
* a referring stream to point to (if possible the transforms will be pushed to it
* instead of creating a new stream)
*/
referrer?: DataStream;
/**
* should the object mode be used
*/
objectMode?: boolean;
}
/**
* A stream of string objects for further transformation on top of DataStream.
*
* Example:
*
* ```javascript
* StringStream.fromString()
* ```
*/
class StringStream extends DataStream {
/**
* Constructs the stream with the given encoding
* @param encoding the encoding to use
* @param options the encoding to use
*/
constructor(encoding?: String, options?: Object);
/**
* Shifts given length of chars from the original stream
*
* Works the same way as {@see DataStream.shift}, but in this case extracts
* the given number of characters.
* @param bytes The number of characters to shift.
* @param func Function that receives a string of shifted chars.
*/
shift(bytes: Number, func: ShiftStringCallback): this;
/**
* A handy split by line regex to quickly get a line-by-line stream
*/
static SPLIT_LINE: any;
/**
* Splits the string stream by the specified RegExp or string
* @param splitter What to split by
*/
split(splitter: RegExp | String): this;
/**
* Finds matches in the string stream and streams the match results
* @param matcher A function that will be called for every
* stream chunk.
*/
match(matcher: RegExp): this;