Permalink
Browse files

Initial commit

  • Loading branch information...
mtrunkat committed Feb 3, 2019
0 parents commit 54c99c2f1dc735a498f8da51d235d1f05c3ed93c
@@ -0,0 +1,11 @@
root = true

[*]
indent_style = space
indent_size = 4
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
end_of_line = lf
# editorconfig-tools is unable to ignore longs strings or urls
max_line_length = null
@@ -0,0 +1,30 @@
{
"extends": "airbnb-base",
"plugins": [
"import",
"promise"
],
"rules": {
"indent": ["error", 4],
"no-underscore-dangle": [2, {
"allow": ["_id"],
"allowAfterThis": true
}],
"no-use-before-define": 0,
"no-param-reassign": 0,
"consistent-return": 0,
"array-callback-return": 0,
"object-curly-newline": 0,
"arrow-body-style": 0,
"no-plusplus": 0,
"strict": ["error", "global"],
"max-len": ["error", 150],
"no-undef": 0,
"func-names": 0,
"import/prefer-default-export": 0,
"import/no-absolute-path": 0,
"import/no-extraneous-dependencies": ["error", { "devDependencies": ["**/test/**/*.js"] }],
"no-await-in-loop": 0,
"no-restricted-syntax": ["error", "ForInStatement", "LabeledStatement", "WithStatement"]
}
}
@@ -0,0 +1,6 @@
node_modules
.DS_Store
coverage
.nyc_output
.idea
package-lock.json
@@ -0,0 +1,76 @@
# @TODO find name

## TOC

## Motivation

## Basic usage

```javascript
const { Readable } = require('stream');
const ObjectStreamUtilities = require('../src/main');
// Let have some stream that will output a serie of objects { n: 0 }, { n: 1 }, { n: 2 }, { n: 3 }, ...
const readable = new Readable({ objectMode: true });
let n = 0;
setInterval(() => readable.push({ n: n++ }), 1000);
const mapped = readable.pipe(new ObjectStreamUtilities());
// Split the stream into a stream of odd objects and even objects
// and extend them with some field is=odd or is=even
const oddStream = mapped
.filter((obj) => obj.x % 2)
.map((obj) => Object.assign({}, obj, { is: 'odd' }));
const evenStream = mapped
.filter((obj) => obj.x % 2 === 0)
.map((obj) => Object.assign({}, obj, { is: 'even' }));
// Then merge them back.
const mergedStream = oddStream.merge(evenStream);
// Chunk them by 100 records.
const chunkedStream = mergedStream.chunk(100);
// Save them to MongoDB in batches of 100 items with concurrency 2.
// This always corks the stream during the period when max concurrency is reached.
chunkedStream.onSeries(async (arrayOf100Items) => {
await datase.collection('test').insert(arrayOf100Items);
}, { concurrency: 2 });
```

## Reference

### merge

### collect

### filter

### chunk

### map

### merge

### omit

### pick

### pluck

### uniq

### weakSort

### onSeries

## Examples

### Batched upload to database



### Weak sorting slightly unordered stream
@@ -0,0 +1,49 @@
{
"name": "object-stream-utilities",
"version": "0.0.1",
"description": "@TODO",
"engines": {
"node": ">=8.0.0"
},
"main": "build/index.js",
"keywords": [
"@TODO"
],
"author": {
"name": "Marek Trunkat",
"url": "https://trunkat.eu"
},
"license": "Apache-2.0",
"repository": {
"type": "git",
"url": "git+https://github.com/@TODO"
},
"bugs": {
"url": "@TODO"
},
"homepage": "@TODO",
"files": [
"src"
],
"scripts": {
"test": "nyc --reporter=html --reporter=text mocha --timeout 60000 --recursive",
"lint": "npm run build && ./node_modules/.bin/eslint ./src ./test",
"lint:fix": "./node_modules/.bin/eslint ./src ./test --ext .js,.jsx --fix"
},
"dependencies": {
"underscore": "^1.9.1"
},
"devDependencies": {
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"eslint": "^5.5.0",
"eslint-config-airbnb": "^17.1.0",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.2.0",
"eslint-plugin-jsx-a11y": "^6.0.2",
"eslint-plugin-promise": "^4.0.1",
"eslint-plugin-react": "^7.0.1",
"mocha": "^3.5.3",
"nyc": "^13.0.1"
}
}
@@ -0,0 +1,87 @@
const { ObjectIdentityTransform } = require('./utils');
const Map = require('./transformations/Map');
const Filter = require('./transformations/Filter');
const Pluck = require('./transformations/Pluck');
const Chunk = require('./transformations/Chunk');
const Merge = require('./transformations/Merge');
const Omit = require('./transformations/Omit');
const Pick = require('./transformations/Pick');
const Uniq = require('./transformations/Uniq');
const WeakSort = require('./transformations/WeakSort');

const METHODS = {
map: Map,
filter: Filter,
pluck: Pluck,
chunk: Chunk,
omit: Omit,
pick: Pick,
uniq: Uniq,
weakSort: WeakSort,
};

class ObjectStreamUtilities extends ObjectIdentityTransform {
constructor() {
super();
for (const methodName of Object.keys(METHODS)) {
this._registerTransformation(methodName, METHODS[methodName]);
}
}

_registerTransformation(methodName, MethodClass) {
this[methodName] = (...args) => {
return this
.pipe(new MethodClass(...args))
.pipe(new ObjectStreamUtilities());
};
}

merge(stream) {
return this
.pipe(new Merge(this, stream), { end: false })
.pipe(new ObjectStreamUtilities());
}

async onSeries(func, { concurrency = 1 } = {}) {
let processing = 0;

return new Promise((resolve, reject) => {
let finishOnLast = false;

this.on('data', async (chunk) => {
try {
processing++;
if (processing === concurrency) this.pause();
await func(chunk);
this.resume();
processing--;

if (processing === 0 && finishOnLast) {
setTimeout(() => {
if (processing === 0) resolve();
}, 0);
}
} catch (err) {
this.emit('error', err);
}
});
this.on('end', () => {
if (processing === 0) resolve();
else finishOnLast = true;
});
this.on('error', reject);
});
}

async collect() {
return new Promise((resolve, reject) => {
const data = [];

this.on('data', chunk => data.push(chunk));
this.on('end', () => resolve(data));
this.on('error', reject);
});
}
}

module.exports = ObjectStreamUtilities;
@@ -0,0 +1,29 @@
const { ObjectTransform } = require('../utils');

module.exports = class Chunk extends ObjectTransform {
constructor(size) {
super();
this.size = size;
this.buffer = [];
}

_final(callback) {
this._emitBuffer();
callback();
}

_emitBuffer() {
this.push(this.buffer);
this.buffer = [];
}

_transform(chunk, encoding, callback) {
try {
this.buffer.push(chunk);
if (this.buffer.length === this.size) this._emitBuffer();
callback();
} catch (err) {
callback(err);
}
}
};
@@ -0,0 +1,17 @@
const { ObjectTransform } = require('../utils');

module.exports = class Filter extends ObjectTransform {
constructor(func) {
super();
this.func = func;
}

_transform(chunk, encoding, callback) {
try {
if (this.func(chunk)) callback(null, chunk);
else callback();
} catch (err) {
callback(err);
}
}
};
@@ -0,0 +1,16 @@
const { ObjectTransform } = require('../utils');

module.exports = class Map extends ObjectTransform {
constructor(func) {
super();
this.func = func;
}

_transform(chunk, encoding, callback) {
try {
callback(null, this.func(chunk));
} catch (err) {
callback(err);
}
}
};
@@ -0,0 +1,19 @@
const { ObjectIdentityTransform } = require('../utils');

module.exports = class Pluck extends ObjectIdentityTransform {
constructor(sourceStream, mergedStream) {
super();

const sourceStreamEndPromise = new Promise((resolve => sourceStream.on('end', resolve)));
const mergedStreamEndPromise = new Promise((resolve => mergedStream.on('end', resolve)));

mergedStream.pipe(this, { end: false });

Promise
.all([
sourceStreamEndPromise,
mergedStreamEndPromise,
])
.then(() => this.end());
}
};
@@ -0,0 +1,17 @@
const _ = require('underscore');
const { ObjectTransform } = require('../utils');

module.exports = class Omit extends ObjectTransform {
constructor(...fields) {
super();
this.fields = fields;
}

_transform(chunk, encoding, callback) {
try {
callback(null, _.omit(chunk, ...this.fields));
} catch (err) {
callback(err);
}
}
};
@@ -0,0 +1,17 @@
const _ = require('underscore');
const { ObjectTransform } = require('../utils');

module.exports = class Pick extends ObjectTransform {
constructor(...fields) {
super();
this.fields = fields;
}

_transform(chunk, encoding, callback) {
try {
callback(null, _.pick(chunk, ...this.fields));
} catch (err) {
callback(err);
}
}
};
Oops, something went wrong.

0 comments on commit 54c99c2

Please sign in to comment.