Skip to content

Commit

Permalink
Initial support for 0.3.6+.\n\nExperimental support for Request objec…
Browse files Browse the repository at this point in the history
…ts as streams. It's untested and requires a pending patch to node.js
  • Loading branch information
mikeal committed Feb 3, 2011
1 parent 7cf01f0 commit cb403a4
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 36 deletions.
114 changes: 79 additions & 35 deletions main.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

var http = require('http')
, url = require('url')
, sys = require('sys')
, util = require('util')
, stream = require('stream')
, qs = require('querystring')
;

Expand All @@ -24,8 +25,22 @@ var toBase64 = function(str) {

var isUrl = /^https?:/;

function request (options, callback) {
if (!options.callback) options.callback = callback;
var Request = function (options) {
stream.Stream.call(this);
this.readable = true;
this.writable = true;

for (i in options) {
this[i] = options[i];
}
}
util.inherits(Request, stream.Stream);

Request.prototype.pipe = function (dest) {
this.dest = dest;
}
Request.prototype.request = function () {
var options = this;
if (options.url) {
// People use this property instead all the time so why not just support it.
options.uri = options.url;
Expand Down Expand Up @@ -65,29 +80,31 @@ function request (options, callback) {
else if (options.uri.protocol == 'https:') {options.uri.port = 443;}
}

if (options.bodyStream) {
sys.error('options.bodyStream is deprecated. use options.reponseBodyStream instead.');
options.responseBodyStream = options.bodyStream;
if (options.bodyStream || options.responseBodyStream) {
console.error('options.bodyStream and options.responseBodyStream is deprecated. You should now send the request object to stream.pipe()');
this.pipe(options.responseBodyStream || options.bodyStream)
}
if (!options.client) {
if (options.proxy) {
options.client = http.createClient(options.proxy.port, options.proxy.hostname, options.proxy.protocol === 'https:');
} else {
options.client = http.createClient(options.uri.port, options.uri.hostname, options.uri.protocol === 'https:');
}

if (options.proxy) {
options.port = options.proxy.port;
options.host = options.proxy.hostname;
// options.client = http.createClient(options.proxy.port, options.proxy.hostname, options.proxy.protocol === 'https:');
} else {
options.port = options.uri.port;
options.host = options.uri.hostname;
// options.client = http.createClient(options.uri.port, options.uri.hostname, options.uri.protocol === 'https:');
}

if (options.onResponse === true) {
options.onResponse = options.callback;
delete options.callback;
}

var clientErrorHandler = function (error) {
if (setHost) delete options.headers.host;
if (options.onResponse) options.onResponse(error);
if (options.callback) options.callback(error);
};
options.client.addListener('error', clientErrorHandler);

if (options.uri.auth && !options.headers.authorization) {
options.headers.authorization = "Basic " + toBase64(options.uri.auth.split(':').map(qs.unescape).join(':'));
Expand All @@ -96,10 +113,10 @@ function request (options, callback) {
options.headers['proxy-authorization'] = "Basic " + toBase64(options.proxy.auth.split(':').map(qs.unescape).join(':'));
}

options.fullpath = options.uri.href.replace(options.uri.protocol + '//' + options.uri.host, '');
if (options.fullpath.length === 0) options.fullpath = '/';
options.path = options.uri.href.replace(options.uri.protocol + '//' + options.uri.host, '');
if (options.path.length === 0) options.path = '/';

if (options.proxy) options.fullpath = (options.uri.protocol + '//' + options.uri.host + options.fullpath);
if (options.proxy) options.path = (options.uri.protocol + '//' + options.uri.host + options.path);

if (options.json) {
options.headers['content-type'] = 'application/json';
Expand Down Expand Up @@ -133,26 +150,27 @@ function request (options, callback) {
}
}

options.request = options.client.request(options.method, options.fullpath, options.headers);
options.request.addListener("response", function (response) {
options.req = http.request(options, function (response) {
if (setHost) delete options.headers.host;
response.on("end", function () {
options.client.removeListener("error", clientErrorHandler);
options.req.removeListener("error", clientErrorHandler);
});

if (response.statusCode >= 300 &&
response.statusCode < 400 &&
options.followRedirect &&
options.method !== 'PUT' &&
options.method !== 'POST' &&
response.headers.location) {
if (options._redirectsFollowed >= options.maxRedirects) {
client.emit('error', new Error("Exceeded maxRedirects. Probably stuck in a redirect loop."));
options.emit('error', new Error("Exceeded maxRedirects. Probably stuck in a redirect loop."));
}
options._redirectsFollowed += 1;
if (!isUrl.test(response.headers.location)) {
response.headers.location = url.resolve(options.uri.href, response.headers.location);
}
options.uri = response.headers.location;
delete options.client;
delete options.req;
if (options.headers) {
delete options.headers.host;
}
Expand All @@ -161,8 +179,8 @@ function request (options, callback) {
} else {
options._redirectsFollowed = 0;
if (options.encoding) response.setEncoding(options.encoding);
if (options.responseBodyStream) {
sys.pump(response, options.responseBodyStream);
if (options.dest) {
response.pipe(dest);
if (options.onResponse) options.onResponse(null, response);
if (options.callback) options.callback(null, response, options.responseBodyStream);
} else {
Expand All @@ -178,16 +196,42 @@ function request (options, callback) {
}
}
}
});
})

options.req.on('error', clientErrorHandler);

this.once('pipe', function (src) {
options.src = src;
})

process.nextTick(function () {
if (options.body) {
options.req.write(options.body);
options.req.end();
} else if (options.requestBodyStream) {
console.warn("options.requestBodyStream is deprecated, please pass the request object to stream.pipe.")
options.requestBodyStream.pipe(options);
} else if (!options.src) {
options.req.end();
}
})
}

if (options.body) {
options.request.write(options.body);
options.request.end();
} else if (options.requestBodyStream) {
sys.pump(options.requestBodyStream, options.request);
} else {
options.request.end();
}
Request.prototype.write = function (chunk) {
if (!this.req) throw new Error("This request has been piped before http.request() was called.");
this.req.write(chunk);
}
Request.prototype.end = function () {
this.req.end();
}



function request (options, callback) {
if (callback) options.callback = callback;
var r = new Request(options);
r.request();
return r;
}

module.exports = request;
Expand All @@ -196,14 +240,14 @@ request.get = request;
request.post = function (options, callback) {
options.method = 'POST';
if (!options.body && !options.requestBodyStream && !options.json && !options.multipart) {
sys.error("HTTP POST requests need a body or requestBodyStream");
console.error("HTTP POST requests need a body or requestBodyStream");
}
request(options, callback);
};
request.put = function (options, callback) {
options.method = 'PUT';
if (!options.body && !options.requestBodyStream && !options.json && !options.multipart) {
sys.error("HTTP PUT requests need a body or requestBodyStream");
console.error("HTTP PUT requests need a body or requestBodyStream");
}
request(options, callback);
};
Expand Down
3 changes: 2 additions & 1 deletion tests/test-body.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
var server = require('./server')
, events = require('events')
, stream = require('stream')
, assert = require('assert')
, request = require('../main.js')
;

var s = server.createServer();

var createPostStream = function (text) {
var postStream = new events.EventEmitter();
var postStream = new stream.Stream();
postStream.writeable = true;
postStream.readable = true;
setTimeout(function () {postStream.emit('data', new Buffer(text)); postStream.emit('end')}, 0);
Expand Down

0 comments on commit cb403a4

Please sign in to comment.