
DataStream : import("stream").PassThrough

DataStream is the primary stream type for Scramjet. When you parse your stream, just pipe it in; you can then perform calculations on the data objects streamed through your flow.

Use as:

const { DataStream } = require('scramjet');

await (DataStream.from(aStream) // create a DataStream
    .map(findInFiles)           // read some data asynchronously
    .map(sendToAPI)             // send the data somewhere
    .run());                    // wait until end

Kind: static class
Extends: import("stream").PassThrough
Test: test/methods/data-stream-constructor.js

new DataStream([opts])

Create the DataStream.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [opts] | DataStreamOptions | {} | Stream options passed to superclass |

dataStream.map(func, [ClassType]) ↺

Transforms stream objects into new ones, just like Array.prototype.map does.

Map takes a single argument: the function operating on every element of the stream. If the function returns a Promise or is an AsyncFunction then the stream will await the outcome of the operation before pushing the data forward.

A simple example that turns a stream of URLs into a stream of responses:

stream.map(async url => fetch(url));

Multiple subsequent map operations (as well as filter, do, each and other simple ops) will be merged together into a single operation to improve performance. Such behaviour can be suppressed by chaining .tap() after .map().

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-map.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| func | MapCallback |  | The function that creates the new object |
| [ClassType] | function | this.constructor | The class to be mapped to. |

dataStream.filter(func) ↺

Filters object based on the function outcome, just like Array.prototype.filter.

Filter takes a single argument: a Function or an AsyncFunction that will be called on each stream item. If the outcome of the operation is falsy (0, '', false, null or undefined) the item will be filtered out of subsequent operations and will not be pushed to the output of the stream. Otherwise the item will not be affected.

A simple example that filters out non-2xx responses from a stream:

stream.filter(({statusCode}) => statusCode >= 200 && statusCode < 300);

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-filter.js

| Param | Type | Description |
| --- | --- | --- |
| func | FilterCallback | The function that filters the object |

dataStream.reduce(func, into) ⇄

Reduces the stream into a given accumulator

Works similarly to Array.prototype.reduce, so whatever you return in the former operation will be the first operand to the latter. The result is a promise that's resolved with the return value of the last transform executed.

A simple example that sums values from a stream, starting from an initial accumulator of 0:

stream.reduce((accumulator, {value}) => accumulator + value, 0);

This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.

Kind: instance method of DataStream
Test: test/methods/data-stream-reduce.js

| Param | Type | Description |
| --- | --- | --- |
| func | ReduceCallback | The into object will be passed as the first argument, the data object from the stream as the second. |
| into | object | Any object passed initially to the transform function |

dataStream.do(func) ↺

Perform an asynchronous operation without changing or resuming the stream.

In essence the stream uses the call to maintain backpressure, but the resolved value has no impact on the streamed data (except for possible mutation of the chunk itself).

Kind: instance method of DataStream
Chainable

| Param | Type | Description |
| --- | --- | --- |
| func | DoCallback | the async function |
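
For instance, a minimal sketch that persists each chunk while keeping backpressure (saveToDatabase is a hypothetical async helper):

stream.do(async chunk => { await saveToDatabase(chunk); });  // hypothetical helper; the chunk flows on unchanged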

dataStream.all(functions) ↺

Processes a number of functions in parallel, returns a stream of arrays of results.

This method allows running multiple asynchronous operations on each chunk and receiving all the results at once, just like Promise.all behaves.

Keep in mind that if one of your methods rejects, this behaves just like Promise.all: you won't be able to receive partial results.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-all.js

| Param | Type | Description |
| --- | --- | --- |
| functions | Array.<function()> | list of async functions to run |
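
A short sketch, assuming each function receives the chunk and that fetchMetadata and fetchStats are hypothetical async helpers:

stream.all([
    async chunk => fetchMetadata(chunk.id),  // hypothetical helper
    async chunk => fetchStats(chunk.id)      // hypothetical helper
]);  // pushes [metadata, stats] arrays downstream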

dataStream.race(functions) ↺

Processes a number of functions in parallel, returns the first resolved.

This method allows running multiple asynchronous operations on each chunk and awaiting just the result of the quickest to execute, just like Promise.race behaves.

Keep in mind that if one of your methods rejects, an error will only be raised if that method was the first one to settle.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-race.js

| Param | Type | Description |
| --- | --- | --- |
| functions | Array.<function()> | list of async functions to run |
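
A sketch that races two mirrors for each chunk and pushes whichever response settles first (the URLs are made up):

stream.race([
    chunk => fetch(`https://mirror-a.example.com/${chunk.path}`),  // made-up URL
    chunk => fetch(`https://mirror-b.example.com/${chunk.path}`)   // made-up URL
]);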

dataStream.unorder(func)

Allows processing items without keeping order.

This method is useful if you are not concerned about the order in which the chunks are pushed out of the operation. The maxParallel option still caps the number of simultaneous parallel operations.

Kind: instance method of DataStream

| Param | Type | Description |
| --- | --- | --- |
| func | MapCallback | the async function that will be unordered |
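
A sketch where results leave the stream in completion order rather than arrival order (process is a hypothetical async helper):

stream.unorder(async chunk => process(chunk));  // hypothetical helper; results are pushed as they resolve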

dataStream.into(func, into) ↺

Allows own implementation of stream chaining.

The async function is called on every chunk and should implement writes in its own way. The resolution will be awaited for flow control. The passed into argument is passed as the first argument to every call.

It returns the DataStream passed as the second argument.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-into.js

| Param | Type | Description |
| --- | --- | --- |
| func | IntoCallback | the method that processes incoming chunks |
| into | DataStream | the DataStream derived class |
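
A minimal sketch that writes a derived value of every chunk into a fresh DataStream:

stream.into(
    async (out, chunk) => out.whenWrote(chunk.value),  // "out" is the stream passed below
    new DataStream()
);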

dataStream.use(func) ↺

Calls the passed method in place with the stream as first argument, returns result.

The main intention of this method is to run scramjet modules - transforms that allow complex transforms of streams. These modules can also be run with scramjet-cli directly from the command line.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-use.js

| Param | Type | Description |
| --- | --- | --- |
| func | AsyncGeneratorFunction \| GeneratorFunction \| UseCallback \| string \| Readable | if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain. Alternatively this can be a relative path to a scramjet-module. Lastly it can be a Transform stream. |
| ...parameters | Array.<any> | any additional parameters to be passed to the module |
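
A minimal sketch of an inline module; the same function could live in a separate file and be referenced by a relative path (the path below is hypothetical):

stream.use(stream => stream.map(JSON.parse));
// or, equivalently, with a scramjet module file:
stream.use('./my-parse-module');  // hypothetical path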

dataStream.run() ⇄

Consumes all stream items doing nothing. Resolves when the stream is ended.

This is very convenient if you're looking to use up the stream in operations that work on each entry, like map. This uncorks the stream and allows all preceding operations to run at any speed.

All the data of the current stream will be discarded.

The function returns a promise that is resolved when the stream ends.

Kind: instance method of DataStream

dataStream.tap() ↺

Stops merging transform Functions at the current place in the command chain.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-tap.js

dataStream.whenRead() ⇄

Reads a chunk from the stream and resolves the promise when read.

Kind: instance method of DataStream

dataStream.whenWrote(chunk) ⇄

Writes a chunk to the stream and returns a Promise resolved when more chunks can be written.

Kind: instance method of DataStream

| Param | Type | Description |
| --- | --- | --- |
| chunk | * | a chunk to write |
| ...more | Array.<any> | more chunks to write |
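
Together with whenRead above, this allows hand-rolled flow control, for example moving a single chunk between two streams (source and target are hypothetical DataStreams):

const chunk = await source.whenRead();  // wait until one chunk can be read
await target.whenWrote(chunk);          // wait until target can accept more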

dataStream.whenEnd() ⇄

Resolves when stream ends - rejects on uncaught error

Kind: instance method of DataStream

dataStream.whenDrained() ⇄

Returns a promise that resolves when the stream is drained

Kind: instance method of DataStream

dataStream.whenError() ⇄

Returns a promise that resolves (!) when the stream errors

Kind: instance method of DataStream

dataStream.setOptions(options) ↺

Allows resetting stream options.

It's much easier to use this in a chain than constructing a new stream:

    stream.map(myMapper).filter(myFilter).setOptions({maxParallel: 2})

Kind: instance method of DataStream
Chainable
Meta.conditions: keep-order,chain

| Param | Type |
| --- | --- |
| options | DataStreamOptions |

dataStream.copy(func) ↺

Returns a copy of the stream

Creates a new stream and pushes all the data from the current one to the new one. This can be called several times.

Kind: instance method of DataStream
Chainable

| Param | Type | Description |
| --- | --- | --- |
| func | TeeCallback \| Writable | The duplicate stream will be passed as first argument. |

dataStream.tee(func) ↺

Duplicate the stream

Creates a duplicate stream instance and passes it to the Function.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-tee.js

| Param | Type | Description |
| --- | --- | --- |
| func | TeeCallback \| Writable | The duplicate stream will be passed as first argument. |
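
A sketch that sends a duplicate of the stream to stdout as JSON while the main chain continues:

stream.tee(copy => copy.JSONStringify().pipe(process.stdout));  // the duplicate; the original chain continues unaffected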

dataStream.each(func) ↺

Performs an operation on every chunk, without changing the stream

This is a shorthand for stream.on("data", func) but with flow control. Warning: this resumes the stream!

Kind: instance method of DataStream
Chainable

| Param | Type | Description |
| --- | --- | --- |
| func | MapCallback | a Function called for each chunk. |
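
For instance, logging every chunk while letting the data flow on:

await stream.each(chunk => console.log(chunk)).run();  // each resumes the stream; run awaits its end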

dataStream.while(func) ↺

Reads the stream while the function outcome is truthy.

Stops reading and emits end as soon as it finds the first chunk that evaluates to false. If you're processing a file until a certain point or you just need to confirm the existence of some data, you can use it to end the stream before reaching its end.

Keep in mind that whatever you piped to the stream will still need to be handled.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-while.js

| Param | Type | Description |
| --- | --- | --- |
| func | FilterCallback | The condition check |

dataStream.until(func) ↺

Reads the stream until the function outcome is truthy.

Works opposite of while.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-until.js

| Param | Type | Description |
| --- | --- | --- |
| func | FilterCallback | The condition check |
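
For example, reading records until a sentinel appears (the done flag is a made-up property):

stream.until(({done}) => done);  // ends the stream at the first chunk with a truthy "done"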

dataStream.catch(callback) ↺

Provides a way to catch errors in chained streams.

The handler will be called asynchronously:

  • if it resolves, the error will be muted.
  • if it rejects, the error will be passed to the next handler.

If no handler resolves the error, an error event will be emitted.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-catch.js

| Param | Type | Description |
| --- | --- | --- |
| callback | function | Error handler (async function) |
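
A sketch of a handler that mutes one class of errors and passes the rest on, assuming the handler receives the error object (the code check is made up):

stream.catch(async err => {
    if (err.code === 'ENOENT') return;  // resolve: mute the error
    throw err;                          // reject: pass to the next handler
});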

dataStream.raise(err) ⇄

Executes all error handlers and if none resolves, then emits an error.

The returned promise will always be resolved even if there are no successful handlers.

Kind: instance method of DataStream
Test: test/methods/data-stream-raise.js

| Param | Type | Description |
| --- | --- | --- |
| err | Error | The thrown error |

dataStream.bufferify(serializer) : BufferStream ↺

Creates a BufferStream.

The passed serializer must return a buffer.

Kind: instance method of DataStream
Chainable
Returns: BufferStream - the resulting stream
Meta.noreadme:
Test: test/methods/data-stream-tobufferstream.js

| Param | Type | Description |
| --- | --- | --- |
| serializer | MapCallback | A method that converts chunks to buffers |

dataStream.stringify([serializer]) : StringStream ↺

Creates a StringStream.

The passed serializer must return a string. If no serializer is passed, the chunk's toString method will be used.

Kind: instance method of DataStream
Chainable
Returns: StringStream - the resulting stream
Test: test/methods/data-stream-tostringstream.js

| Param | Type | Description |
| --- | --- | --- |
| [serializer] | MapCallback \| never | A method that converts chunks to strings |

dataStream.toArray([initial]) : Array.<any> ⇄

Aggregates the stream into a single Array

In fact it's just a shorthand for reducing the stream into an Array.

Kind: instance method of DataStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [initial] | Array | [] | Array to begin with (defaults to an empty array). |
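
For example:

const doubled = await DataStream.fromArray([1, 2, 3]).map(x => x * 2).toArray();
// doubled === [2, 4, 6]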

dataStream.toGenerator() : Generator.<Promise.<any>>

Returns an async generator

Kind: instance method of DataStream
Returns: Generator.<Promise.<any>> - Returns an iterator that returns a promise for each item.
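
Since each item comes back as a promise, the generator pairs naturally with for await:

for await (const item of stream.toGenerator()) {
    console.log(item);  // each yielded promise is awaited in turn
}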

dataStream.pull(pullable) : Promise.<any> ⇄

Pulls in any readable stream, resolves when the pulled stream ends.

You can also pass anything that can be passed to DataStream.from.

Does not preserve order, does not end this stream.

Kind: instance method of DataStream
Returns: Promise.<any> - resolved when incoming stream ends, rejects on incoming error
Test: test/methods/data-stream-pull.js

| Param | Type | Description |
| --- | --- | --- |
| pullable | Array \| Iterable.<any> \| AsyncGeneratorFunction \| GeneratorFunction \| AsyncFunction \| function \| string \| Readable |  |
| ...args | Array.<any> | any additional args |
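
A sketch showing that pull leaves the target open for more data:

const target = new DataStream();
await target.pull(DataStream.fromArray([1, 2, 3]));  // resolves when the pulled stream ends
target.end();  // pull does not end the target, so end it explicitly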

dataStream.shift(count, func) ↺

Shifts the first n items from the stream and pushes out the remaining ones.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-shift.js

| Param | Type | Description |
| --- | --- | --- |
| count | number | The number of items to shift. |
| func | ShiftCallback | Function that receives an array of shifted items |

dataStream.peek(count, func) ↺

Allows previewing some of the streams data without removing them from the stream.

Important: Peek does not resume the flow.

Kind: instance method of DataStream
Chainable

| Param | Type | Description |
| --- | --- | --- |
| count | number | The number of items to preview |
| func | ShiftCallback | Function that receives an array of the previewed items |

dataStream.slice([start], [length]) ↺

Slices out a part of the stream to the passed Function.

Returns a stream with the first start items omitted and the following length items included. Works similarly to Array.prototype.slice.

Takes count from the moment it's called. Any previous items will not be taken into account.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-slice.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [start] | number | 0 | omit this number of entries. |
| [length] | number | Infinity | get this number of entries to the resulting stream |
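
For example:

stream.slice(10, 5);  // skips the first 10 entries, then passes the next 5 downstream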

dataStream.assign(func) ↺

Transforms stream objects by assigning the properties from the returned data along with data from original ones.

The original objects are unaltered.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-assign.js

| Param | Type | Description |
| --- | --- | --- |
| func | MapCallback \| object | The function that returns new object properties or just the new properties |
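
A sketch that augments each chunk with fetched metadata while keeping the original properties (getMeta is a hypothetical async helper):

stream.assign(async ({id}) => ({meta: await getMeta(id)}));  // result: {...original, meta}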

dataStream.empty(callback) ↺

Calls the given callback just before the stream ends, but only if no items were passed through.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-empty.js

| Param | Type | Description |
| --- | --- | --- |
| callback | function | Function called when stream ends |

dataStream.unshift() ↺

Pushes any data at call time (essentially at the beginning of the stream)

This is a synchronous only function.

Kind: instance method of DataStream
Chainable

| Param | Type | Description |
| --- | --- | --- |
| ...item | Array.<any> | list of items to unshift (you can pass more items) |

dataStream.endWith(item) ↺

Pushes any data at end of stream

Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-endwith.js

| Param | Type | Description |
| --- | --- | --- |
| item | * | list of items to push at end |

dataStream.accumulate(func, into) : Promise. ⇄

Accumulates data into the object.

Works very similarly to reduce, but the result of previous operations has no influence over the accumulator in the next one.

Method works in parallel.

Kind: instance method of DataStream
Returns: Promise.<any> - resolved with the "into" object on stream end.
Meta.noreadme:
Test: test/methods/data-stream-accumulate.js

| Param | Type | Description |
| --- | --- | --- |
| func | AccumulateCallback | The accumulation function |
| into | * | Accumulator object |

dataStream.consume(func) ⇄

Deprecated

Consumes the stream by running each chunk through the passed function

Kind: instance method of DataStream
Meta.noreadme:

| Param | Type | Description |
| --- | --- | --- |
| func | ConsumeCallback \| AsyncGeneratorFunction \| GeneratorFunction | the consumer |
| ...args | Array.<any> | additional args will be passed to generators |

dataStream.reduceNow(func, into) : * ↺

Reduces the stream into the given object, returning it immediately.

The main difference from reduce is that the into object is returned immediately (however the method will still be called with the previous entry). If the object is an instance of EventEmitter then it will propagate the error from the previous stream.

This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.

Kind: instance method of DataStream
Chainable
Returns: * - whatever was passed as into
Meta.noreadme:
Test: test/methods/data-stream-reduceNow.js

| Param | Type | Description |
| --- | --- | --- |
| func | ReduceCallback | The into object will be passed as the first argument, the data object from the stream as the second. |
| into | * \| EventEmitter | Any object passed initially to the transform function |

dataStream.remap(func, [ClassType]) ↺

Remaps the stream into a new stream.

This means that every item may emit as many other items as we like.

Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-remap.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| func | RemapCallback |  | A Function that is called on every chunk |
| [ClassType] | function | this.constructor | Optional DataStream subclass to be constructed |

dataStream.flatMap(func, [ClassType]) ↺

Takes any method that returns any iterable and flattens the result.

The passed Function must return an iterable (otherwise an error will be emitted). The resulting stream will consist of all the items of the returned iterables, one iterable after another.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-flatmap.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| func | FlatMapCallback |  | A Function that is called on every chunk |
| [ClassType] | function | this.constructor | Optional DataStream subclass to be constructed |
| ...args | Array.<any> |  | additional args will be passed to generators |
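
For example:

const out = await DataStream.fromArray([1, 2, 3])
    .flatMap(x => [x, x * 10])
    .toArray();
// out === [1, 10, 2, 20, 3, 30]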

dataStream.flatten() : DataStream ↺

A shorthand for streams of arrays or iterables to flatten them.

A more efficient equivalent of .flatMap(i => i). Works on streams of async iterables too.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-flatten.js

dataStream.concat() ↺

Returns a new stream that will append the passed streams to the callee

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-concat.js

| Param | Type | Description |
| --- | --- | --- |
| ...streams | Array.<Readable> | Streams to be injected into the current stream |

dataStream.join(item) ↺

This method will put the passed object between items. It can also be a function call or a generator / iterator.

If a generator or iterator is passed, no items will be interweaved once the iteration is done.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-join.js

| Param | Type | Description |
| --- | --- | --- |
| item | * \| AsyncGeneratorFunction \| GeneratorFunction \| JoinCallback | An object that should be interweaved between stream items |
| ...args | Array.<any> | additional args will be passed to generators |
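
For example, interweaving a separator between chunks (the second line assumes a plain function is treated as a JoinCallback):

stream.join({separator: true});           // a fixed object between every two items
stream.join(() => ({time: Date.now()}));  // or compute the joined item on demand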

dataStream.keep([count]) ↺

Keep a buffer of n-chunks for use with {@see DataStream..rewind}

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-keep.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [count] | number | Infinity | Number of objects or -1 for all the stream |

dataStream.rewind([count]) ↺

Rewinds the buffered chunks the specified length backwards. Requires a prior call to {@see DataStream..keep}

Kind: instance method of DataStream
Chainable

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [count] | number | Infinity | Number of objects or -1 for all the buffer |

dataStream.stack([count], [drop]) ↺

Returns a stream that stacks up incoming items, always feeding out the newest items first. The older items are returned when read.

When the stack length exceeds the given count the given drop function is awaited and used for flow control.

By default the drop function ignores and quietly disposes of items not read before overflow.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-stack.js

| Param | Type | Default |
| --- | --- | --- |
| [count] | number | 1000 |
| [drop] | function |  |

dataStream.distribute([affinity], [clusterFunc], [options]) ↺

Distributes processing into multiple sub-processes or threads if you like.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-distribute.js
Todo

  • Currently order is not kept.
  • Example test breaks travis-ci build

| Param | Type | Description |
| --- | --- | --- |
| [affinity] | AffinityCallback \| function \| number | A Function that affixes the item to specific output stream which must exist in the object for each chunk, must return a string. A number may be passed to identify how many round-robin threads to start up. Defaults to Round Robin to twice the number of CPU threads. |
| [clusterFunc] | function \| DataStreamOptions | stream transforms similar to {@see DataStream#use method} |
| [options] | DataStreamOptions | Options |

dataStream.separateInto(streams, affinity) ↺

Separates stream into a hash of streams. Does not create new streams!

Kind: instance method of DataStream
Chainable
Meta.noreadme:

| Param | Type | Description |
| --- | --- | --- |
| streams | object | the object hash of streams. Keys must be the outputs of the affinity function |
| affinity | AffinityCallback | the Function that affixes the item to specific streams which must exist in the object for each chunk. |

dataStream.separate(affinity, [createOptions], [ClassType]) : MultiStream ↺

Separates execution to multiple streams using the hashes returned by the passed Function.

Calls the given Function for a hash, then makes sure all items with the same hash are processed within a single stream. Thanks to that streams can be distributed to multiple threads.

Kind: instance method of DataStream
Chainable
Returns: MultiStream - separated stream
Meta.noreadme:
Test: test/methods/data-stream-separate.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| affinity | AffinityCallback |  | the affinity function |
| [createOptions] | DataStreamOptions |  | options to use to create the separated streams |
| [ClassType] | function | this.constructor | the class to be used for the separated streams |

dataStream.delegate(delegateFunc, worker, [plugins]) ↺

Delegates work to a specified worker.

Kind: instance method of DataStream
Chainable
Meta.noreadme:

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| delegateFunc | DelegateCallback |  | A function to be run in the sub-thread. |
| worker | StreamWorker |  |  |
| [plugins] | Array | [] |  |

dataStream.rate(cps, [options]) ↺

Limit the rate of the stream to a given number of chunks per second or given timeframe.

Kind: instance method of DataStream
Chainable
Meta.noreadme:

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| cps | number |  | Chunks per timeframe, the default timeframe is 1000 ms. |
| [options] | RateOptions | {} | Options for the limiter controlling the timeframe and time source. Both must work on same units. |

dataStream.batch(count) ↺

Aggregates chunks into arrays of a given number of items.

This can be used for micro-batch processing.

Kind: instance method of DataStream
Chainable
Test: test/methods/data-stream-batch.js

| Param | Type | Description |
| --- | --- | --- |
| count | number | How many items to aggregate |
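
A sketch for micro-batching database writes (insertMany is a hypothetical bulk-write helper):

stream.batch(100).map(async items => insertMany(items));  // items is an array of up to 100 chunks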

dataStream.timeBatch(ms, [count]) ↺

Aggregates chunks to arrays not delaying output by more than the given number of ms.

Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-timebatch.js

| Param | Type | Description |
| --- | --- | --- |
| ms | number | Maximum amount of milliseconds |
| [count] | number | Maximum number of items in batch (otherwise no limit) |

dataStream.nagle([size], [ms]) ↺

Performs Nagle's algorithm on the data. In essence it waits until some more data is received and then releases it in bulk.

Kind: instance method of DataStream
Chainable
Meta.noreadme:
Todo

  • needs more work, for now it's simply waiting some time, not checking the queues.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [size] | number | 32 | maximum number of items to wait for |
| [ms] | number | 10 | milliseconds to wait for more data |

dataStream.window(length) : WindowStream ↺

Returns a WindowStream of the specified length

Kind: instance method of DataStream
Chainable
Returns: WindowStream - a stream of arrays
Meta.noreadme:

| Param | Type |
| --- | --- |
| length | number |

dataStream.toJSONArray([enclosure]) : StringStream ↺

Transforms the stream to a streamed JSON array.

Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-tojsonarray.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [enclosure] | Iterable.<any> | '[]' | Any iterable object of two items (beginning and end) |
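
For example, assuming const fs = require('fs') and a made-up output file name:

stream.toJSONArray().pipe(fs.createWriteStream('out.json'));  // writes a single valid JSON array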

dataStream.toJSONObject([entryCallback], [enclosure]) : StringStream ↺

Transforms the stream to a streamed JSON object.

Kind: instance method of DataStream
Chainable
Meta.noreadme:
Test: test/methods/data-stream-tojsonobject.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [entryCallback] | MapCallback |  | async function returning an entry (array of [key, value]) |
| [enclosure] | Iterable.<any> | '{}' | Any iterable object of two items (beginning and end) |

dataStream.JSONStringify([endline]) : StringStream ↺

Returns a StringStream containing JSON per item with optional end line

Kind: instance method of DataStream
Chainable
Returns: StringStream - output stream
Meta.noreadme:
Test: test/methods/data-stream-jsonstringify.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [endline] | Boolean \| string | os.EOL | whether to add endlines (boolean or string as delimiter) |

dataStream.CSVStringify([options]) : StringStream ↺

Stringifies the stream into CSV using the 'papaparse' module.

Kind: instance method of DataStream
Chainable
Returns: StringStream - the resulting CSV stream
Test: test/methods/data-stream-csv.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [options] | object | {} | options for the papaparse.unparse module. |

dataStream.exec(command, [options])

Executes a given sub-process with arguments and pipes the current stream into it while returning the output as another DataStream.

Pipes the current stream into the sub-process's stdin. The data is serialized and deserialized as JSON lines by default. You can provide your own alternative methods in the ExecOptions object.

Note: if you're piping both stderr and stdout (options.stream=3) keep in mind that chunks may get mixed up!

Kind: instance method of DataStream
Test: test/methods/data-stream-exec.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| command | string |  | command to execute |
| [options] | ExecDataOptions \| any | {} | options to be passed to spawn and defining serialization. |
| ...args | Array.<string> |  | additional args will be passed to function |

dataStream.debug(func) : DataStream ↺

Injects a debugger statement when called.

Kind: instance method of DataStream
Chainable
Returns: DataStream - self
Meta.noreadme:
Test: test/methods/data-stream-debug.js

| Param | Type | Description |
| --- | --- | --- |
| func | function | if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain |

dataStream.toBufferStream(serializer) : BufferStream ↺

Creates a BufferStream.

The passed serializer must return a buffer.

Kind: instance method of DataStream
Chainable
Returns: BufferStream - the resulting stream
Meta.noreadme:
Test: test/methods/data-stream-tobufferstream.js

| Param | Type | Description |
| --- | --- | --- |
| serializer | MapCallback | A method that converts chunks to buffers |

dataStream.toStringStream([serializer]) : StringStream ↺

Creates a StringStream.

The passed serializer must return a string. If no serializer is passed chunks toString method will be used.

Kind: instance method of DataStream
Chainable
Returns: StringStream - the resulting stream
Test: test/methods/data-stream-tostringstream.js

| Param | Type | Description |
| --- | --- | --- |
| [serializer] | MapCallback \| never | A method that converts chunks to strings |


DataStream:from(input, [options]) : DataStream

Returns a DataStream from pretty much anything sensibly possible.

Depending on type:

  • self will return self immediately
  • Readable stream will get piped to the current stream with errors forwarded
  • Array will get iterated and all items will be pushed to the returned stream. The stream will also be ended in such case.
  • GeneratorFunction will get executed to return the iterator which will be used as source for items
  • AsyncGeneratorFunction will also work as above (including generators) in node v10.
  • Iterables iterator will be used as a source for streams

You can also pass a Function or AsyncFunction that will be executed and its outcome will be passed again to from and piped to the initially returned stream. Any additional arguments will be passed as arguments to the function.

If a String is passed, scramjet will attempt to resolve it as a module and use the outcome as an argument to from as in the Function case described above. For more information see modules.md

A simple example from a generator:

DataStream
  .from(function* () {
     let x = 0;
     while (x < 100) yield {x: x++};
  })
  .each(console.log)
  // {x: 0}
  // {x: 1}
  // ...
  // {x: 99}

Kind: static method of DataStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| input | Array \| Iterable.<any> \| AsyncGeneratorFunction \| GeneratorFunction \| AsyncFunction \| Promise.<any> \| function \| string \| Readable |  | argument to be turned into new stream |
| [options] | DataStreamOptions \| Writable | {} | options for creation of a new stream or the target stream |
| ...args | Array.<any> |  | additional arguments for the stream - will be passed to the function or generator |

DataStream:pipeline(readable) : DataStream

Creates a pipeline of streams and returns a scramjet stream.

This is similar to the node.js stream.pipeline method, but it also accepts scramjet modules among the transforms. It may be used to run a series of non-scramjet transform streams.

The first argument is anything streamable and will be sanitized by DataStream..from.

Each following argument will be understood as a transform and can be any of:

  • AsyncFunction or Function - will be executed by DataStream..use
  • A transform stream that will be piped to the preceding stream

Kind: static method of DataStream
Returns: DataStream - a new DataStream instance of the resulting pipeline

| Param | Type | Description |
| --- | --- | --- |
| readable | Array \| Iterable.<any> \| AsyncGeneratorFunction \| GeneratorFunction \| AsyncFunction \| function \| string \| Readable | the initial readable argument that is streamable by scramjet.from |
| ...transforms | Array.<(AsyncFunction\|function()\|Transform)> | Transform functions (as in DataStream..use) or Transform streams (any number of these as consecutive arguments) |
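
A sketch mixing a plain Transform stream with a scramjet transform function (the input file name is made up):

const fs = require('fs');
const zlib = require('zlib');

DataStream.pipeline(
    fs.createReadStream('./events.gz'),  // made-up input file
    zlib.createGunzip(),                 // a plain Transform stream, simply piped
    stream => stream.each(console.log)   // a transform function, executed as in use
).run();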

DataStream:fromArray(array, [options]) : DataStream

Create a DataStream from an Array

Kind: static method of DataStream
Test: test/methods/data-stream-fromarray.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| array | Array.<*> |  | list of chunks |
| [options] | DataStreamOptions | {} | the read stream options |

DataStream:fromIterator(iterator, [options]) : DataStream

Create a DataStream from an Iterator

Doesn't end the stream until it reaches end of the iterator.

Kind: static method of DataStream
Test: test/methods/data-stream-fromiterator.js

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| iterator | Iterator.<any> |  | the iterator object |
| [options] | DataStreamOptions | {} | the read stream options |