kichooo/object-stream-tools

object-stream-tools

This package brings the goodies of functional programming (map, filter, reduce) to Node.js streams.

Installation

npm install --save object-stream-tools

Usage

arrayToStream

Converts an existing array to a stream of objects. Useful if you want to inject/merge those objects into an existing stream.

const ost = require('object-stream-tools')
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
    .on('data', data => {
        console.log(data)
    })
    .pipe(somewhereWritable)        

Prints each object in turn:

{ foo: 'bar' }
{ web: 'scale' }
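To make the idea concrete without relying on the package, here is a minimal sketch built only on Node's core `stream` module. The names `arrayToStreamSketch` and `collect` are made up for this illustration and are not part of object-stream-tools; the library's own implementation may differ.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for ost.arrayToStream: Readable.from() turns any
// iterable into an object-mode readable stream, one chunk per element.
function arrayToStreamSketch(items) {
    return Readable.from(items)
}

// Hypothetical helper: gather every emitted chunk so the result is easy to inspect.
function collect(stream) {
    return new Promise((resolve, reject) => {
        const seen = []
        stream.on('data', item => seen.push(item))
        stream.on('end', () => resolve(seen))
        stream.on('error', reject)
    })
}

collect(arrayToStreamSketch([{foo: 'bar'}, {web: 'scale'}]))
    .then(seen => console.log(seen.length)) // 2, one 'data' event per element
```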

streamToSet

It's very useful if you want to get the unique elements of a stream as a set of values.

const fs = require('fs')
const jsonStream = require('JSONStream')
const ost = require('object-stream-tools')
fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.map(obj => obj.requiredProperty))
    .pipe(ost.streamToSet())
    .on('data', uniqueSet => {
        // here you get a sorted array of the unique elements
        const uniqueArray = Array.from(uniqueSet.values()).sort()
    })
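One way to picture what happens here is a transform that swallows every value into a `Set` and emits that `Set` once the input ends. The sketch below assumes that behaviour and uses only Node's core `stream` module; `streamToSetSketch` is a hypothetical name, not the library's code.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.streamToSet: remember every value in a Set
// and emit that Set as a single chunk once the input has ended.
function streamToSetSketch() {
    const unique = new Set()
    return new Transform({
        objectMode: true,
        transform(value, _encoding, done) {
            unique.add(value)   // duplicates are dropped by the Set itself
            done()              // nothing is pushed downstream yet
        },
        flush(done) {
            this.push(unique)   // one 'data' event carrying the whole Set
            done()
        }
    })
}

Readable.from(['b', 'a', 'b', 'c', 'a'])
    .pipe(streamToSetSketch())
    .on('data', uniqueSet => {
        console.log(Array.from(uniqueSet).sort()) // [ 'a', 'b', 'c' ]
    })
```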

filter

If you just want to remove some objects from the stream, you probably want to use the filter function.

fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.filter(e => e.value > 6))
    // here you will get filtered objects
    .pipe(jsonStream.stringify())
    .pipe(process.stdout)
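For readers curious how a stream filter can work, the following sketch shows the general technique with a core `Transform`: objects that pass the predicate are pushed on, the rest are dropped. `filterSketch` is a hypothetical name and only approximates what `ost.filter` does.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.filter: forward an object only when the
// predicate returns a truthy value for it.
function filterSketch(predicate) {
    return new Transform({
        objectMode: true,
        transform(obj, _encoding, done) {
            if (predicate(obj)) this.push(obj)
            done()
        }
    })
}

Readable.from([{value: 3}, {value: 7}, {value: 9}])
    .pipe(filterSketch(e => e.value > 6))
    .on('data', obj => console.log(obj)) // logs { value: 7 } then { value: 9 }
```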

map-reduce

Map is useful when you want to modify existing objects in the stream.

Reduce is useful if you want to get a single object/value based on the whole stream, but you don't want to load the whole stream into memory.

Example: sum / average value of huge stream

const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
    .pipe(jsonStream.parse('*'))
    // pick required property you want to reduce over
    .pipe(ost.map(obj => obj.requiredProperty))
    .pipe(ost.reduce((acc, curr, i) => {
        // i is the index of the current element; drop it for a plain sum
        return acc + curr + i
    }, 0))
    .on('data', reducedValue => {
        // here you will get reduced value
    })

Here is an example with buffered/string input and output:

const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.map(obj => obj.requiredProperty))
    .pipe(ost.reduce((acc, curr, i) => {
        return acc + curr + i
    }, 0))
    .on('data', reducedValue => {
        // here you will get reduced value
    })
    .pipe(jsonStream.stringify())
    .pipe(process.stdout)

Please note that if you do not pass an initial value, the reduce function will start in (prev, curr, i) mode, the same way Array.prototype.reduce does.
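The two reduce modes can be sketched with core streams alone. In the sketch below, `mapSketch` and `reduceSketch` are hypothetical names; the assumption is that, as with `Array.prototype.reduce`, a missing initial value means the first element seeds the accumulator and the reducer is first called with index 1.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.map: replace each object with fn(object).
function mapSketch(fn) {
    return new Transform({
        objectMode: true,
        transform(obj, _encoding, done) { done(null, fn(obj)) }
    })
}

// Hypothetical stand-in for ost.reduce: keep one accumulator in memory and
// emit it once, after the input has ended.
function reduceSketch(reducer, ...rest) {
    let hasAcc = rest.length > 0   // was an initial value supplied?
    let acc = rest[0]
    let i = 0
    return new Transform({
        objectMode: true,
        transform(curr, _encoding, done) {
            if (hasAcc) {
                acc = reducer(acc, curr, i)
            } else {
                acc = curr         // no initial value: first element seeds acc
                hasAcc = true
            }
            i += 1
            done()
        },
        flush(done) {
            if (hasAcc) this.push(acc)
            done()
        }
    })
}

Readable.from([{price: 2}, {price: 3}, {price: 5}])
    .pipe(mapSketch(obj => obj.price))
    .pipe(reduceSketch((acc, curr) => acc + curr, 0))
    .on('data', total => console.log(total)) // 10
```

Only the accumulator is held in memory, which is what makes this approach suitable for very large inputs.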

Promises

Very handy when you want to aggregate streams using reduce or derived calls. Keep in mind that .promise() will not work if you use only the ost.map or ost.reduce features, as they do not aggregate.

fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.streamToArray())
    .promise()
    .then(data => {
        // here you will get your aggregated data - array of values.
    })
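If you want the same "whole stream as one promise" effect without the package, readable streams are async iterables, so a small helper can drain one into an array. This is a sketch with a hypothetical name (`streamToArraySketch`), not the library's `.promise()` implementation.

```javascript
const { Readable } = require('stream')

// Hypothetical helper, not part of object-stream-tools: an async function
// drains the stream with for-await and resolves with all collected items.
async function streamToArraySketch(stream) {
    const items = []
    for await (const item of stream) {
        items.push(item)
    }
    return items
}

streamToArraySketch(Readable.from([{id: 1}, {id: 2}]))
    .then(data => console.log(data.length)) // 2
```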

find

Find is super handy if you want to quickly check whether a value/object exists in the stream. Think of it as grep on steroids.

fs.createReadStream('../test/data.json')
    .pipe(jsonStream.parse('*'))
    .pipe(ost.find(e => e.value > 6))
    .then(foundObj => {
        // here you will get the first object that matches the condition
        // or undefined when none matches
    })
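The early-exit behaviour that makes find attractive can be sketched as follows: resolve with the first match and stop reading, or resolve with undefined at the end. Note that `findSketch` is a hypothetical helper with a different call shape from `ost.find` (it takes the stream as an argument instead of being piped into).

```javascript
const { Readable } = require('stream')

// Hypothetical helper: resolve with the first object matching the predicate,
// or with undefined if the stream ends without a match.
function findSketch(stream, predicate) {
    return new Promise((resolve, reject) => {
        stream.on('data', obj => {
            if (predicate(obj)) {
                resolve(obj)      // only the first resolve() call has any effect
                stream.destroy()  // stop consuming the rest of the input
            }
        })
        stream.on('end', () => resolve(undefined))
        stream.on('error', reject)
    })
}

findSketch(Readable.from([{value: 2}, {value: 8}, {value: 9}]), e => e.value > 6)
    .then(foundObj => console.log(foundObj)) // { value: 8 }
```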

Please look at the tests for more use cases.

About

Work on your streams like you work with arrays.
