Skip to content
This repository has been archived by the owner on Mar 30, 2022. It is now read-only.

Commit

Permalink
Support close frames.
Browse files Browse the repository at this point in the history
- Send close frames on close() or on receipt of a close frame from the
  server.
- Add a timeout option to close() to handle servers that don't properly
  support client-initiated close handshaking.
- Re-jigger unit tests to avoid client-initiated closes, as the WS
  server that we're using doesn't support this.
  • Loading branch information
pgriess committed Dec 5, 2010
1 parent edb14f1 commit 57bf980
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 54 deletions.
135 changes: 86 additions & 49 deletions lib/websocket.js
Expand Up @@ -177,9 +177,12 @@ var WebSocket = function(url, proto, opts) {
// Retain a reference to our object
var self = this;

// State of the connection
// State of our end of the connection
var readyState = CONNECTING;

// Whether or not the server has sent a close handshake
var serverClosed = false;

// Our underlying net.Stream instance
var stream = undefined;

Expand Down Expand Up @@ -210,17 +213,16 @@ var WebSocket = function(url, proto, opts) {
// FRAME_NO
function(buf, off) {
if (buf[off] & 0x80) {
throw new Error('High-byte frames not yet supported');
frameType = FRAME_HI;
} else {
frameType = FRAME_LO;
}

frameType = FRAME_LO;
return 1;
},

// FRAME_LO
function(buf, off) {
assert.ok(bufs.length > 0 || bufsBytes == 0);

debug('frame_lo(' + sys.inspect(buf) + ', ' + off + ')');

// Find the first instance of 0xff, our terminating byte
Expand Down Expand Up @@ -281,16 +283,20 @@ var WebSocket = function(url, proto, opts) {

// FRAME_HI
function(buf, off) {
debug('High-byte framing not yet supported');
debug('frame_hi(' + sys.inspect(buf) + ', ' + off + ')');

frameType = FRAME_NO;
return buf.length - off;
if (buf[off] !== 0) {
throw new Error('High-byte framing not supported.');
}

serverClosed = true;
return 1;
}
];

// Handle data coming from our socket
var dataListener = function(buf) {
if (buf.length <= 0) {
if (buf.length <= 0 || serverClosed) {
return;
}

Expand All @@ -304,10 +310,17 @@ var WebSocket = function(url, proto, opts) {
throw new Error('Unexpected frame type: ' + frameType);
}

assert.equal(bufs.length === 0, bufsBytes === 0);
assert.ok(off < buf.length);

consumed = frameFuncs[frameType](buf, off);
off += consumed;
} while (consumed > 0 && off < buf.length);
} while (!serverClosed && consumed > 0 && off < buf.length);

if (serverClosed) {
serverCloseHandler();
}

if (consumed == 0) {
bufs.push(buf.slice(off, buf.length));
bufsBytes += buf.length - off;
Expand All @@ -330,53 +343,74 @@ var WebSocket = function(url, proto, opts) {
});
};

// External API
self.close = function() {
var f = function() {
readyState = CLOSED;

if (stream) {
stream.end();
stream.destroy();
stream = undefined;
// Finish the closing process; destroy the socket and tell the application
// that we've closed.
var finishClose = function() {
readyState = CLOSED;

if (stream) {
stream.end();
stream.destroy();
stream = undefined;
}

process.nextTick(function() {
self.emit('close');
if (self.onclose) {
self.onclose();
}
});
};

process.nextTick(function() {
self.emit('close');
// Send a close frame to the server
var sendClose = function() {
assert.equal(OPEN, readyState);

if (self.onclose) {
self.onclose();
}
});
};
readyState = CLOSING;
stream.write('\xff\x00', 'binary');
};

switch (readyState) {
case CLOSED:
case CLOSING:
break;
// Handle a close packet sent from the server
var serverCloseHandler = function() {
assert.ok(serverClosed);
assert.ok(readyState === OPEN || readyState === CLOSING);

case CONNECTING:
f();
break;
bufs = [];
bufsBytes = 0;

default:
readyState = CLOSING;

// Run f() on the next tick so that we conform a little closer to
// the spirit of the API in that the caller never sees us
// transition directly to CLOSED. Instead, we just seem to have an
// infinitely fast closing handshake.
if (stream.write('', 'binary')) {
process.nextTick(f);
} else {
stream.on('drain', f);
// Handle state transitions asynchronously so that we don't change
// readyState before the application has had a chance to process data
// events which are already in the delivery pipeline. For example, a
// 'data' event could be delivered with a readyState of CLOSING if we
// received both frames in the same packet.
process.nextTick(function() {
if (readyState === OPEN) {
sendClose();
}

finishClose();
});
};

// External API
self.close = function(timeout) {
if (readyState === CONNECTING) {
// If we're still in the process of connecting, the server is not
// in a position to understand our close frame. Just nuke the
// connection and call it a day.
finishClose();
} else if (readyState === OPEN) {
sendClose();

if (timeout) {
setTimeout(finishClose, timeout * 1000);
}
}
};

self.send = function(str, fd) {
if (readyState != OPEN) {
throw new Error('Cannot write to non-open WebSocket client');
return;
}

stream.write('\x00', 'binary');
Expand Down Expand Up @@ -502,7 +536,7 @@ var WebSocket = function(url, proto, opts) {
self.onerror();
}

self.close();
finishClose();
});
}

Expand All @@ -517,7 +551,8 @@ var WebSocket = function(url, proto, opts) {
stream.removeAllListeners('data');
stream.on('data', dataListener);

// Fire the 'open' event
readyState = OPEN;

process.nextTick(function() {
self.emit('open');

Expand All @@ -526,15 +561,17 @@ var WebSocket = function(url, proto, opts) {
}
});

readyState = OPEN;

// Consume any leftover data
if (data.length > 16) {
stream.emit('data', data.slice(16, data.length));
}
}
});
stream.on('fd', fdListener);
stream.on('error', errorListener);
stream.on('close', function() {
errorListener(new Error('Stream closed unexpectedly.'));
});

stream.emit('data', head);
};
Expand Down
12 changes: 9 additions & 3 deletions test/test-basic.js
Expand Up @@ -15,23 +15,26 @@ var clientGotData = false;
var clientGotMessage = false;
var serverGotMessage = false;
var serverGotClose = false;
var clientGotClose = false;

var wss = new WebSocketServer();
wss.listen(PORT, 'localhost');
wss.on('connection', function(c) {
serverGotConnection = true;

c.write(S_MSG);

c.on('message', function(m) {
assert.equal(m, C_MSG);
serverGotMessage = true;

c.close();
});

c.on('close', function() {
serverGotClose = true;
wss.close();
});

c.write(S_MSG);
});

var ws = new WebSocket('ws://localhost:' + PORT + '/', 'biff');
Expand All @@ -45,12 +48,14 @@ ws.on('data', function(buf) {
clientGotData = true;

ws.send(C_MSG);
ws.close();
});
ws.onmessage = function(m) {
assert.deepEqual(m, {data : S_MSG});
clientGotMessage = true;
};
ws.onclose = function() {
clientGotClose = true;
};

process.on('exit', function() {
assert.ok(serverGotConnection);
Expand All @@ -59,4 +64,5 @@ process.on('exit', function() {
assert.ok(clientGotMessage);
assert.ok(serverGotMessage);
assert.ok(serverGotClose);
assert.ok(clientGotClose);
});
32 changes: 32 additions & 0 deletions test/test-close.js
@@ -0,0 +1,32 @@
// Verify that a connection can be closed gracefully from both directions.

var assert = require('assert');
var WebSocket = require('../lib/websocket').WebSocket;
var WebSocketServer = require('websocket-server/ws/server').Server;

var PORT = 1024 + Math.floor(Math.random() * 4096);

var clientGotServerClose = false;
var serverGotClientClose = false;

var wss = new WebSocketServer();
wss.listen(PORT, 'localhost');
wss.on('connection', function(c) {
c.on('close', function() {
serverGotClientClose = true;
wss.close();
});

c.close();
});

var ws = new WebSocket('ws://localhost:' + PORT);
ws.onclose = function() {
assert.equal(ws.CLOSED, ws.readyState);
clientGotServerClose = true;
};

process.on('exit', function() {
assert.ok(clientGotServerClose);
assert.ok(serverGotClientClose);
});
3 changes: 1 addition & 2 deletions test/test-readonly-attrs.js
Expand Up @@ -10,6 +10,7 @@ var PORT = 1024 + Math.floor(Math.random() * 4096);
var wss = new WebSocketServer();
wss.listen(PORT, 'localhost');
wss.on('connection', function(c) {
c.close();
wss.close();
});
var ws = new WebSocket('ws://localhost:' + PORT + '/', 'biff');
Expand Down Expand Up @@ -39,6 +40,4 @@ ws.on('open', function() {
} catch (e) {
assert.equal(e.type, 'no_setter_in_callback');
}

ws.close();
});
3 changes: 3 additions & 0 deletions test/test-ready-state.js
Expand Up @@ -8,6 +8,9 @@ var PORT = 1024 + Math.floor(Math.random() * 4096);

var wss = new WebSocketServer();
wss.listen(PORT, 'localhost');
wss.on('connection', function(c) {
c.close();
});

var ws = new WebSocket('ws://localhost:' + PORT);
assert.equal(ws.readyState, ws.CONNECTING);
Expand Down

0 comments on commit 57bf980

Please sign in to comment.