Permalink
Browse files

separated request piping

  • Loading branch information...
1 parent 7082bb4 commit a83d8c6c98cf0e32beab07d7948a279efef45019 @pgte pgte committed Jan 28, 2012
Showing with 166 additions and 155 deletions.
  1. +2 −1 .npmignore
  2. +31 −154 lib/intercept.js
  3. +133 −0 lib/piper.js
View
@@ -1 +1,2 @@
-node_modules
+node_modules
+.gitignore
View
@@ -1,8 +1,9 @@
-var path = require('path')
- , http = require('http')
- , https = require('https')
- , url = require('url')
- , EventEmitter = require('events').EventEmitter;
+var Piper = require('./piper')
+ path = require('path'),
+ http = require('http'),
+ https = require('https'),
+ url = require('url')
+ EventEmitter = require('events').EventEmitter;
var allInterceptors = {};
@@ -15,12 +16,12 @@ function addGlobalInterceptor(key, interceptor) {
function remove(interceptor) {
- var key = interceptor._key.split(' ');
- var u = url.parse(key[1]);
- var hostKey = u.protocol + '//' + u.host;
- var interceptors = allInterceptors[hostKey];
- var interceptor;
- var thisInterceptor;
+ var key = interceptor._key.split(' '),
+ u = url.parse(key[1]),
+ hostKey = u.protocol + '//' + u.host,
+ interceptors = allInterceptors[hostKey],
+ interceptor,
+ thisInterceptor;
for(var i = 0; i < interceptors.length; i++) {
thisInterceptor = interceptors[i];
@@ -33,9 +34,10 @@ function remove(interceptor) {
}
function stringifyRequest(options) {
- var method = options.method || 'GET';
- var path = options.path;
- var body = options.body;
+ var method = options.method || 'GET',
+ path = options.path,
+ body = options.body;
+
if (body && typeof(body) !== 'string') {
body = body.toString();
}
@@ -60,19 +62,23 @@ function setHeader(request, name, value) {
}
function processRequest(interceptors, options, callback) {
- var req = new EventEmitter()
- , response = new EventEmitter()
- , requestBodyBuffers = []
- , aborted
- , end
- , ended;
+ var req = new EventEmitter(),
+ response = new EventEmitter(),
+ requestBodyBuffers = [],
+ aborted,
+ end,
+ ended,
+ headers,
+ keys,
+ key,
+ i;
if (options.headers) {
- var headers = options.headers;
- var keys = Object.keys(headers);
+ headers = options.headers;
+ keys = Object.keys(headers);
- for (var i = 0, l = keys.length; i < l; i++) {
- var key = keys[i];
+ for (i = 0, l = keys.length; i < l; i++) {
+ key = keys[i];
setHeader(req, key, headers[key]);
};
@@ -173,136 +179,7 @@ function processRequest(interceptors, options, callback) {
callnext();
};
- response.pipe = function(dest, options) {
- var source = this;
-
- function ondata(chunk) {
- if (dest.writable) {
- if (false === dest.write(chunk)) source.pause();
- }
- }
-
- function ondrain() {
- if (source.readable) source.resume();
- }
-
- source.on('data', ondata);
- dest.on('drain', ondrain);
-
- // If the 'end' option is not supplied, dest.end() will be called when
- // source gets the 'end' or 'close' events. Only dest.end() once, and
- // only when all sources have ended.
- if (!options || options.end !== false) {
- dest._pipeCount = dest._pipeCount || 0;
- dest._pipeCount++;
-
- source.on('end', onend);
- source.on('close', onclose);
- }
-
- var didOnEnd = false;
- function onend() {
- if (didOnEnd) return;
- didOnEnd = true;
-
- dest._pipeCount--;
-
- // remove the listeners
- cleanup();
-
- if (dest._pipeCount > 0) {
- // waiting for other incoming streams to end.
- return;
- }
-
- dest.end();
- }
-
-
- function onclose() {
- if (didOnEnd) return;
- didOnEnd = true;
-
- dest._pipeCount--;
-
- // remove the listeners
- cleanup();
-
- if (dest._pipeCount > 0) {
- // waiting for other incoming streams to end.
- return;
- }
-
- dest.destroy();
- }
-
- // don't leave dangling pipes when there are errors.
- function onerror(er) {
- cleanup();
- if (this.listeners('error').length === 0) {
- throw er; // Unhandled stream error in pipe.
- }
- }
-
- source.on('error', onerror);
- dest.on('error', onerror);
-
- // guarantee that source streams can be paused and resumed, even
- // if the only effect is to proxy the event back up the pipe chain.
- if (!source.pause) {
- source.pause = function() {
- source.emit('pause');
- };
- }
-
- if (!source.resume) {
- source.resume = function() {
- source.emit('resume');
- };
- }
-
- function onpause() {
- source.pause();
- }
-
- dest.on('pause', onpause);
-
- function onresume() {
- if (source.readable) source.resume();
- }
-
- dest.on('resume', onresume);
-
- // remove all the event listeners that were added.
- function cleanup() {
- source.removeListener('data', ondata);
- dest.removeListener('drain', ondrain);
-
- source.removeListener('end', onend);
- source.removeListener('close', onclose);
-
- dest.removeListener('pause', onpause);
- dest.removeListener('resume', onresume);
-
- source.removeListener('error', onerror);
- dest.removeListener('error', onerror);
-
- source.removeListener('end', cleanup);
- source.removeListener('close', cleanup);
-
- dest.removeListener('end', cleanup);
- dest.removeListener('close', cleanup);
- }
-
- source.on('end', cleanup);
- source.on('close', cleanup);
-
- dest.on('end', cleanup);
- dest.on('close', cleanup);
-
- dest.emit('pipe', source);
- };
-
+ response.pipe = Piper();
next.push(function() {
if (encoding) {
responseBody = responseBody.toString(encoding);
View
@@ -0,0 +1,133 @@
+function pipe(dest, options) {
+ var source = this;
+
+ function ondata(chunk) {
+ if (dest.writable) {
+ if (false === dest.write(chunk)) source.pause();
+ }
+ }
+
+ function ondrain() {
+ if (source.readable) source.resume();
+ }
+
+ source.on('data', ondata);
+ dest.on('drain', ondrain);
+
+ // If the 'end' option is not supplied, dest.end() will be called when
+ // source gets the 'end' or 'close' events. Only dest.end() once, and
+ // only when all sources have ended.
+ if (!options || options.end !== false) {
+ dest._pipeCount = dest._pipeCount || 0;
+ dest._pipeCount++;
+
+ source.on('end', onend);
+ source.on('close', onclose);
+ }
+
+ var didOnEnd = false;
+ function onend() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ dest._pipeCount--;
+
+ // remove the listeners
+ cleanup();
+
+ if (dest._pipeCount > 0) {
+ // waiting for other incoming streams to end.
+ return;
+ }
+
+ dest.end();
+ }
+
+
+ function onclose() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ dest._pipeCount--;
+
+ // remove the listeners
+ cleanup();
+
+ if (dest._pipeCount > 0) {
+ // waiting for other incoming streams to end.
+ return;
+ }
+
+ dest.destroy();
+ }
+
+ // don't leave dangling pipes when there are errors.
+ function onerror(er) {
+ cleanup();
+ if (this.listeners('error').length === 0) {
+ throw er; // Unhandled stream error in pipe.
+ }
+ }
+
+ source.on('error', onerror);
+ dest.on('error', onerror);
+
+ // guarantee that source streams can be paused and resumed, even
+ // if the only effect is to proxy the event back up the pipe chain.
+ if (!source.pause) {
+ source.pause = function() {
+ source.emit('pause');
+ };
+ }
+
+ if (!source.resume) {
+ source.resume = function() {
+ source.emit('resume');
+ };
+ }
+
+ function onpause() {
+ source.pause();
+ }
+
+ dest.on('pause', onpause);
+
+ function onresume() {
+ if (source.readable) source.resume();
+ }
+
+ dest.on('resume', onresume);
+
+ // remove all the event listeners that were added.
+ function cleanup() {
+ source.removeListener('data', ondata);
+ dest.removeListener('drain', ondrain);
+
+ source.removeListener('end', onend);
+ source.removeListener('close', onclose);
+
+ dest.removeListener('pause', onpause);
+ dest.removeListener('resume', onresume);
+
+ source.removeListener('error', onerror);
+ dest.removeListener('error', onerror);
+
+ source.removeListener('end', cleanup);
+ source.removeListener('close', cleanup);
+
+ dest.removeListener('end', cleanup);
+ dest.removeListener('close', cleanup);
+ }
+
+ source.on('end', cleanup);
+ source.on('close', cleanup);
+
+ dest.on('end', cleanup);
+ dest.on('close', cleanup);
+
+ dest.emit('pipe', source);
+};
+
+module.exports = function Pipe() {
+ return pipe;
+};

0 comments on commit a83d8c6

Please sign in to comment.