Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Add parser to agent
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Jan 21, 2011
1 parent 60aea96 commit e576d4e
Showing 1 changed file with 96 additions and 4 deletions.
100 changes: 96 additions & 4 deletions lib/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,13 @@ OutgoingMessage.prototype.assignSocket = function(socket) {
};


OutgoingMessage.prototype.detachSocket = function(socket) {
assert(socket._httpMessage == this);
socket._httpMessage = null;
this.socket = this.connection = null;
};


OutgoingMessage.prototype.destroy = function(error) {
this.socket.destroy(error);
};
Expand All @@ -343,7 +350,9 @@ OutgoingMessage.prototype._send = function(data, encoding) {


OutgoingMessage.prototype._writeRaw = function(data, encoding) {
if (this.connection._httpMessage === this && this.connection.writable) {
if (this.connection &&
this.connection._httpMessage === this &&
this.connection.writable) {
// There might be pending data in the this.output buffer.
while (this.output.length) {
if (!this.connection.writable) {
Expand Down Expand Up @@ -602,8 +611,6 @@ OutgoingMessage.prototype.end = function(data, encoding) {


OutgoingMessage.prototype._finish = function() {
this.socket._httpMessage = null;
this.socket = this.connection = null;
this.emit('finish');
};

Expand Down Expand Up @@ -868,6 +875,8 @@ function connectionListener(socket) {
// When we're finished writing the response, check if this is the last
// respose, if so destroy the socket.
res.on('finish', function() {
res.detachSocket(socket);

if (res._last) {
socket.destroySoon();
} else {
Expand Down Expand Up @@ -915,37 +924,117 @@ Agent.prototype.appendMessage = function(options) {
var req = new ClientRequest(options);
this.queue.push(req);

/*
req.on('finish', function () {
self._cycle();
});
*/

this._cycle();

return req;
};


Agent.prototype._establishNewConnection = function(socket, message) {
Agent.prototype._establishNewConnection = function() {
var self = this;
assert(this.sockets.length < this.maxSockets);

// Grab a new "socket". Depending on the implementation of _getConnection
// this could either be a raw TCP socket or a TLS stream.
var socket = this._getConnection(this.host, this.port, function () {
debug("Agent _getConnection callback");
self._cycle();
});

this.sockets.push(socket);

// Add a parser to the socket.
var parser = parsers.alloc();
parser.reinitialize('response');
parser.socket = socket;

socket.ondata = function(d, start, end) {
var ret = parser.execute(d, start, end - start);
if (ret instanceof Error) {
debug('parse error');
socket.destroy(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
var bytesParsed = ret;
socket.ondata = null;
socket.onend = null;

var res = parser.incoming;

// This is start + byteParsed + 1 due to the error of getting \n
// in the upgradeHead from the closing lines of the headers
var upgradeHead = d.slice(start + bytesParsed + 1, end);

if (self.listeners('upgrade').length) {
self.emit('upgrade', res, res.socket, upgradeHead);
} else {
// Got upgrade header, but have no handler.
socket.destroy();
}
}
};

socket.onend = function() {
parser.finish();
socket.destroy();
};

// When the socket closes remove it from the list of available sockets.
socket.on('close', function() {
var i = self.sockets.indexOf(socket);
if (i >= 0) self.sockets.splice(i, 1);
// unref the parser for easy gc
parsers.free(parser);
});

parser.onIncoming = function(res, shouldKeepAlive) {
debug('AGENT incoming response!');

var req = socket._httpMessage;
assert(req);

// Responses to HEAD requests are AWFUL. Ask Ryan.
// A major oversight in HTTP. Hence this nastiness.
var isHeadResponse = req.method == 'HEAD';
debug('AGENT isHeadResponse ' + isHeadResponse);

if (res.statusCode == 100) {
// restart the parser, as this is a continue message.
req.emit('continue');
return true;
}

if (req.shouldKeepAlive && res.headers.connection === 'close') {
req.shouldKeepAlive = false;
}

res.addListener('end', function() {
debug('AGENT request complete disconnecting.');
// For the moment we reconnect for every request. FIXME!
// All that should be required for keep-alive is to not reconnect,
// but outgoingFlush instead.
if (!req.shouldKeepAlive) socket.end();

req.detachSocket(socket);
self._cycle();
});

req.emit('response', res);

return isHeadResponse;
};
};


// Sub-classes can overwrite this method with e.g. something that supplies
// TLS streams.
Agent.prototype._getConnection = function(host, port, cb) {
debug("Agent connected!");
var c = net.createConnection(port, host);
c.on('connect', cb);
return c;
Expand All @@ -956,6 +1045,8 @@ Agent.prototype._getConnection = function(host, port, cb) {
// waiting sockets. If a waiting socket cannot be found, it will
// start the process of establishing one.
Agent.prototype._cycle = function() {
debug("Agent _cycle");

var first = this.queue[0];
if (!first) return;

Expand All @@ -965,6 +1056,7 @@ Agent.prototype._cycle = function() {
// If the socket doesn't already have a message it's sending out
// and the socket is available for writing...
if (!socket._httpMessage && (socket.writable && socket.readable)) {
debug("Agent found socket, shift");
// We found an available connection!
this.queue.shift(); // remove first from queue.
first.assignSocket(socket);
Expand Down

0 comments on commit e576d4e

Please sign in to comment.