Permalink
Browse files

Fixed an issue for WebSocket clients where writing the client on the …

…connection event would send the written data before the WebSocket handshake reply was sent. Also fixed an issue with long polling clients where queued data was never being sent to the client.
  • Loading branch information...
mscdex committed Jul 21, 2010
1 parent 7f7fd03 commit 046c7a36c299b96f51f9b1b5d3bbc462eede0fe6
Showing with 19 additions and 12 deletions.
  1. +0 −1 lib/grappler.js
  2. +19 −11 lib/http.client.js
View
@@ -238,7 +238,6 @@ Server.prototype.listen = function(port, host) {
};
Server.prototype.broadcast = function(data, except) {
- var except = (typeof except != 'undefined' ? except : null);
for (var i=0,keys=Object.keys(this.connections),len=keys.length; i<len; ++i)
if (!except || (keys[i] != except && this.connections[keys[i]] != except))
this.connections[keys[i]].write(data);
View
@@ -103,12 +103,14 @@ var HttpClient = function(req, res) {
outgoingdata = outgoingdata.concat(['', '']).join('\r\n') + hash.digest('binary');
}
self._makePerm();
- server.emit('connection', this);
req.connection.setNoDelay(true);
req.connection.setKeepAlive(true, server.options.pingInterval);
req.connection.write(outgoingdata, (draft == 75 ? 'ascii' : 'binary'));
+ if (req.connection.readyState == 'open')
+ server.emit('connection', this);
+
req.connection.addListener('data', function(data) {
var beginMarker = 0, endMarker = 255, curIdx, tmp;
if (!inBuffer || inBuffer.length == 0) {
@@ -201,6 +203,11 @@ var HttpClient = function(req, res) {
// Initial connection
server.connections[cookie] = this;
server.emit('connection', this);
+ server.connections[cookie]._checkQueue = function() {
+ // Send any pending data sent to this long poll client
+ if (server.connections[cookie]._queue && server.connections[cookie]._queue.length)
+ server.connections[cookie].write();
+ };
} else {
// The original HttpClient just needs to reuse subsequent connections'
// ServerRequest and ServerResponse objects so that we can still write
@@ -227,10 +234,6 @@ var HttpClient = function(req, res) {
});
server.options.logger('HttpClient :: Client prefers id of ' + cookie + ' instead of ' + self.id + (isSubsequent ? ' (subsequent)' : ''), grappler.LOG.INFO);
server.options.logger('HttpClient :: Using long polling for client id ' + cookie, grappler.LOG.INFO);
-
- // Send any pending data sent to this long poll client
- if (server.connections[cookie]._queue && server.connections[cookie]._queue.length)
- server.connections[cookie].write.apply(server.connections[cookie], server.connections[cookie]._queue.shift());
}
}
break;
@@ -292,7 +295,8 @@ HttpClient.prototype._makePerm = function() {
HttpClient.prototype.write = function(data, encoding) {
var isMultipart = (this._request.headers.accept && this._request.headers.accept.indexOf('multipart/x-mixed-replace') > -1),
isSSEDOM = (this._request.headers.accept && this._request.headers.accept.indexOf('application/x-dom-event-stream') > -1),
- isSSE = ((this._request.headers.accept && this._request.headers.accept.indexOf('text/event-stream') > -1) || isSSEDOM);
+ isSSE = ((this._request.headers.accept && this._request.headers.accept.indexOf('text/event-stream') > -1) || isSSEDOM),
+ self = this;
try {
if (this.state & grappler.STATE.PROTO_WEBSOCKET) { // WebSocket
@@ -315,29 +319,33 @@ HttpClient.prototype.write = function(data, encoding) {
if (!this._queue)
this._queue = [];
- // Always append to the initial connection's write queue unless we are writing previously queued data.
+ // Always append to the initial connection's write queue.
// Queueing every piece of data provides consistency and correct ordering of incoming writes, no matter
// if the long poll client is in the process of reconnecting or not.
//
// TODO: Have a callback for write to know if the message was successfully sent?
// For example, a bunch of writes could be queued up for a long poll client, but they never end up
// reconnecting -- thus losing all of those messages. However, it is currently assumed they were
// in fact sent successfully. We should let the sender know they were not received by the recipient.
- if (!Array.isArray(data))
- this._queue.push([data, encoding]);
- else {
+ if (arguments.length > 0)
+ this._queue.push([data, encoding]);
+
+ if (this._request.connection.readyState == 'open') {
+ data = this._queue.shift();
encoding = data[1];
data = data[0];
this._response.writeHead(200, {
'Content-Type': (data instanceof Buffer && (!encoding || encoding == 'binary') ? "application/octet-stream" : "text/plain"),
'Content-Length': data.length,
- 'Connection': 'close',
+ 'Connection': 'keep-alive',
'Expires': 'Fri, 01 Jan 1990 00:00:00 GMT',
'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate',
'Pragma': 'no-cache'
});
this._response.end(data, encoding);
}
+ process.nextTick(function() { self._checkQueue(); });
+
}
} catch(e) {} // silently trap "stream is not writable" errors for now
};

0 comments on commit 046c7a3

Please sign in to comment.