
This toolkit is used to process huge files. At that scale even a tiny per-operation cost adds up to minutes and hours: a microsecond over 1 billion operations adds ~16.5 minutes, and a millisecond over 1 billion operations adds ~11.5 days.

That's why performance considerations played a major role in the design and implementation of stream-json. Below is a list of best practices for users of the toolkit.

Streams

Reduce pipeline size

Every link in a stream-based data processing pipeline introduces latency. Try to minimize the size of your pipeline. While it is tempting to use a lot of small filters/transforms, try to combine them into one component when possible (the examples use stream-chain for simplicity):

const {chain} = require('stream-chain');

// fine-grained, but less efficient
chain([
  sourceStream,
  // filters
  data => data.key % 1 !== 0 ? data : null,
  data => data.value.important ? data : null,
  // transforms
  data => data.value.price,
  price => price * taxRate
]);

// more efficient
chain([
  sourceStream,
  data => {
    if (data.key % 1 !== 0 && data.value.important) {
      return data.value.price * taxRate;
    }
    return null; // ignore
  }
]);

In general, boundaries between streams are relatively expensive and should be reserved for components that generate a varying number of items, so that we can take advantage of the stream's ability to handle back-pressure correctly. Otherwise, simple function calls are more efficient.
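
For example, a stage that emits a varying number of items per input is a good candidate for its own stream boundary. Below is a minimal sketch using a plain Node.js Transform; the property data.value.items and the surrounding streams are placeholders:

const {Transform} = require('stream');

// This stage may push zero or more items per input chunk; as a separate
// stream it gets correct back-pressure handling for free.
const expandItems = new Transform({
  objectMode: true,
  transform(data, _encoding, callback) {
    for (const item of data.value.items) {
      this.push(item);
    }
    callback();
  }
});

chain([
  sourceStream,
  expandItems,
  someConsumer
]);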

Reduce traffic

The less traffic goes through a pipeline, the faster it runs. If you use filters, try to remove as many items as possible as early as possible by arranging filters properly:

// let's assume that we have a small number of important objects,
// and valid() is an expensive function to calculate

// fine-grained, but less efficient
chain([
  sourceStream,
  // filters
  data => valid(data) ? data : null,
  data => data.value.important ? data : null
]);

// better
chain([
  sourceStream,
  // filters
  data => data.value.important ? data : null,
  data => valid(data) ? data : null
]);

// best
chain([
  sourceStream,
  // filters
  data => data.value.important && valid(data) ? data : null
]);

Arrange filters so that cheap and highly selective ones run first, minimizing the total expense of filtering. The same goes for transforms.

Individual components of stream-json

Parser

While Parser streams values as chunks just fine, sometimes we need the whole value. Parser can pack values and emit them as xxxValue tokens by itself. There is no need to do it in your custom code unless it provides some discernible benefit.

Sometimes, when we pack values, we don't need them to be streamed as chunks as well. Suppressing the chunks can reduce traffic. Parser can be told to suppress streaming values using the streamXXX options, which work only when the corresponding values are packed:

const parser1 = new Parser();
// streams values like that:
// {name: 'startString'}
// {name: 'stringChunk', value: 'a'} // zero or more chunks
// {name: 'stringChunk', value: 'b'}
// {name: 'endString'}
// {name: 'stringValue', value: 'ab'}

// In reality, chunks are unlikely to be a single character long.

const parser2 = new Parser({packValues: false});
// streams values like that:
// {name: 'startString'}
// {name: 'stringChunk', value: 'a'} // zero or more chunks
// {name: 'stringChunk', value: 'b'}
// {name: 'endString'}

const parser3 = new Parser({streamValues: false});
// streams values like that:
// {name: 'stringValue', value: 'ab'}

Some downstream components may have special requirements for a stream. For example, filters may want key values to be present. Replace may have additional requirements described below. Stringer by default uses value chunks, but can be switched to use packed values as described below.

Main module

The main module returns a function that creates a parser decorated with emit(). Use the alternatives below if this functionality is not required:

const makeParser = require('stream-json');
makeParser().pipe(someFilter); // token events are not used

// better #1
const {Parser} = require('stream-json');
new Parser().pipe(someFilter);

// better #2
const {parser} = require('stream-json');
parser().pipe(someFilter);

JSONL AKA NDJSON

(Since 1.6.0) If you deal with strict JSONL (or NDJSON) input and convert token streams to JavaScript objects using streamers, use jsonl/Parser to improve performance.

const makeParser     = require('stream-json');
const {streamValues} = require('stream-json/streamers/StreamValues');
chain([
  makeParser({jsonStreaming: true}),
  streamValues(),
  someConsumer
]);

// more efficient
const {parser: jsonlParser} = require('stream-json/jsonl/Parser');
chain([
  jsonlParser(),
  someConsumer
]);

Filters

A common use case is to select just one item from a stream. Frequently, after that item is selected no other items can possibly match, yet the filter is still applied to them. This is especially common for filters specified by a string for a direct match. One way to eliminate this inefficiency is to set {once: true} in the filter's options.
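
For example, here is a minimal sketch with Pick; the path 'meta.config' is hypothetical:

const fs = require('fs');
const {chain}  = require('stream-chain');
const {parser} = require('stream-json');
const {pick}   = require('stream-json/filters/Pick');

chain([
  fs.createReadStream('sample.json'),
  parser(),
  // after the single expected match is found, the filter stops
  // testing the remaining tokens
  pick({filter: 'meta.config', once: true}),
  someConsumer
]);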

Replace and Ignore

Replace can generate substreams by itself:

  • A replacement substream provided by the user.
  • Property keys generated for a replacement.

Ignore is based on Replace, so the same considerations apply to it.

The former case is 100% controlled by the user, while the latter can include a streaming part:

const replace1 = new Replace();
// can generate keys like that:
// {name: 'startKey'}
// {name: 'stringChunk', value: 'a'}
// {name: 'endKey'}
// {name: 'keyValue', value: 'a'}

const replace2 = new Replace({streamKeys: false});
// can generate keys like that:
// {name: 'keyValue', value: 'a'}

Usually, we select the same style of values across the whole pipeline.
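
For example, here is a sketch of a Replace that supplies its own replacement tokens, assuming the documented replacement option accepts an array of token objects; the path 'password' and the replacement value are illustrative:

const {replace} = require('stream-json/filters/Replace');

// The replacement uses a packed value only, which matches a pipeline where
// value chunks are suppressed; if the rest of the pipeline streams chunks
// (the default), include the chunk tokens in the replacement as well.
const sanitize = replace({
  filter: 'password',
  streamKeys: false,
  replacement: [{name: 'stringValue', value: '***'}]
});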

Streamers

All streamers support objectFilter to control whether incoming objects are assembled into JavaScript objects or discarded without assembling. Depending on many factors, it can be more efficient to filter streams at this level.

The following things should be considered:

  • If a property required to make a filtering decision usually comes last, the whole object will be assembled before a decision can be made. In this case, you are unlikely to gain any performance benefit.
  • The objectFilter function is called on every update of an object being assembled. If the filtering function is relatively expensive, it could be more beneficial to filter the stream after objects are assembled.

// variant #1
chain([
  sourceStream,
  new StreamArray({objectFilter: asm => {
    const value = asm.current;
    if (value && value.hasOwnProperty('important')) {
      return value.important;
    }
    // return undefined; // we are undecided yet
  }})
]);

// variant #2
chain([
  sourceStream,
  new StreamArray(),
  data => {
    const value = data.value;
    return value && value.important ? data : null; // pass the item on or drop it
  }
]);

Analyze your data stream, or even benchmark it, in order to decide on the best way to filter the stream.

Utilities

Utf8Stream

Utf8Stream adds value only when it reads from binary buffers: it decodes them correctly even when multi-byte UTF-8 sequences are split across chunk boundaries. If its input is string data, it is essentially a pass-through and can be excluded from a pipeline.

withParser()

withParser() and the static methods of the same name on streaming components return a chain object created by stream-chain.

A chain object is an instance of Duplex that wraps a pipeline and adds an extra level of indirection. This overhead can easily be avoided:

// variant #1
const pipeline = StreamArray.withParser();
fs.createReadStream('sample.json').pipe(pipeline);
pipeline.on('end', () => console.log('done!'));

// variant #2: more verbose, yet slightly more efficient
const pipeline = StreamArray.withParser();
fs.createReadStream('sample.json').pipe(pipeline.input);
pipeline.output.on('end', () => console.log('done!'));

As you can see, a chain object looks like a Source of 0.x on steroids and can be used as such.

Assembler

While Assembler comes with consume(data), it is easy to dispatch tokens yourself without the extra function call. The common pattern for Assembler-like objects is simple:

data => asm[data.name] && asm[data.name](data.value)

Consider it when creating your own components that incorporate Assembler or similar objects.
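
Here is a minimal sketch of that pattern in context; sample.json is a placeholder file name:

const fs = require('fs');
const {parser}  = require('stream-json');
const Assembler = require('stream-json/Assembler');

const asm    = new Assembler();
const tokens = fs.createReadStream('sample.json').pipe(parser());

// dispatch tokens directly instead of calling asm.consume(data)
tokens.on('data', data => asm[data.name] && asm[data.name](data.value));
tokens.on('end', () => console.log(asm.current)); // the fully assembled value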

Stringer

Stringer uses value chunks (stringChunk and numberChunk) to produce its output. If streaming of values was disabled in a pipeline with {streamValues: false} or by some other means, it will break Stringer. In this case, Stringer should be switched to use packed values. It can be done with the following options: useValues, useKeyValues, useStringValues, and useNumberValues. Always make sure that a constructed pipeline is consistent.
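
Here is a minimal sketch of a consistent pipeline that suppresses value chunks and switches Stringer to packed values; the file names are placeholders:

const fs = require('fs');
const {chain}    = require('stream-chain');
const {parser}   = require('stream-json');
const {stringer} = require('stream-json/Stringer');

chain([
  fs.createReadStream('sample.json'),
  parser({streamValues: false}), // packed values only, no chunks
  stringer({useValues: true}),   // consume packed values instead of chunks
  fs.createWriteStream('output.json')
]);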

Emitter and emit()

Both Emitter and emit() are helpers. If they prove to be a bottleneck, we can easily avoid using them:

// variant #1
const emitter = new Emitter();
sourceStream.pipe(emitter);
emitter.on('startObject', () => console.log('object!'));

// variant #2
emit(sourceStream);
sourceStream.on('startObject', () => console.log('object!'));

// more efficient variant #3
sourceStream.on('data', data => data.name === 'startObject' && console.log('object!'));