Skip to content

Commit

Permalink
Streams to skip a count, skip a periodic count, and gather a ste of e…
Browse files Browse the repository at this point in the history
…lements into an array
  • Loading branch information
meschbach committed Feb 13, 2020
1 parent 5789f2a commit e32c96b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
37 changes: 37 additions & 0 deletions streams/gather.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const {Transform} = require("stream");

/**
* Groups a specific set of writes together to be dispatched as a single array.
*/
class GatherTransform extends Transform {
constructor(groupCount, itemTranform = identity ) {
super({
objectMode: true
});
this.groupCount = groupCount;
this.pendingGroup = [];
this.itemTranform = itemTranform;
}

_transform(data, encoding, callback){
const item = this.itemTranform(data);
this.pendingGroup.push(item);
if( this.pendingGroup.length >= this.groupCount ){
this.push(this.pendingGroup);
this.pendingGroup = [];
}
callback(null);
}

_flush(callback){
if( this.pendingGroup.length > 0 ){
this.push(this.pendingGroup);
this.pendingGroup = [];
}
callback(null);
}
}

module.exports = {
GatherTransform
};
53 changes: 53 additions & 0 deletions streams/skips.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const {Transform} = require("stream");

/**
* Skips a specific number of elements before continuing the stream. This transform may be modified after constructions
* to optional filter out a number of elements.
*/
class SkipCount extends Transform {
constructor(count) {
super({});
this.skip = count;
}

_transform(data, encoding, callback){
if( this.skip <= 0 ){
this.push(data);
} else {
this.skip--;
}
callback();
}
}

/**
* Skips specific elements within a period. For example our input stream structure represents a different entity every
* 9 elements. Within each entity we are only interested in the 4th and 7th elements. To accomplish this a PeriodicSkip
* would be configured with a period of 9, matching [4,9].
*/
class PeriodicSkip extends Transform {
constructor(period, match, options) {
super(options);
this.match = match;
this.period = period;
this.index = 0;
}

_transform(data, encoding, callback){
const current = this.index;
this.index++;
if( this.index >= this.period ){
this.index = 0;
}

if( this.match.includes(current) ) {
this.push(data);
}
callback();
}
}

module.exports = {
SkipCount,
PeriodicSkip
};

0 comments on commit e32c96b

Please sign in to comment.