Skip to content

Commit

Permalink
Drop support for Node.js 0.12 (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
SamVerschueren authored and sindresorhus committed Apr 30, 2018
1 parent 9730a34 commit 247e1d5
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 104 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Expand Up @@ -3,7 +3,6 @@ node_js:
- '6'
- '5'
- '4'
- '0.12'

after_script:
- 'cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js'
34 changes: 17 additions & 17 deletions index.js
@@ -1,8 +1,8 @@
'use strict';
var Observable = require('any-observable');
const Observable = require('any-observable');

function or(option, alternate, required) {
var result = option === false ? false : option || alternate;
const result = option === false ? false : option || alternate;

if ((required && !result) || (result && typeof result !== 'string')) {
throw new TypeError(alternate + 'Event must be a string.');
Expand All @@ -11,25 +11,25 @@ function or(option, alternate, required) {
return result;
}

module.exports = function (stream, opts) {
module.exports = (stream, opts) => {
opts = opts || {};

var complete = false;
var dataListeners = [];
var awaited = opts.await;
var dataEvent = or(opts.dataEvent, 'data', true);
var errorEvent = or(opts.errorEvent, 'error');
var endEvent = or(opts.endEvent, 'end');
let complete = false;
let dataListeners = [];
const awaited = opts.await;
const dataEvent = or(opts.dataEvent, 'data', true);
const errorEvent = or(opts.errorEvent, 'error');
const endEvent = or(opts.endEvent, 'end');

function cleanup() {
complete = true;
dataListeners.forEach(function (listener) {
dataListeners.forEach(listener => {
stream.removeListener(dataEvent, listener);
});
dataListeners = null;
}

var completion = new Promise(function (resolve, reject) {
const completion = new Promise((resolve, reject) => {
function onEnd(result) {
if (awaited) {
awaited.then(resolve);
Expand All @@ -51,15 +51,15 @@ module.exports = function (stream, opts) {
if (awaited) {
awaited.catch(reject);
}
}).catch(function (err) {
}).catch(err => {
cleanup();
throw err;
}).then(function (result) {
}).then(result => {
cleanup();
return result;
});

return new Observable(function (observer) {
return new Observable(observer => {
completion
.then(observer.complete.bind(observer))
.catch(observer.error.bind(observer));
Expand All @@ -68,21 +68,21 @@ module.exports = function (stream, opts) {
return null;
}

var onData = function onData(data) {
const onData = data => {
observer.next(data);
};

stream.on(dataEvent, onData);
dataListeners.push(onData);

return function () {
return () => {
stream.removeListener(dataEvent, onData);

if (complete) {
return;
}

var idx = dataListeners.indexOf(onData);
const idx = dataListeners.indexOf(onData);

if (idx !== -1) {
dataListeners.splice(idx, 1);
Expand Down
31 changes: 9 additions & 22 deletions package.json
Expand Up @@ -10,17 +10,14 @@
"url": "github.com/jamestalmage"
},
"engines": {
"node": ">=0.12.0"
"node": ">=4"
},
"scripts": {
"test": "xo && nyc --reporter=lcov --reporter=text npm run test_both",
"test_both": "ava --require=any-observable/register/rxjs && ava --require=any-observable/register/zen"
},
"files": [
"index.js",
"zen.js",
"rxjs.js",
"rxjs-all.js"
"index.js"
],
"keywords": [
"stream",
Expand All @@ -32,22 +29,12 @@
},
"devDependencies": {
"array-to-events": "^1.0.0",
"ava": "^0.15.2",
"coveralls": "^2.11.9",
"delay": "^1.3.1",
"nyc": "^6.4.0",
"rxjs": "^5.0.0-beta.6",
"xo": "^0.16.0",
"zen-observable": "^0.3.0"
},
"xo": {
"overrides": [
{
"files": [
"test.js"
],
"esnext": true
}
]
"ava": "^0.25.0",
"coveralls": "^3.0.0",
"delay": "^2.0.0",
"nyc": "^11.7.1",
"rxjs": "^5.5.10",
"xo": "^0.20.3",
"zen-observable": "^0.8.8"
}
}
103 changes: 39 additions & 64 deletions test.js
Expand Up @@ -4,7 +4,7 @@ import ZenObservable from 'zen-observable';
import test from 'ava';
import delay from 'delay';
import arrayToEvents from 'array-to-events';
import m from './';
import m from '.';

const isZen = Observable === ZenObservable;
const prefix = isZen ? 'zen' : 'rxjs';
Expand All @@ -13,15 +13,6 @@ function emitSequence(emitter, sequence, cb) {
arrayToEvents(emitter, sequence, {delay: 'immediate', done: cb});
}

// avoid deprecation warnings
// TODO: https://github.com/jden/node-listenercount/pull/1
function listenerCount(emitter, eventName) {
if (emitter.listenerCount) {
return emitter.listenerCount(eventName);
}
return EventEmitter.listenerCount(emitter, eventName);
}

function deferred() {
let res;
let rej;
Expand All @@ -38,7 +29,7 @@ function * expectations(...args) {
yield * args;
}

test(`${prefix}: emits data events`, t => {
test(`${prefix}: emits data events`, async t => {
t.plan(2);
const ee = new EventEmitter();

Expand All @@ -51,28 +42,11 @@ test(`${prefix}: emits data events`, t => {

const expected = expectations('foo', 'bar');

return m(ee)
.forEach(chunk => t.is(chunk, expected.next().value))
.then(delay(10));
await m(ee).forEach(chunk => t.is(chunk, expected.next().value));
await delay(10);
});

// RxJs and Zen-Observable do not honor the same contract - RxJs resolves to null.
if (isZen) {
test(`${prefix}: forEach resolves with the value passed to the "end" event`, async t => {
t.plan(1);
const ee = new EventEmitter();

emitSequence(ee, [
['data', 'foo'],
['end', 'fin']
]);

const result = await m(ee).forEach(() => {});
t.is(result, 'fin');
});
}

test(`${prefix}: forEach resolves after resolution of the awaited promise${isZen ? ', with promise value' : ''}`, async t => {
test(`${prefix}: forEach resolves after resolution of the awaited promise`, async t => {
t.plan(3);
const ee = new EventEmitter();
const awaited = deferred();
Expand All @@ -89,14 +63,12 @@ test(`${prefix}: forEach resolves after resolution of the awaited promise${isZen
}
);

const result =
await m(ee, {endEvent: false, await: awaited})
.forEach(chunk => t.is(chunk, expected.next().value));
const result = await m(ee, {endEvent: false, await: awaited}).forEach(chunk => t.is(chunk, expected.next().value));
await delay(10);
t.is(result, isZen ? 'resolution' : undefined);
t.is(result, undefined);
});

test(`${prefix}: rejects on error events`, t => {
test(`${prefix}: rejects on error events`, async t => {
t.plan(3);

const ee = new EventEmitter();
Expand All @@ -105,19 +77,20 @@ test(`${prefix}: rejects on error events`, t => {
['data', 'foo'],
['data', 'bar'],
['error', new Error('bar')],
['data', 'baz'], // should be ignored
['data', 'baz'], // Should be ignored
['alternate-error', new Error('baz')]
]);

const expected = expectations('foo', 'bar');

return t.throws(
await t.throws(
m(ee).forEach(chunk => t.is(chunk, expected.next().value)),
'bar'
).then(delay(10));
);
await delay(10);
});

test(`${prefix}: change the name of the error event`, t => {
test(`${prefix}: change the name of the error event`, async t => {
t.plan(4);

const ee = new EventEmitter();
Expand All @@ -137,14 +110,14 @@ test(`${prefix}: change the name of the error event`, t => {

const expected = expectations('foo', 'bar', 'baz');

return t.throws(
await t.throws(
m(ee, {errorEvent: 'alternate-error'})
.forEach(chunk => t.is(chunk, expected.next().value)),
'baz'
);
});

test(`${prefix}: endEvent:false, and await:undefined means the Observable will never be resolved`, t => {
test(`${prefix}: endEvent:false, and await:undefined means the Observable will never be resolved`, async t => {
const ee = new EventEmitter();

emitSequence(ee, [
Expand All @@ -162,10 +135,11 @@ test(`${prefix}: endEvent:false, and await:undefined means the Observable will n
completed = true;
});

return delay(30).then(() => t.false(completed));
await delay(30);
t.false(completed);
});

test(`${prefix}: errorEvent can be disabled`, () => {
test(`${prefix}: errorEvent can be disabled`, async t => {
const ee = new EventEmitter();

emitSequence(ee, [
Expand All @@ -178,7 +152,7 @@ test(`${prefix}: errorEvent can be disabled`, () => {

ee.on('error', () => {});

return m(ee, {errorEvent: false});
await t.notThrows(m(ee, {errorEvent: false}));
});

test(`${prefix}: protects against improper arguments`, t => {
Expand All @@ -187,53 +161,54 @@ test(`${prefix}: protects against improper arguments`, t => {
t.throws(() => m(new EventEmitter(), {dataEvent: false}), /dataEvent/);
});

test(`${prefix}: listeners are cleaned up on completion, and no further listeners will be added.`, t => {
test(`${prefix}: listeners are cleaned up on completion, and no further listeners will be added.`, async t => {
t.plan(5);

const ee = new EventEmitter();
t.is(listenerCount(ee, 'data'), 0);
t.is(ee.listenerCount('data'), 0);

const observable = m(ee);

observable.forEach(() => {});
t.is(listenerCount(ee, 'data'), 1);
t.is(ee.listenerCount('data'), 1);

observable.forEach(() => {});
t.is(listenerCount(ee, 'data'), 2);
t.is(ee.listenerCount('data'), 2);

emitSequence(ee, [
['data', 'foo'],
['data', 'bar'],
['end']
]);

return observable
.forEach(() => {})
.then(() => {
t.is(listenerCount(ee, 'data'), 0);
await observable.forEach(() => {});

ee.on = ee.once = function () {
t.fail('should not have added more listeners');
};
t.is(ee.listenerCount('data'), 0);

observable.forEach(() => {});
t.is(listenerCount(ee, 'data'), 0);
const onEvent = () => {
t.fail('should not have added more listeners');
};

return observable.forEach(() => {});
});
ee.on = onEvent;
ee.once = onEvent;

observable.forEach(() => {});
t.is(ee.listenerCount('data'), 0);

await observable.forEach(() => {});
});

test(`${prefix}: unsubscribing reduces the listener count`, async t => {
test(`${prefix}: unsubscribing reduces the listener count`, t => {
const ee = new EventEmitter();
t.is(listenerCount(ee, 'data'), 0);
t.is(ee.listenerCount('data'), 0);

const observable = m(ee);

const subscription = observable.subscribe({});

t.is(listenerCount(ee, 'data'), 1);
t.is(ee.listenerCount('data'), 1);

subscription.unsubscribe();

t.is(listenerCount(ee, 'data'), 0);
t.is(ee.listenerCount('data'), 0);
});

0 comments on commit 247e1d5

Please sign in to comment.