Skip to content

Commit

Permalink
Making request really smart about pipeing to itself so that we can do…
Browse files Browse the repository at this point in the history
… simple proxy cats
  • Loading branch information
mikeal committed May 18, 2011
1 parent 8bb7f98 commit 63125a3
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 61 deletions.
139 changes: 79 additions & 60 deletions main.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var Request = function (options) {
}
if (!this.pool) this.pool = globalPool;
this.dests = [];
this.__isRequestRequest = true;
}
util.inherits(Request, stream.Stream);
Request.prototype.getAgent = function (host, port) {
Expand Down Expand Up @@ -204,67 +205,79 @@ Request.prototype.request = function () {
options.agent.maxSockets = options.pool.maxSockets;
}
}

options.start = function () {
options._started = true;
options.req = options.httpModule.request(options, function (response) {
options.response = response;
if (setHost) delete options.headers.host;

options.req = options.httpModule.request(options, function (response) {
options.response = response;
if (setHost) delete options.headers.host;

if (response.statusCode >= 300 &&
response.statusCode < 400 &&
options.followRedirect &&
options.method !== 'PUT' &&
options.method !== 'POST' &&
response.headers.location) {
if (options._redirectsFollowed >= options.maxRedirects) {
options.emit('error', new Error("Exceeded maxRedirects. Probably stuck in a redirect loop."));
}
options._redirectsFollowed += 1;
if (!isUrl.test(response.headers.location)) {
response.headers.location = url.resolve(options.uri.href, response.headers.location);
}
options.uri = response.headers.location;
delete options.req;
delete options.agent;
if (options.headers) {
delete options.headers.host;
}
request(options, options.callback);
return; // Ignore the rest of the response
} else {
options._redirectsFollowed = 0;
// Be a good stream and emit end when the response is finished.
// Hack to emit end on close becuase of a core bug that never fires end
response.on('close', function () {options.emit('end')})

if (options.encoding) {
if (options.dests.length !== 0) {
console.error("Ingoring encoding parameter as this stream is being piped to another stream which makes the encoding option invalid.");
} else {
response.setEncoding(options.encoding);
if (response.statusCode >= 300 &&
response.statusCode < 400 &&
options.followRedirect &&
options.method !== 'PUT' &&
options.method !== 'POST' &&
response.headers.location) {
if (options._redirectsFollowed >= options.maxRedirects) {
options.emit('error', new Error("Exceeded maxRedirects. Probably stuck in a redirect loop."));
}
}

response.on("data", function (chunk) {options.emit("data", chunk)});
response.on("end", function (chunk) {options.emit("end", chunk)});
response.on("close", function () {options.emit("close")});

if (options.onResponse) {
options.onResponse(null, response);
}
if (options.callback) {
var buffer = '';
options.on("data", function (chunk) {
buffer += chunk;
})
options.on("end", function () {
options.callback(null, response, buffer);
options._redirectsFollowed += 1;
if (!isUrl.test(response.headers.location)) {
response.headers.location = url.resolve(options.uri.href, response.headers.location);
}
options.uri = response.headers.location;
delete options.req;
delete options.agent;
if (options.headers) {
delete options.headers.host;
}
request(options, options.callback);
return; // Ignore the rest of the response
} else {
options._redirectsFollowed = 0;
// Be a good stream and emit end when the response is finished.
// Hack to emit end on close becuase of a core bug that never fires end
response.on('close', function () {options.emit('end')})

if (options.encoding) {
if (options.dests.length !== 0) {
console.error("Ingoring encoding parameter as this stream is being piped to another stream which makes the encoding option invalid.");
} else {
response.setEncoding(options.encoding);
}
}

options.dests.forEach(function (dest) {
if (dest.headers) {
dest.headers['content-type'] = response.headers['content-type'];
if (response.headers['content-length']) {
dest.headers['content-length'] = response.headers['content-length'];
}
}
})
;

response.on("data", function (chunk) {options.emit("data", chunk)});
response.on("end", function (chunk) {options.emit("end", chunk)});
response.on("close", function () {options.emit("close")});

if (options.onResponse) {
options.onResponse(null, response);
}
if (options.callback) {
var buffer = '';
options.on("data", function (chunk) {
buffer += chunk;
})
options.on("end", function () {
options.callback(null, response, buffer);
})
;
}
}
}
})

options.req.on('error', clientErrorHandler);
})

options.req.on('error', clientErrorHandler);
}

options.once('pipe', function (src) {
if (options.ntick) throw new Error("You cannot pipe to this stream after the first nextTick() after creation of the request stream.")
Expand All @@ -276,13 +289,13 @@ Request.prototype.request = function () {

process.nextTick(function () {
if (options.body) {
options.req.write(options.body);
options.req.end();
options.write(options.body);
options.end();
} else if (options.requestBodyStream) {
console.warn("options.requestBodyStream is deprecated, please pass the request object to stream.pipe.")
options.requestBodyStream.pipe(options);
} else if (!options.src) {
options.req.end();
options.end();
}
options.ntick = true;
})
Expand All @@ -293,10 +306,12 @@ Request.prototype.pipe = function (dest) {
stream.Stream.prototype.pipe.call(this, dest)
}
Request.prototype.write = function () {
if (!this._started) this.start();
if (!this.req) throw new Error("This request has been piped before http.request() was called.");
this.req.write.apply(this.req, arguments);
}
Request.prototype.end = function () {
if (!this._started) this.start();
if (!this.req) throw new Error("This request has been piped before http.request() was called.");
this.req.end.apply(this.req, arguments);
}
Expand Down Expand Up @@ -340,21 +355,25 @@ request.defaults = function (options) {

request.get = request;
request.post = function (options, callback) {
if (typeof options === 'string') options = {uri:options};
options.method = 'POST';
return request(options, callback);
};
request.put = function (options, callback) {
if (typeof options === 'string') options = {uri:options};
options.method = 'PUT';
return request(options, callback);
};
request.head = function (options, callback) {
if (typeof options === 'string') options = {uri:options};
options.method = 'HEAD';
if (options.body || options.requestBodyStream || options.json || options.multipart) {
throw new Error("HTTP HEAD requests MUST NOT include a request body.");
}
return request(options, callback);
};
request.del = function (options, callback) {
if (typeof options === 'string') options = {uri:options};
options.method = 'DELETE';
return request(options, callback);
}
25 changes: 24 additions & 1 deletion tests/test-pipes.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ var s = server.createServer(3453);
passes = 0;

function check () {
if (passes === 2) {
if (passes === 3) {
console.log('All tests passed.')
process.exit();
}
Expand Down Expand Up @@ -51,3 +51,26 @@ mypulldata.end = function () {
check();
};


s.on('/cat', function (req, resp) {
if (req.method === "GET") {
resp.writeHead(200, {'content-type':'text/plain', 'content-length':4});
resp.write('asdf');
resp.end()
} else if (req.method === "PUT") {
assert.ok(req.headers['content-type'] === 'text/plain');
assert.ok(req.headers['content-length'] == 4)
var validate = '';
req.on('data', function (chunk) {validate += chunk})
req.on('end', function () {
resp.writeHead(201);
resp.end();
assert.ok(validate === 'asdf');
passes += 1;
check();
})
}
})

request.get('http://localhost:3453/cat').pipe(request.put('http://localhost:3453/cat'))

2 comments on commit 63125a3

@heapwolf
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! =)

@EvanCarroll
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if you would look at the patch I supplied and critique it or accept it.

Please sign in to comment.