From d7fded4732f2148079b51458cd915b9b4abcd996 Mon Sep 17 00:00:00 2001 From: Maximilian Schmitt Date: Tue, 19 May 2015 13:30:22 +0200 Subject: [PATCH] initial commit --- .gitignore | 3 ++ .travis.yml | 9 ++++ main.js | 56 +++++++++++++++++++++ package.json | 33 +++++++++++++ readme.md | 5 ++ test/main.js | 136 +++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 242 insertions(+) create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 main.js create mode 100644 package.json create mode 100644 readme.md create mode 100644 test/main.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b600741 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +coverage +node_modules +.DS_Store diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..56fb73a --- /dev/null +++ b/.travis.yml @@ -0,0 +1,9 @@ +language: node_js +node_js: + - '0.10' + - '0.11' + - '0.12' + - 'iojs' +notifications: + email: false +after_success: 'npm run coveralls' diff --git a/main.js b/main.js new file mode 100644 index 0000000..4b1a856 --- /dev/null +++ b/main.js @@ -0,0 +1,56 @@ +'use strict'; + +var Readable = require('readable-stream').Readable; + +var streamInterval = function(opts, fn, interval) { + if (typeof opts === 'function') { + interval = fn; + fn = opts; + } + + var outStream = new Readable(opts); + outStream.stop = function() { + this.stopped = true; + this.push(null); + }; + outStream._read = function() {}; + scheduleInterval(Date.now()); + + return outStream; + + function buildStream() { + var now = Date.now(); + var inStream = fn(); + + inStream.on('data', function listen(data) { + if (outStream.stopped) { + inStream.removeListener('data', listen); + return; + } + + outStream.push(data); + }); + + inStream.once('end', function onEnd() { + if (outStream.stopped) { + return; + } + + scheduleInterval(now); + }); + + } + + function scheduleInterval(then) { + if (outStream.stopWhenPossible) { + outStream.end(); + return; + } + + var now = Date.now(); + var scheduled = interval - (now - then); + setTimeout(buildStream, scheduled > 0 ? scheduled : 0); + } +}; + +module.exports = streamInterval; diff --git a/package.json b/package.json new file mode 100644 index 0000000..98c0272 --- /dev/null +++ b/package.json @@ -0,0 +1,33 @@ +{ + "name": "stream-interval", + "version": "0.0.1", + "description": "setInterval for for functions that return streams", + "homepage": "https://github.com/maximilianschmitt/stream-interval", + "bugs": "https://github.com/maximilianschmitt/stream-interval/issues", + "license": "MIT", + "author": { + "name": "Maximilian Schmitt", + "email": "maximilian.schmitt@googlemail.com", + "url": "http://maximilianschmitt.me" + }, + "repository": { + "type": "git", + "url": "https://github.com/maximilianschmitt/stream-interval.git" + }, + "main": "./main", + "scripts": { + "test": "istanbul cover _mocha", + "prepublish": "npm test", + "coveralls": "cat ./coverage/lcov.info | coveralls" + }, + "dependencies": { + "readable-stream": "^1.0.33" + }, + "devDependencies": { + "chai": "^2.3.0", + "coveralls": "^2.11.2", + "istanbul": "^0.3.14", + "mocha": "^2.2.5", + "through2": "^0.6.5" + } +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..7a6a8fc --- /dev/null +++ b/readme.md @@ -0,0 +1,5 @@ +# streamInterval + +[![Travis Build](http://img.shields.io/travis/maximilianschmitt/stream-interval.svg?style=flat)](https://travis-ci.org/maximilianschmitt/stream-interval) [![Code Coverage](https://img.shields.io/coveralls/maximilianschmitt/stream-interval.svg)](https://coveralls.io/r/maximilianschmitt/stream-interval) [![npm](https://img.shields.io/npm/dm/stream-interval.svg)](https://www.npmjs.com/package/stream-interval) + +setInterval for functions that return streams. diff --git a/test/main.js b/test/main.js new file mode 100644 index 0000000..7446c38 --- /dev/null +++ b/test/main.js @@ -0,0 +1,136 @@ +'use strict'; + +var expect = require('chai').expect; +var Readable = require('stream').Readable; +var streamInterval = require('../main'); + +describe('streamInterval', function() { + it('exposes a function', function() { + expect(streamInterval).to.be.a('function'); + }); + + it('stops when told to stop', function(done) { + var i = 0; + var si = threeHellos(); + + si.on('data', function() { + i++; + }); + + si.on('end', function() { + expect(i).to.equal(3); + done(); + }); + }); + + it('pipes through everything', function(done) { + var all = ''; + var si = threeHellos(); + + si.on('data', function(data) { + all += data; + }); + + si.on('end', function() { + expect(all).to.equal('hellohellohello'); + done(); + }); + }); + + it('works with streams in object mode', function(done) { + var all = []; + var si = threeObjects(); + + si.on('data', function(data) { + all.push(data); + }); + + si.on('end', function() { + expect(all).to.deep.equal([{ hello: 'world' }, { hello: 'world' }, { hello: 'world' }]); + done(); + }); + }); + + it('calls the stream factory at most every x milliseconds', function(done) { + var all = ''; + var si = streamInterval(function() { + return new Readable({ + read: function() { + this.push('hello'); + this.push(null); + } + }); + }, 100); + + si.on('data', function(data) { + all += data; + }); + + setTimeout(function() { + expect(all).to.equal('hellohellohellohellohello'); + si.stop(); + done(); + }, 550); + }); + + it('calls the next function only if the current stream has ended', function(done) { + var all = ''; + var calls = 0; + var si = streamInterval(function() { + calls++; + + var s = new Readable({ + read: function(){} + }); + + setTimeout(function() { + s.push('hello'); + s.push(null); + }, 100); + + return s; + }); + + si.on('data', function(data) { + all += data; + }); + + setTimeout(function() { + expect(calls).to.equal(4); + expect(all).to.equal('hellohellohello'); + si.stop(); + done(); + }, 350); + }); +}); + +function threeObjects() { + var i = 0; + var si = streamInterval({ objectMode: true }, function() { + var s = new Readable({ + objectMode: true, + read: function() { + this.push({ hello: 'world' }); + this.push(null); + } + }); + if (++i > 3) si.stop(); + return s; + }); + return si; +} + +function threeHellos() { + var i = 0; + var si = streamInterval(function() { + var s = new Readable({ + read: function() { + this.push('hello'); + this.push(null); + } + }); + if (++i > 3) si.stop(); + return s; + }); + return si; +}