Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
fix a bunch of broken tunnel stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
Gordon Hall committed Apr 22, 2016
1 parent 02de47a commit 085305e
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 22 deletions.
4 changes: 2 additions & 2 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ module.exports = {
CLEAN_INTERVAL: 3600000,
/** @constant {Number} OPCODE_TUNRPC_PREFIX - Opcode for tunnel rpc message */
OPCODE_TUNRPC_PREFIX: 0x0c,
/** @constant {Number} OPCODE_TUNDC_PREFIX - Opcode for tunnel datachannel */
OPCODE_TUNDC_PREFIX: 0x0d,
/** @constant {Number} OPCODE_TUNDCX_PREFIX - Opcode for tunnel datachannel */
OPCODE_TUNDCX_PREFIX: 0x0d,
/** @constant {Number} OPCODE_TUNNELER_PREFIX - Prefix opcode for tunneler */
OPCODE_TUNNELER_PREFIX: 0x0e,
/** @constant {Number} OPCODE_CONTRACT_PREFIX - Prefix opcode for contracts */
Expand Down
4 changes: 3 additions & 1 deletion lib/datachannel/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ DataChannelClient.prototype.retrieve = function(token, hash) {
}

if (data.code && data.code !== 200) {
pstream.emit('error', new Error(data.message));
return pstream.emit('error', new Error(data.message));
}

return this.close();
}

pstream.write(data);
Expand Down
4 changes: 3 additions & 1 deletion lib/datachannel/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ DataChannelServer.prototype._handleRetrieveStream = function(socket, token) {
});

filestream.on('end', function() {
self.reject(token);
socket.send(JSON.stringify({ code: 200 }), function() {
self.reject(token);
});
});
});
};
Expand Down
11 changes: 8 additions & 3 deletions lib/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var TunnelClient = require('../tunnel/client');
* @param {Array} options.opcodes - List of topic strings to subscribe
* @param {Boolean} options.noforward - Flag for skipping traversal strategies
* @param {Number} options.tunnels - Max number of tunnels to provide
* @param {Number} options.tunport - Port for tunnel server to use
*/
function Network(options) {
if (!(this instanceof Network)) {
Expand All @@ -57,7 +58,8 @@ Network.DEFAULTS = {
address: '127.0.0.1',
port: 4000,
noforward: false,
tunnels: 3
tunnels: 3,
tunport: 0 // NB: Pick random open port
};

/**
Expand Down Expand Up @@ -174,6 +176,7 @@ Network.prototype._initNetworkInterface = function() {
logger: this._logger,
cors: true,
tunnels: this._options.tunnels,
tunport: this._options.tunport,
noforward: this._options.noforward
});
this._router = new kad.Router({
Expand Down Expand Up @@ -367,8 +370,8 @@ Network.prototype._setupTunnelClient = function() {
}

this._logger.info('requesting probe from nearest neighbor');
this._requestProbe(neighbor, function(err) {
if (err) {
this._requestProbe(neighbor, function(err, result) {
if (err || result.error) {
return self._findTunnel(neighbor);
}

Expand Down Expand Up @@ -477,6 +480,8 @@ Network.prototype._establishTunnel = function(tunnels) {
self._logger.warn('tunnel connection lost, trying another...');
self._establishTunnel(tunnels);
});

tunclient.open();
});
};

Expand Down
11 changes: 10 additions & 1 deletion lib/network/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,17 @@ Protocol.prototype._handleOpenTunnel = function(params, callback) {
return callback(err);
}

var tunnel = [
'ws://',
self._network._contact.address,
':',
self._network._transport._tunserver.getListeningPort(),
'/tun?token=',
gateway.getEntranceToken()
].join('');

callback(null, {
tunnel: '',
tunnel: tunnel,
alias: {
address: self._network._contact.address,
port: gateway.getEntranceAddress().port
Expand Down
10 changes: 6 additions & 4 deletions lib/network/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ var TunnelServer = require('../tunnel/server');
* Custom HTTP transport adapter
* @constructor
* @param {kad.Contact} contact - Contact object to binding to port
* @param {Object} options
* @param {Logger} options.logger - Logger for diagnositcs
* @param {Object} options
* @param {Logger} options.logger - Logger for diagnositcs
* @param {Boolean} options.cors - Enable cross origin resource sharing
* @param {Number} options.tunnels - Number of tunnels to provide to network
* @param {Number} options.tunnels - Number of tunnels to provide to network
* @param {Boolean} options.noforward - Do not try to punch out of NAT
* @param {Number} options.tunport - Port for tunnel server to listen on
*/
function Transport(contact, options) {
if (!(this instanceof Transport)) {
return new Transport(contact, options);
}

this._maxTunnels = options.tunnels;
this._tunport = options.tunport || 0;
this._noforward = options.noforward;

kad.transports.HTTP.call(this, contact, options);
Expand Down Expand Up @@ -93,7 +95,7 @@ Transport.prototype._bindTunnelServer = function() {
);
this._tunserver = new TunnelServer({
maxTunnels: this._maxTunnels,
server: this._server
port: this._tunport
});
};

Expand Down
24 changes: 21 additions & 3 deletions lib/tunnel/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ TunnelClient.prototype._handleDataChannel = function(object) {
var quid = object.flags.quid;

if (this._channels[object.flags.quid]) {
return this._channels[quid].send(object.data, {
binary: object.flags.binary
});
return this._sendToExistingSocket(object);
}

var socket = this._channels[quid] = new WebSocketClient(destination);
Expand Down Expand Up @@ -209,4 +207,24 @@ TunnelClient.prototype._handleDataChannel = function(object) {
});
};

/**
* Sends the object to an already open socket
* @private
*/
TunnelClient.prototype._sendToExistingSocket = function(object) {
var sock = this._channels[object.flags.quid];

if (sock.readyState !== WebSocketClient.OPEN) {
return sock.once('open', function() {
sock.send(object.data, {
binary: object.flags.binary
});
});
}

return sock.send(object.data, {
binary: object.flags.binary
});
};

module.exports = TunnelClient;
2 changes: 1 addition & 1 deletion lib/tunnel/demultiplexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ TunnelDemuxer.prototype._transform = function(buffer, encoding, callback) {
switch (buffer[0]) {
case constants.OPCODE_TUNRPC_PREFIX:
return this._demuxRPC(buffer, callback);
case constants.OPCODE_TUNDC_PREFIX:
case constants.OPCODE_TUNDCX_PREFIX:
return this._demuxDataChannel(buffer, callback);
default:
return callback(new Error('Invalid input for tunnel demuxing'));
Expand Down
2 changes: 1 addition & 1 deletion lib/tunnel/multiplexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ TunnelMuxer.prototype._muxRPC = function(message, callback) {
*/
TunnelMuxer.prototype._muxDataChannel = function(data, flags, callback) {
callback(null, Buffer.concat([
Buffer([constants.OPCODE_TUNDC_PREFIX]),
Buffer([constants.OPCODE_TUNDCX_PREFIX]),
Buffer([flags.binary ? 0x02 : 0x01]), // NB: WebSocket opcode for frame type
Buffer(flags.quid, 'hex'),
Buffer(data)
Expand Down
10 changes: 9 additions & 1 deletion lib/tunnel/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function TunnelServer(options) {
inherits(TunnelServer, events.EventEmitter);

TunnelServer.DEFAULTS = {
port: 8081,
port: 0,
server: null,
maxTunnels: 3
};
Expand Down Expand Up @@ -103,6 +103,14 @@ TunnelServer.prototype.hasTunnelAvailable = function() {
return Object.keys(this._gateways).length !== this._options.maxTunnels;
};

/**
* Returns the port the tunnel server is listening on
* @returns {Number} port
*/
TunnelServer.prototype.getListeningPort = function() {
return this._server._server.address().port;
};

/**
* Handles the verfication of a connecting client by the supplied token
* @private
Expand Down
4 changes: 2 additions & 2 deletions test/tunnel/demultiplexer.unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('TunnelDemuxer', function() {
done();
});
tdmuxer.write(Buffer.concat([
Buffer([constants.OPCODE_TUNDC_PREFIX]),
Buffer([constants.OPCODE_TUNDCX_PREFIX]),
Buffer([0x01]),
Buffer([1, 2, 3, 4, 5, 6]),
Buffer(JSON.stringify({
Expand All @@ -76,7 +76,7 @@ describe('TunnelDemuxer', function() {
done();
});
tdmuxer.write(Buffer.concat([
Buffer([constants.OPCODE_TUNDC_PREFIX]),
Buffer([constants.OPCODE_TUNDCX_PREFIX]),
Buffer([0x02]),
Buffer([1, 2, 3, 4, 5, 6]),
Buffer('hello demuxer')
Expand Down
4 changes: 2 additions & 2 deletions test/tunnel/multiplexer.unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ describe('TunnelMuxer', function() {
tmuxer.once('data', function(data) {
expect(Buffer.compare(
data.slice(0, 1),
Buffer([constants.OPCODE_TUNDC_PREFIX])
Buffer([constants.OPCODE_TUNDCX_PREFIX])
)).to.equal(0);
expect(Buffer.compare(
data.slice(1, 2),
Expand All @@ -77,7 +77,7 @@ describe('TunnelMuxer', function() {
tmuxer.once('data', function(data) {
expect(Buffer.compare(
data.slice(0, 1),
Buffer([constants.OPCODE_TUNDC_PREFIX])
Buffer([constants.OPCODE_TUNDCX_PREFIX])
)).to.equal(0);
expect(Buffer.compare(
data.slice(1, 2),
Expand Down

0 comments on commit 085305e

Please sign in to comment.