An expressive streaming utility for node, inspired by Lo-Dash, Async, and event-stream. Ice-Stream's goal is to allow for complex processing of streaming data.

rossj/ice-stream

Ice-Stream

An expressive streaming utility for node, inspired by Lo-Dash, Async, and event-stream.

Ice-Stream aims to make stream processing as easy as the ubiquitous libraries mentioned above make processing arrays and objects.

About Streams

Stream processing is basically pumping data through a number of operations, piece by piece. Using streams is especially useful when:

  • There is more data than available memory
  • The data source is slow, e.g. a network connection or user input
  • Some of the data can be processed without all of the data

In some cases it is useful to think about and operate on a stream as a continual flow of data, and sometimes it is better to think about it as a segmented, chunk by chunk flow. Ice-Stream's methods do both, depending on the operation.

Examples

First, include Ice-Stream (the examples below also use Node's built-in `fs` module):

var fs = require('fs');
var is = require('ice-stream');

Using the static methods results in a typical Node Stream

// Stream from a file, through a lowercaser, to an output file
is.toLower( fs.createReadStream('file.txt') ).pipe( fs.createWriteStream('file-low.txt') );

Passing a Stream to the constructor generates a wrapped stream, which can be chained

// Parse out unique keywords from the file and output them to stdout
is( fs.createReadStream('file.txt') ).split(' ').toLower().unique().without('ruby', 'python').join('\n').out();

Constructor(mixed)

The Ice-Stream variable can be used as a namespace to access the stream methods, as a function to wrap basic Node streams, and as a constructor to create streams for data.

Examples

// Wrap a basic Stream
var wstream1 = is( fs.createReadStream('file.txt') );

// Create a text stream
var wstream2 = is('stream this data');

// The above result is wrapped so we can immediately chain it
wstream2.split(' ').join('\n').out();

// Create a stream from an array
is(['stream', 'this', 'data']).join('\n').out();

// Create a non-text stream from an array
is([1, 4, 6, 2, 91]).map(function(num) {
  return num*2;
}).join('\n').out();

Methods


### exec(cmd)

Spawns an external process. Input is passed to `stdin` of the new process, and output comes from `stdout`. Any data received from `stderr` is emitted as an `error`.

Arguments

  • cmd - The command to run

### split([separator])

Chunks the data based on the delimiter. Concatenates and buffers the input until the delimiter is found, at which point the buffered data is emitted. The delimiters are removed and not emitted. When the input stream closes, the final data chunk is emitted. Note that this method converts input to strings.

Arguments

  • separator - String specifying where to split the input stream. Defaults to \n.
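The buffering described above can be illustrated without the library. The following is a minimal sketch, assuming plain string input; `createSplitter` is a hypothetical helper written for this illustration and is not Ice-Stream's actual implementation.

```javascript
// Sketch only: createSplitter is a hypothetical helper, not part of Ice-Stream.
function createSplitter(separator) {
  var sep = separator === undefined ? '\n' : separator;
  var buffered = '';
  return {
    // Accept one input chunk and return the complete pieces found so far.
    write: function (chunk) {
      buffered += String(chunk);
      var pieces = buffered.split(sep);
      // The last piece may be incomplete, so hold it back for the next write.
      buffered = pieces.pop();
      return pieces;
    },
    // Called when the input closes: flush whatever is still buffered.
    end: function () {
      var rest = buffered;
      buffered = '';
      return rest;
    }
  };
}

var splitter = createSplitter(' ');
var emitted = [];
['stream th', 'is da', 'ta'].forEach(function (chunk) {
  emitted = emitted.concat(splitter.write(chunk));
});
emitted.push(splitter.end());
console.log(emitted); // [ 'stream', 'this', 'data' ]
```

Because the unfinished tail is carried over between writes, a word cut in half by a chunk boundary is still emitted as one piece.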

### join([separator])

Injects data in between chunks of the input stream. Note that a `split()` followed by a `join()` will produce the same overall stream, but the chunking will be different.

Arguments

  • separator - The extra data to emit between chunks
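To make the note about chunking concrete, here is a small sketch using plain arrays to stand in for stream chunks. `joinChunks` is a hypothetical helper, and emitting the separator as its own chunk is an assumption made for illustration; Ice-Stream may chunk the output differently.

```javascript
// Sketch only: joinChunks is a hypothetical helper, not Ice-Stream's implementation.
function joinChunks(chunks, separator) {
  var out = [];
  chunks.forEach(function (chunk, index) {
    // Emit the separator before every chunk except the first.
    if (index > 0) {
      out.push(separator);
    }
    out.push(chunk);
  });
  return out;
}

var original = ['one\ntw', 'o\nthree'];          // chunks as they might arrive
var afterSplit = original.join('').split('\n'); // what a split on '\n' would yield
var afterJoin = joinChunks(afterSplit, '\n');

console.log(afterJoin);                                  // [ 'one', '\n', 'two', '\n', 'three' ]
console.log(afterJoin.join('') === original.join(''));   // true
```

The concatenated data is identical before and after, but the two original chunks have become five.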

### toLower()

Converts the input to lower case.

### toUpper()

Converts the input to upper case.

### map(iterator)

Maps each stream chunk using a synchronous callback function.

Arguments

  • iterator(chunk) - A synchronous function which returns the new chunk.

### mapAsync(iterator)

Maps the stream chunks using an async callback. Note that `iterator` will be called in parallel as chunks are received, and the output order is determined by when the callbacks finish, not the input order.

Arguments

  • iterator(chunk, callback) - The user-defined function which performs the mapping. The first callback parameter is an optional error, with the second parameter being the mapped value.

### mapAsyncSeries(iterator)

Same as `mapAsync`, except the chunks are guaranteed to remain in order when emitted. Note that the iterator will still be called in parallel as chunks are received, but the results are buffered to ensure proper emission order.

Arguments

  • iterator(chunk, callback) - Same as above.
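The difference between the two async variants comes down to whether finished results are held until their turn. The sketch below shows one way such ordered buffering could work; `orderedMapper` and its `emit` parameter are hypothetical names for this illustration, not Ice-Stream's internals.

```javascript
// Sketch only: orderedMapper is a hypothetical helper, not part of Ice-Stream.
function orderedMapper(iterator, emit) {
  var nextToStart = 0;   // index assigned to the next incoming chunk
  var nextToEmit = 0;    // index of the next result allowed out
  var finished = {};     // results that completed early, keyed by index

  return function write(chunk) {
    var index = nextToStart++;
    // The iterator starts immediately, so calls overlap.
    iterator(chunk, function (err, mapped) {
      if (err) { throw err; } // a real stream would emit 'error' instead
      finished[index] = mapped;
      // Release every result that is now next in line.
      while (nextToEmit in finished) {
        emit(finished[nextToEmit]);
        delete finished[nextToEmit];
        nextToEmit++;
      }
    });
  };
}

var pending = [];
var results = [];
var write = orderedMapper(function (chunk, callback) {
  // Hold the callback so the completion order can be controlled below.
  pending.push(function () { callback(null, chunk.toUpperCase()); });
}, function (mapped) { results.push(mapped); });

['a', 'b', 'c'].forEach(function (chunk) { write(chunk); });
// Complete the callbacks in reverse order: c, then b, then a.
pending.reverse().forEach(function (finish) { finish(); });
console.log(results); // [ 'A', 'B', 'C' ]
```

Without the `finished` buffer, the same run would emit `C`, `B`, `A`, which matches the completion-order behaviour described for `mapAsync`.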

### filter(iterator)

Sends each chunk to a user-defined iterator which determines whether or not to send the chunk on.

Arguments

  • iterator(chunk) - A synchronous function which returns true to keep the chunk

### filterAsync(iterator)

Sends each chunk to a user-defined asynchronous function which determines whether or not to send the chunk on. Note that `iterator` will be called in parallel as chunks are received, and the output order is determined by when the callbacks finish, not the input order.

Arguments

  • iterator(chunk, callback) - The user-defined function which performs the filtering. The first callback parameter is a boolean. There is no err callback parameter.

### filterAsyncSeries(iterator)

Same as `filterAsync`, but the chunks are guaranteed to remain in order.

Arguments

  • iterator(chunk, callback) - Same as above.

### dropUntil(token, [emitMatch])

Discards all incoming stream data until the `token` string is found, at which point emitting of the incoming data continues. The matching will span chunk boundaries.

Arguments

  • token - A string to search for in the stream (unaffected by chunk boundaries).
  • emitMatch - A boolean indicating whether to emit the match itself when found. Defaults to false.
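Matching across chunk boundaries requires remembering the end of the previous data. The following is a minimal sketch of that idea for string input; `createDropUntil` is a hypothetical helper, not the library's code.

```javascript
// Sketch only: createDropUntil is a hypothetical helper, not part of Ice-Stream.
function createDropUntil(token, emitMatch) {
  var found = false;
  var tail = '';   // end of the data seen so far, in case the token straddles chunks

  // Returns the text to pass downstream for this chunk ('' while still dropping).
  return function write(chunk) {
    if (found) {
      return String(chunk);
    }
    var text = tail + String(chunk);
    var at = text.indexOf(token);
    if (at === -1) {
      // Keep only enough characters to complete a partial match later.
      var keep = token.length - 1;
      tail = keep > 0 ? text.slice(-keep) : '';
      return '';
    }
    found = true;
    return text.slice(emitMatch ? at : at + token.length);
  };
}

var dropper = createDropUntil('BEGIN', false);
var output = ['junk BE', 'GIN payload', ' more'].map(function (chunk) {
  return dropper(chunk);
}).join('');
console.log(JSON.stringify(output)); // " payload more"
```

Here the token is split between the first two chunks and is still found. Passing `true` as the second argument would yield `"BEGIN payload more"` instead.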

### dropUntilChunk(mixed, [emitMatch])

Discards all incoming stream chunks until a match is found, at which point emitting of the incoming chunks continues. The first parameter can be a string representing a complete chunk, a RegExp, or a user-defined function which indicates whether the condition to emit has been met.

Arguments

  • mixed - A string, RegExp, or function. If a string is given, it will behave similarly to dropUntil, except that only an exact chunk match will count. If a callback function is given, it will be passed a chunk and should return true to indicate that emitting should start.
  • emitMatch - A boolean indicating whether to emit the matched chunk itself when found. Defaults to false.
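The three accepted forms of the first argument can be thought of as three ways of building the same kind of predicate. This sketch models that on plain arrays; `toChunkMatcher` and `dropUntilChunk` here are hypothetical stand-ins, not the library's functions.

```javascript
// Sketch only: both helpers are hypothetical and operate on arrays, not streams.
function toChunkMatcher(mixed) {
  if (typeof mixed === 'function') {
    return mixed;
  }
  if (mixed instanceof RegExp) {
    return function (chunk) { return mixed.test(String(chunk)); };
  }
  // A string must equal the whole chunk exactly.
  return function (chunk) { return chunk === mixed; };
}

function dropUntilChunk(chunks, mixed, emitMatch) {
  var matches = toChunkMatcher(mixed);
  var emitting = false;
  return chunks.filter(function (chunk) {
    if (emitting) { return true; }
    if (matches(chunk)) {
      emitting = true;
      return Boolean(emitMatch); // the matching chunk is kept only on request
    }
    return false;
  });
}

var lines = ['# header', 'note', 'DATA', 'row 1', 'row 2'];
var byString = dropUntilChunk(lines, 'DATA');
var byRegExp = dropUntilChunk(lines, /^row/, true);
var byFunction = dropUntilChunk(lines, function (chunk) {
  return chunk.length === 4;
}, true);

console.log(byString);   // [ 'row 1', 'row 2' ]
console.log(byRegExp);   // [ 'row 1', 'row 2' ]
console.log(byFunction); // [ 'note', 'DATA', 'row 1', 'row 2' ]
```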

### unique()

Stores a hash of processed chunks, and discards already seen chunks. This works with string or object streams, but objects will be hashed based on their `toString()` result.

### without(chunk1[, chunk2, chunk3...])

Discards the specified chunks using strict equality. This works on string or object streams.
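The hashing and equality rules above have consequences worth seeing in a small example. This sketch models both behaviours on arrays; `uniqueChunks` and `withoutChunks` are hypothetical helpers that follow the description, not Ice-Stream's code.

```javascript
// Sketch only: these helpers mimic the described behaviour on plain arrays.
function uniqueChunks(chunks) {
  var seen = Object.create(null);
  return chunks.filter(function (chunk) {
    var key = String(chunk);   // objects are keyed by their toString() result
    if (seen[key]) { return false; }
    seen[key] = true;
    return true;
  });
}

function withoutChunks(chunks, unwanted) {
  return chunks.filter(function (chunk) {
    return unwanted.indexOf(chunk) === -1;   // indexOf compares with ===
  });
}

var deduped = uniqueChunks(['a', 'b', 'a']);
var dedupedObjects = uniqueChunks([{ id: 1 }, { id: 2 }]);
var remaining = withoutChunks(['1', 1, 'ruby'], [1, 'ruby']);

console.log(deduped);               // [ 'a', 'b' ]
console.log(dedupedObjects.length); // 1
console.log(remaining);             // [ '1' ]
```

Plain objects all stringify to `[object Object]`, so under this hashing rule the second object is treated as a duplicate unless the objects define their own `toString()`. Strict equality, by contrast, keeps the string `'1'` even though the number `1` is excluded.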
### out()

Simply pipes the stream to stdout.

### each(callback)

Executes the provided callback for each stream chunk.

Arguments

  • callback(chunk) - A user-defined function which receives each chunk.

### chars(callback)

Counts the number of characters going down the stream and passes it to the callback when the stream ends. This will work on incoming string or Buffer data, assuming utf-8 encoding.

Arguments

  • callback(count) - A user-defined function which receives the final character count.

### bytes(callback)

Counts the number of bytes going down the stream and passes it to the callback when the stream ends. This will work on incoming string or Buffer data, assuming utf-8 encoding.

Arguments

  • callback(count) - A user-defined function which receives the final byte count.
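Character and byte counts differ as soon as the data contains multi-byte utf-8 characters, and a character may also be split across two Buffer chunks. The sketch below shows one way to count both using Node's built-in `string_decoder` module; `countStream` is a hypothetical helper, and counting characters as JavaScript string length (UTF-16 code units) is an assumption about what "characters" means here.

```javascript
var StringDecoder = require('string_decoder').StringDecoder;

// Sketch only: countStream is a hypothetical helper, not part of Ice-Stream.
function countStream(chunks) {
  var decoder = new StringDecoder('utf8');
  var bytes = 0;
  var chars = 0;
  chunks.forEach(function (chunk) {
    var buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, 'utf8');
    bytes += buffer.length;
    // The decoder holds back incomplete multi-byte sequences until the rest arrives.
    chars += decoder.write(buffer).length;
  });
  chars += decoder.end().length;
  return { bytes: bytes, chars: chars };
}

var euro = Buffer.from('€', 'utf8');   // 3 bytes, 1 character
var counts = countStream(['ab', euro.subarray(0, 1), euro.subarray(1)]);
console.log(counts); // { bytes: 5, chars: 3 }
```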
