Skip to content

Commit

Permalink
transports: make socket transport a Duplex stream
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Mar 1, 2017
1 parent c8f8bfd commit 1982152
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 60 deletions.
16 changes: 4 additions & 12 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const timers = require('timers');
const util = require('util');

const common = require('./common');
const jsrs = require('./record-serialization');
const errors = require('./errors');
const RemoteProxy = require('./remote-proxy');

Expand Down Expand Up @@ -58,7 +57,7 @@ function Connection(transport, server, client) {

this._heartbeatCallbackInstance = null;

transport.on('packet', this._processPacket.bind(this));
transport.on('data', this._processPacket.bind(this));
transport.on('close', this._onSocketClose.bind(this));
transport.on('error', this._onSocketError.bind(this));
}
Expand Down Expand Up @@ -207,7 +206,7 @@ Connection.prototype.startHeartbeat = function(interval) {
// Internal function used by startHeartbeat
//
Connection.prototype._heartbeatCallback = function(interval) {
this.transport.send('{}');
this.transport.write({});
this.setTimeout(interval, this._heartbeatCallbackInstance);
};

Expand Down Expand Up @@ -279,22 +278,15 @@ Connection.prototype._onTimeout = function() {
// packet - a packet to send
//
Connection.prototype._send = function(packet) {
const data = jsrs.stringify(packet);
this.transport.send(data);
this.transport.write(packet);
};

// Close the connection, optionally sending a final packet
// packet - a packet to send (optional)
//
Connection.prototype._end = function(packet) {
this.stopHeartbeat();

if (packet) {
const data = jsrs.stringify(packet);
this.transport.end(data);
} else {
this.transport.end();
}
this.transport.end(packet);
};

// Closed socket event handler
Expand Down
99 changes: 51 additions & 48 deletions lib/transport.socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const net = require('net');
const tls = require('tls');
const util = require('util');
const events = require('events');
const stream = require('stream');

const jsrs = require('./record-serialization');
const common = require('./common');
Expand Down Expand Up @@ -167,77 +168,79 @@ SocketClient.prototype._onSocketClose = function() {
// socket - socket instance
//
function SocketTransport(socket) {
events.EventEmitter.call(this);
stream.Duplex.call(this, {
readableObjectMode: true,
writableObjectMode: true,
allowHalfOpen: false
});

this.socket = socket;
this._socket = socket;
this._buffer = '';
this._uncorkSocket = this.socket.uncork.bind(this.socket);
this._readable = false;

this.remoteAddress = socket.remoteAddress;

this.socket.setEncoding('utf8');
this.socket.on('readable', this._onSocketReadable.bind(this));

common.forwardMultipleEvents(this.socket, this, [
'error',
'close'
]);
socket.setEncoding('utf8');
socket.on('error', (err) => this.emit('error', err));
socket.on('end', () => this.push(null));
socket.on('readable', () => {
this._readable = true;
this._read();
});
}

util.inherits(SocketTransport, events.EventEmitter);
util.inherits(SocketTransport, stream.Duplex);
sock.SocketTransport = SocketTransport;

// Send data over the connection
// data - Buffer or string
//
SocketTransport.prototype.send = function(data) {
this.socket.cork();
this.socket.write(data);
this.socket.write(SEPARATOR);
process.nextTick(this._uncorkSocket);
};

// End the connection optionally sending the last chunk of data
// data - Buffer or string (optional)
//
SocketTransport.prototype.end = function(data) {
if (data) {
this.socket.cork();
this.socket.write(data);
this.socket.end(SEPARATOR);
} else {
this.socket.end();
}
SocketTransport.prototype._read = function() {
if (!this._readable) return;

this.socket.destroy();
};

SocketTransport.prototype._onSocketReadable = function() {
let chunk = null;
while ((chunk = this.socket.read()) !== null)
while ((chunk = this._socket.read()) !== null) {
this._buffer += chunk;
this._processData();
};
}

// Socket data handler
// data - data received
//
SocketTransport.prototype._processData = function() {
const packets = [];
this._readable = false;

const packets = [];
try {
this._buffer = jsrs.parseNetworkPackets(this._buffer, packets);
} catch (error) {
this.emit('error', error);
return;
this._socket.destroy(error);
}

const packetsCount = packets.length;
for (let i = 0; i < packetsCount; i++) {
this.emit('packet', packets[i]);
const packet = packets[i];
if (packet !== null) this.push(packet);
}

if (this._buffer.length > MAX_PACKET_SIZE) {
this.emit('error', new Error('Maximal packet size exceeded'));
this._socket.destroy(new Error('Maximal packet size exceeded'));
}
};

SocketTransport.prototype._write = function(packet, encoding, callback) {
this._socket.cork();
writeOne(this._socket, packet);
process.nextTick(uncork, this._socket, callback);
};

SocketTransport.prototype._writev = function(packets, callback) {
this._socket.cork();
const count = packets.length;
for (let i = 0; i < count; i++) {
writeOne(this._socket, packets[i].chunk);
}
process.nextTick(uncork, this._socket, callback);
};

function writeOne(socket, packet) {
socket.write(jsrs.stringify(packet));
socket.write(SEPARATOR);
}

function uncork(socket, callback) {
socket.uncork();
callback();
}

0 comments on commit 1982152

Please sign in to comment.