Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
maximilianschmitt committed May 19, 2015
0 parents commit d7fded4
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
coverage
node_modules
.DS_Store
9 changes: 9 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
language: node_js
node_js:
- '0.10'
- '0.11'
- '0.12'
- 'iojs'
notifications:
email: false
after_success: 'npm run coveralls'
56 changes: 56 additions & 0 deletions main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict';

var Readable = require('readable-stream').Readable;

var streamInterval = function(opts, fn, interval) {
if (typeof opts === 'function') {
interval = fn;
fn = opts;
}

var outStream = new Readable(opts);
outStream.stop = function() {
this.stopped = true;
this.push(null);
};
outStream._read = function() {};
scheduleInterval(Date.now());

return outStream;

function buildStream() {
var now = Date.now();
var inStream = fn();

inStream.on('data', function listen(data) {
if (outStream.stopped) {
inStream.removeListener('data', listen);
return;
}

outStream.push(data);
});

inStream.once('end', function onEnd() {
if (outStream.stopped) {
return;
}

scheduleInterval(now);
});

}

function scheduleInterval(then) {
if (outStream.stopWhenPossible) {
outStream.end();
return;
}

var now = Date.now();
var scheduled = interval - (now - then);
setTimeout(buildStream, scheduled > 0 ? scheduled : 0);
}
};

module.exports = streamInterval;
33 changes: 33 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"name": "stream-interval",
"version": "0.0.1",
"description": "setInterval for for functions that return streams",
"homepage": "https://github.com/maximilianschmitt/stream-interval",
"bugs": "https://github.com/maximilianschmitt/stream-interval/issues",
"license": "MIT",
"author": {
"name": "Maximilian Schmitt",
"email": "maximilian.schmitt@googlemail.com",
"url": "http://maximilianschmitt.me"
},
"repository": {
"type": "git",
"url": "https://github.com/maximilianschmitt/stream-interval.git"
},
"main": "./main",
"scripts": {
"test": "istanbul cover _mocha",
"prepublish": "npm test",
"coveralls": "cat ./coverage/lcov.info | coveralls"
},
"dependencies": {
"readable-stream": "^1.0.33"
},
"devDependencies": {
"chai": "^2.3.0",
"coveralls": "^2.11.2",
"istanbul": "^0.3.14",
"mocha": "^2.2.5",
"through2": "^0.6.5"
}
}
5 changes: 5 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# streamInterval

[![Travis Build](http://img.shields.io/travis/maximilianschmitt/stream-interval.svg?style=flat)](https://travis-ci.org/maximilianschmitt/stream-interval) [![Code Coverage](https://img.shields.io/coveralls/maximilianschmitt/stream-interval.svg)](https://coveralls.io/r/maximilianschmitt/stream-interval) [![npm](https://img.shields.io/npm/dm/stream-interval.svg)](https://www.npmjs.com/package/stream-interval)

setInterval for functions that return streams.
136 changes: 136 additions & 0 deletions test/main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
'use strict';

var expect = require('chai').expect;
var Readable = require('stream').Readable;
var streamInterval = require('../main');

describe('streamInterval', function() {
it('exposes a function', function() {
expect(streamInterval).to.be.a('function');
});

it('stops when told to stop', function(done) {
var i = 0;
var si = threeHellos();

si.on('data', function() {
i++;
});

si.on('end', function() {
expect(i).to.equal(3);
done();
});
});

it('pipes through everything', function(done) {
var all = '';
var si = threeHellos();

si.on('data', function(data) {
all += data;
});

si.on('end', function() {
expect(all).to.equal('hellohellohello');
done();
});
});

it('works with streams in object mode', function(done) {
var all = [];
var si = threeObjects();

si.on('data', function(data) {
all.push(data);
});

si.on('end', function() {
expect(all).to.deep.equal([{ hello: 'world' }, { hello: 'world' }, { hello: 'world' }]);
done();
});
});

it('calls the stream factory at most every x milliseconds', function(done) {
var all = '';
var si = streamInterval(function() {
return new Readable({
read: function() {
this.push('hello');
this.push(null);
}
});
}, 100);

si.on('data', function(data) {
all += data;
});

setTimeout(function() {
expect(all).to.equal('hellohellohellohellohello');
si.stop();
done();
}, 550);
});

it('calls the next function only if the current stream has ended', function(done) {
var all = '';
var calls = 0;
var si = streamInterval(function() {
calls++;

var s = new Readable({
read: function(){}
});

setTimeout(function() {
s.push('hello');
s.push(null);
}, 100);

return s;
});

si.on('data', function(data) {
all += data;
});

setTimeout(function() {
expect(calls).to.equal(4);
expect(all).to.equal('hellohellohello');
si.stop();
done();
}, 350);
});
});

function threeObjects() {
var i = 0;
var si = streamInterval({ objectMode: true }, function() {
var s = new Readable({
objectMode: true,
read: function() {
this.push({ hello: 'world' });
this.push(null);
}
});
if (++i > 3) si.stop();
return s;
});
return si;
}

function threeHellos() {
var i = 0;
var si = streamInterval(function() {
var s = new Readable({
read: function() {
this.push('hello');
this.push(null);
}
});
if (++i > 3) si.stop();
return s;
});
return si;
}

0 comments on commit d7fded4

Please sign in to comment.