Permalink
Browse files

Filter API.

  • Loading branch information...
1 parent fd5a2d1 commit 89892ab542cf13054e29e34737110baba5ad190f @mikeal committed Aug 13, 2010
Showing with 56 additions and 34 deletions.
  1. +42 −34 streams/lib/main.js
  2. +14 −0 streams/package.json
View
@@ -1,44 +1,51 @@
var sys = require('sys')
, events = require('events');
-function createPump (readable, writable) {
- var pump = new events.EventEmitter();
- readable.addListener("data", function (chunk) {
- pump.chunk = chunk;
- pump.emit("data", chunk);
- if (writable.write(pump.chunk) === false) {
- pump.emit("pause");
+/*
+
+Experimental
+
+Simplified API for creating a Stream compatible object that can mutate data coming through the stream.
+
+ // Write 4 chunks at a time.
+ var b = [];
+ var filter = sys.createFilter(function (chunk, write) {
+ if (!chunk || b.length === 4) {
+ b.forEach( function (c) {write(c)} );
+ } else {
+ b.push(chunk);
}
- pump.chunk = null;
- });
- readable.addListener("drain", function () {
- pump.emit("drain");
- pump.emit("resume");
});
- readable.addListener("end", function () {
- pump.emit("end");
- });
- writable.addListener("close", function () {
- pump.emit("close");
- });
-
- pump.pauseListener = function () {readable.pause()};
- pump.addListener("pause", pump.pauseListener);
- pump.resumeListener = function () {readable.resume()};
- pump.addListener("resume", pump.resumeListener);
- pump.endListener = function () {writable.end()};
- pump.addListener("end", pump.endListener);
-
- pump.removeDefaults = function ( ) {
- pump.removeListener("pause", pump.pauseListener);
- pump.removeListener("resume", pump.resumeListener);
- pump.removeListener("end", pump.endListener);
+ sys.pump(readable, filter);
+ sys.pump(filter, writable);
+*/
+
+function Filter(handler) {
+ var self = this;
+ var write = function(chunk) {
+ self.emit('data', chunk)
@isaacs

isaacs Aug 13, 2010

shouldn't that be something like this?

self.emit( chunk == null ? 'end' : 'data', chunk)

ie, you can just call write() one last time to tie off the end.

}
-
- return pump;
+
+ self.write = function(chunk) {
+ handler(chunk, write);
+ }
+ self.addListener("end", function() {
+ handler(null, write)
+ })
+};
+Filter.prototype.pause = function() {
+ this.emit("pause")
+}
+Filter.prototype.resume = function() {
+ this.emit("resume")
}
+sys.inherits(Filter, events.EventEmitter)
+
+function createFilter (handler) {
+ return new Filter(handler);
+};
-function createMultiPump(readables, writables) {
+function createMultiPump (readables, writables) {
var mpump = new events.EventEmitter();
for (var i;i<readables.length;i+=1) {
@@ -98,5 +105,6 @@ function createMultiPump(readables, writables) {
return mpump;
}
-exports.createPump = createPump;
+exports.Filter = Filter;
+exports.createFilter = createFilter;
exports.createMultiPump = createMultiPump;
View
@@ -0,0 +1,14 @@
+{ "name" : "streams"
+, "description" : "Stream utilities."
+, "tags" : ["streams", "simple", "util", "utility"]
+, "version" : "0.3.0"
+, "author" : "Mikeal Rogers <mikeal.rogers@gmail.com>"
+, "repository" :
+ { "type" : "git"
+ , "url" : "http://github.com/mikeal/node-utils.git"
+ }
+, "bugs" :
+ { "web" : "http://github.com/mikeal/node-utils/issues" }
+, "engines" : ["node >=0.1.90"]
+, "main" : "./lib/main"
+}

0 comments on commit 89892ab

Please sign in to comment.