Skip to content

Commit

Permalink
Merge f52a379 into abac77b
Browse files Browse the repository at this point in the history
  • Loading branch information
lupomontero committed Sep 11, 2018
2 parents abac77b + f52a379 commit e7e659d
Show file tree
Hide file tree
Showing 13 changed files with 1,785 additions and 71 deletions.
10 changes: 10 additions & 0 deletions .babelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"presets": [["env", { "modules": false }]],
"env": {
"test": {
"plugins": [
"transform-es2015-modules-commonjs"
]
}
}
}
1 change: 1 addition & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
coverage/
dist/
65 changes: 33 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,9 @@ npm install lupomontero/pact
A `Promise` that will resolve to an array with the results for each _task_.
Results will be in the same order as in the input _tasks_ array.

<!--
### `EventEmitter pact.emitter(tasks, concurrency, interval, failFast)`
#### Arguments
#### Return value
### `Stream pact.stream(tasks, concurrency, interval, failFast, streamOtions)`
#### Arguments
#### Return value
-->

## Examples
#### Examples

## Series
##### Series

Process each task after the other, sequentially. Each task will wait for the
previous one to complete. Concurrency set to `1` (one task at a time).
Expand All @@ -64,7 +50,7 @@ pact(tasks)
.catch(console.error);
```
## Batches
##### Batches
Process _tasks_ in _batches_ based on a given _concurrency_. In this example
_tasks_ will be processed in batches of 5 _tasks_ each. Each batch waits for the
Expand All @@ -76,7 +62,7 @@ pact(tasks, 5)
.catch(console.error);
```
## Throttled
##### Throttled
Same as example above but adding a 1000ms delay between batches.
Expand All @@ -86,7 +72,7 @@ pact(tasks, 5, 1000)
.catch(console.error);
```
## failFast=false (don't bail out on errors)
##### failFast=false (don't bail out on errors)
Same as above, but in this case if a promise fails, processing will continue
instead of stopping the whole thing. When `failFast` is set to `false`, errors
Expand All @@ -96,33 +82,48 @@ will appear as the value/result for the relevant element in the results array
```js
pact(tasks, 5, 1000, false)
.then(console.log)
.catch(console.error);
```
<!--
## EventEmitter
### `stream.Readable pact.createStream(tasks, concurrency, interval, failFast)`
```js
const { emitter } = require('pact');
#### Arguments
emitter(tasks, 5, 1000, false)
Same as `pact()`.
#### Return value
A readable stream (`stream.Readable`) instead of a `Promise`. Each result will
be emitted as a data event and the stream will operate in `objectMode`.
#### Examples
##### Handling each event independently... (old school)
```js
pact.createStream(tasks, 5, 1000, false)
.on('error', err => console.error('error', err))
.on('data', data => console.log('data', data))
.on('log', (level, message) => console[level](`${level}: ${message}`))
.on('end', _ => console.log('ended!'));
```
## Stream
##### Piping to a writable stream
```js
// This example assumes that tasks will resolve to string values so that the
// resulting stream can be directly piped to stdout.
pact.createStream(tasks, 5, 1000, false).pipe(process.stdout);
```
<!--
### `EventEmitter pact.createEventEmitter(tasks, concurrency, interval, failFast)`
```js
const { stream } = require('pact');
const { emitter } = require('pact');

stream(tasks, 5, 1000, false, { objectMode: true })
emitter(tasks, 5, 1000, false)
.on('error', err => console.error('error', err))
.on('data', data => console.log('data', data))
.on('log', (level, message) => console[level](`${level}: ${message}`))
.on('end', _ => console.log('ended!'));


stream(tasks, 5, 1000, false).pipe(process.stdout);
```
-->
15 changes: 8 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
const splitArrayIntoBatches = (arr, limit) => arr.reduce((memo, item) => {
if (memo.length && memo[memo.length - 1].length < limit) {
memo[memo.length - 1].push(item);
return memo;
}
return [...memo, [item]];
}, []);
const splitArrayIntoBatches = require('./lib/splitArrayIntoBatches');


module.exports = (tasks, concurrency, interval = 0, failFast = true) => {
Expand All @@ -28,3 +22,10 @@ module.exports = (tasks, concurrency, interval = 0, failFast = true) => {

return processBatches(splitArrayIntoBatches(tasks, concurrency));
};


// Exclude `createStream` method in basic build for browsers.
if (!process.env.PACT_NO_STREAMS) {
// eslint-disable-next-line global-require
module.exports.createStream = require('./lib/createStream');
}
38 changes: 38 additions & 0 deletions lib/createStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// WARNING ONLY NODE???
const { Readable } = require('stream');
const splitArrayIntoBatches = require('./splitArrayIntoBatches');


module.exports = (tasks, concurrency, interval = 0, failFast = true) => {
const stream = new Readable({
objectMode: true,
read() {},
});

let hasErrored = false;
const error = (err) => {
hasErrored = true;
stream.emit('error', err);
};

const push = chunk => (!hasErrored && stream.readable && stream.push(chunk));

const processBatches = (batches) => {
if (!batches.length) {
return push(null);
}

const wrap = fn => fn()
.then(result => push(result))
.catch(err => (failFast ? error(err) : push(err)));

return Promise.all(batches[0].map(wrap)).then(() => setTimeout(
() => (!hasErrored && stream.readable && processBatches(batches.slice(1))),
interval,
));
};

processBatches(splitArrayIntoBatches(tasks, concurrency));

return stream;
};
7 changes: 7 additions & 0 deletions lib/splitArrayIntoBatches.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = (arr, limit) => arr.reduce((memo, item) => {
if (memo.length && memo[memo.length - 1].length < limit) {
memo[memo.length - 1].push(item);
return memo;
}
return [...memo, [item]];
}, []);
15 changes: 11 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
{
"name": "pact",
"version": "1.0.0",
"main": "index.js",
"main": "./index.js",
"license": "MIT",
"scripts": {
"pretest": "eslint .",
"test": "jest --verbose --coverage",
"coveralls": "cat ./coverage/lcov.info | coveralls"
"test": "jest --coverage",
"coveralls": "cat ./coverage/lcov.info | coveralls",
"build:basic": "webpack --define process.env.PACT_NO_STREAMS=true --mode production --entry ./index.js -o ./dist/pact.basic.min.js --output-library pact --output-library-target umd",
"build:full": "webpack --mode production --entry ./index.js -o ./dist/pact.full.min.js --output-library pact --output-library-target umd",
"build": "yarn build:basic && yarn build:full"
},
"devDependencies": {
"babel-core": "^6.26.3",
"babel-preset-env": "^1.7.0",
"coveralls": "^3.0.2",
"eslint": "^5.5.0",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0"
"jest": "^23.6.0",
"webpack": "^4.18.0",
"webpack-cli": "^3.1.0"
}
}
File renamed without changes.
17 changes: 17 additions & 0 deletions test/basic.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
describe('pact (basic - no streams)', () => {
beforeAll(() => {
process.env.PACT_NO_STREAMS = true;
});

afterAll(() => {
process.env.PACT_NO_STREAMS = undefined;
// done();
});

it('should be a function and NOT have `createStream`', () => {
// eslint-disable-next-line global-require
const pact = require('../');
expect(typeof pact).toBe('function');
expect(pact.createStream).toBe(undefined);
});
});
2 changes: 1 addition & 1 deletion test/pact.spec.js → test/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { createDelayedPromise, createFailedDelayedPromise, timed } = require('./common');
const { createDelayedPromise, createFailedDelayedPromise, timed } = require('./testUtils');
const pact = require('../');


Expand Down
59 changes: 59 additions & 0 deletions test/lib/createStream.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const { Readable } = require('stream');
const { createDelayedPromise, createFailedDelayedPromise } = require('../testUtils');
const createStream = require('../../lib/createStream');


describe('createStream', () => {
it('should be a function', () => {
expect(typeof createStream).toBe('function');
});

it('should return a readable stream', () => {
expect(createStream([]) instanceof Readable).toBe(true);
});

it('should process tasks in series and emit results as data events', (done) => {
const results = [];
createStream([
() => createDelayedPromise('xxx', 20),
() => createDelayedPromise('yyy', 10),
])
.on('data', result => results.push(result))
.on('end', () => {
expect(results).toEqual(['xxx', 'yyy']);
done();
});
});

it('should failFast by default', (done) => {
createStream([
() => createFailedDelayedPromise('xxx', 10),
() => createDelayedPromise('yyy', 10),
() => createFailedDelayedPromise('zzz', 10),
])
.on('error', (err) => {
expect(err instanceof Error).toBe(true);
expect(err.message).toBe('Error: xxx');
done();
});
});

it('should not bail out when failFast is false', (done) => {
const onData = jest.fn();
createStream([
() => createFailedDelayedPromise('xxx', 10),
() => createDelayedPromise('yyy', 10),
() => createFailedDelayedPromise('zzz', 10),
], 1, 0, false)
.on('data', onData)
.on('end', () => {
expect(onData.mock.calls.length).toBe(3);
expect(onData.mock.calls[0].length).toBe(1);
expect(onData.mock.calls[1].length).toBe(1);
expect(onData.mock.calls[0][0] instanceof Error).toBe(true);
expect(onData.mock.calls[1][0]).toBe('yyy');
expect(onData.mock.calls[2][0] instanceof Error).toBe(true);
done();
});
});
});
File renamed without changes.

0 comments on commit e7e659d

Please sign in to comment.