Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add uTP support #1328

Closed
wants to merge 7 commits into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -58,7 +58,8 @@ If `opts` is specified, then the default options (shown below) will be overridde
peerId: String|Buffer, // Wire protocol peer ID (default=randomly generated)
tracker: Boolean|Object, // Enable trackers (default=true), or options object for Tracker
dht: Boolean|Object, // Enable DHT (default=true), or options object for DHT
webSeeds: Boolean // Enable BEP19 web seeds (default=true)
webSeeds: Boolean, // Enable BEP19 web seeds (default=true)
utp: Boolean // Enable BEP29 uTP (default=true)
}
```

@@ -19,7 +19,7 @@ var randombytes = require('randombytes')
var speedometer = require('speedometer')
var zeroFill = require('zero-fill')

var TCPPool = require('./lib/tcp-pool') // browser exclude
var ConnPool = require('./lib/conn-pool') // browser exclude
var Torrent = require('./lib/torrent')

/**
@@ -87,6 +87,7 @@ function WebTorrent (opts) {
self.tracker = opts.tracker !== undefined ? opts.tracker : {}
self.torrents = []
self.maxConns = Number(opts.maxConns) || 55
self.utp = opts.utp !== false

self._debug(
'new webtorrent (peerId %s, nodeId %s, port %s)',
@@ -110,8 +111,8 @@ function WebTorrent (opts) {
}
}

if (typeof TCPPool === 'function') {
self._tcpPool = new TCPPool(self)
if (typeof ConnPool === 'function') {
self._connPool = new ConnPool(self)
} else {
process.nextTick(function () {
self._onListening()
@@ -394,8 +395,8 @@ WebTorrent.prototype._remove = function (torrentId, cb) {

WebTorrent.prototype.address = function () {
if (!this.listening) return null
return this._tcpPool
? this._tcpPool.server.address()
return this._connPool
? this._connPool.tcpServer.address()
: { address: '0.0.0.0', family: 'IPv4', port: 0 }
}

@@ -419,9 +420,9 @@ WebTorrent.prototype._destroy = function (err, cb) {
}
})

if (self._tcpPool) {
if (self._connPool) {
tasks.push(function (cb) {
self._tcpPool.destroy(cb)
self._connPool.destroy(cb)
})
}

@@ -436,17 +437,17 @@ WebTorrent.prototype._destroy = function (err, cb) {
if (err) self.emit('error', err)

self.torrents = []
self._tcpPool = null
self._connPool = null
self.dht = null
}

WebTorrent.prototype._onListening = function () {
this._debug('listening')
this.listening = true

if (this._tcpPool) {
if (this._connPool) {
// Sometimes server.address() returns `null` in Docker.
var address = this._tcpPool.server.address()
var address = this._connPool.tcpServer.address()
if (address) this.torrentPort = address.port
}

@@ -1,30 +1,29 @@
module.exports = TCPPool
module.exports = ConnPool

var arrayRemove = require('unordered-array-remove')
var debug = require('debug')('webtorrent:tcp-pool')
var debug = require('debug')('webtorrent:conn-pool')
var net = require('net') // browser exclude
var utp = require('utp-native') // browser exclude

var Peer = require('./peer')

/**
* TCPPool
* Connection Pool
*
* A "TCP pool" allows multiple swarms to listen on the same TCP port and determines
* A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines
* which swarm incoming connections are intended for by inspecting the bittorrent
* handshake that the remote peer sends.
*
* @param {number} port
*/
function TCPPool (client) {
function ConnPool (client) {
var self = this
debug('create tcp pool (port %s)', client.torrentPort)

self.server = net.createServer()
self._client = client
debug('create connection pool (port %s)', client.torrentPort)

// Temporarily store incoming connections so they can be destroyed if the server is
// closed before the connection is passed off to a Torrent.
self._pendingConns = []
self.utp = client.utp

self._onConnectionBound = function (conn) {
self._onConnection(conn)
@@ -38,24 +37,48 @@ function TCPPool (client) {
self._client._destroy(err)
}

self.server.on('connection', self._onConnectionBound)
self.server.on('listening', self._onListening)
self.server.on('error', self._onError)
self._client = client

self.server.listen(client.torrentPort)
// Setup TCP
self.tcpServer = net.createServer()
self.tcpServer.on('connection', self._onConnectionBound)
self.tcpServer.on('error', self._onError)

// Setup uTP
if (self.utp) {
self.utpServer = utp.createServer()
self.utpServer.on('connection', self._onConnectionBound)
self.utpServer.on('listening', self._onListening)
self.utpServer.on('error', self._onError)
}

// Start listening TCP then uTP
self.tcpServer.listen(client.torrentPort, function () {
if (self.utp) {
self.utpServer.listen(this.address().port)
} else {
self._onListening()
}
})
}

/**
* Destroy this TCP pool.
* Destroy this pool.
* @param {function} cb
*/
TCPPool.prototype.destroy = function (cb) {
ConnPool.prototype.destroy = function (cb) {
var self = this
debug('destroy tcp pool')
debug('destroy pool')

if (self.utp) {
self.utpServer.removeListener('connection', self._onConnectionBound)
self.utpServer.removeListener('listening', self._onListening)
self.utpServer.removeListener('error', self._onError)
}

self.server.removeListener('connection', self._onConnectionBound)
self.server.removeListener('listening', self._onListening)
self.server.removeListener('error', self._onError)
self.tcpServer.removeListener('connection', self._onConnectionBound)
self.tcpServer.removeListener('listening', self._onListening)
self.tcpServer.removeListener('error', self._onError)

// Destroy all open connection objects so server can close gracefully without waiting
// for connection timeout or remote peer to disconnect.
@@ -65,12 +88,17 @@ TCPPool.prototype.destroy = function (cb) {
})

try {
self.server.close(cb)
self.utpServer.close()
} catch (e) { }

try {
self.tcpServer.close(cb)
} catch (err) {
if (cb) process.nextTick(cb)
}

self.server = null
self.tcpServer = null
self.utpServer = null
self._client = null
self._pendingConns = null
}
@@ -79,14 +107,15 @@ TCPPool.prototype.destroy = function (cb) {
* On incoming connections, we expect the remote peer to send a handshake first. Based
* on the infoHash in that handshake, route the peer to the right swarm.
*/
TCPPool.prototype._onConnection = function (conn) {
ConnPool.prototype._onConnection = function (conn) {
var self = this

// If the connection has already been closed before the `connect` event is fired,
// then `remoteAddress` will not be available, and we can't use this connection.
// - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
// - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
if (!conn.remoteAddress) {

if (!conn.address().address) {
conn.on('error', noop)
conn.destroy()
return
@@ -95,7 +124,7 @@ TCPPool.prototype._onConnection = function (conn) {
self._pendingConns.push(conn)
conn.once('close', cleanupPending)

var peer = Peer.createTCPIncomingPeer(conn)
var peer = Peer.createIncomingPeer(conn)

var wire = peer.wire
wire.once('handshake', onHandshake)
@@ -4,17 +4,24 @@ var Wire = require('bittorrent-protocol')

var WebConn = require('./webconn')

var CONNECT_TIMEOUT_TCP = 5000
var CONNECT_TIMEOUT_WEBRTC = 25000
var CONNECT_TIMEOUT = {
utpOutgoing: 5000,
tcpOutgoing: 5000,
webrtc: 2500
}

var HANDSHAKE_TIMEOUT = 25000

var RECONNECT_WAIT = [1000, 5000, 15000]

/**
* WebRTC peer connections start out connected, because WebRTC peers require an
* "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address
* that lets you refer to a WebRTC endpoint.
*/
exports.createWebRTCPeer = function (conn, swarm) {
var peer = new Peer(conn.id, 'webrtc')
var peer = new Peer(conn.id)
peer.type = 'webrtc'
peer.conn = conn
peer.swarm = swarm

@@ -34,23 +41,24 @@ exports.createWebRTCPeer = function (conn, swarm) {
* listening port of the TCP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
exports.createTCPIncomingPeer = function (conn) {
var addr = conn.remoteAddress + ':' + conn.remotePort
var peer = new Peer(addr, 'tcpIncoming')
exports.createIncomingPeer = function (conn, type) {
var addr = conn.address()
var addrStr = addr.address + ':' + addr.port
var peer = new Peer(addrStr)
peer.type = conn._utp ? 'utpIncoming' : 'tcpIncoming'
peer.conn = conn
peer.addr = addr
peer.addr = addrStr

peer.onConnect()

return peer
}

/**
* Outgoing TCP peers start out with just an IP address. At some point (when there is an
* Outgoing peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
exports.createTCPOutgoingPeer = function (addr, swarm) {
var peer = new Peer(addr, 'tcpOutgoing')
exports.createOutgoingPeer = function (addr, swarm) {
var peer = new Peer(addr)
peer.addr = addr
peer.swarm = swarm

@@ -76,13 +84,13 @@ exports.createWebSeedPeer = function (url, swarm) {
* @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds)
* @param {string} type the type of the peer
*/
function Peer (id, type) {
function Peer (id) {
var self = this
self.id = id
self.type = type

debug('new Peer %s', id)

self.type = null
self.addr = null
self.conn = null
self.swarm = null
@@ -105,7 +113,7 @@ Peer.prototype.onConnect = function () {
if (self.destroyed) return
self.connected = true

debug('Peer %s connected', self.id)
debug('Peer %s connected, type %s', self.id, self.type)

clearTimeout(self.connectTimeout)

@@ -185,6 +193,32 @@ Peer.prototype.onHandshake = function (infoHash, peerId) {
if (!self.sentHandshake) self.handshake()
}

// When connection closes, attempt reconnect after timeout (with exponential backoff)
Peer.prototype.onClose = function (torrent) {
var self = this
if (torrent.destroyed || torrent.done) return

if (self.retries >= RECONNECT_WAIT.length) {
debug(
'conn %s closed: will not re-add (max %s attempts)',
self.addr, RECONNECT_WAIT.length
)
return
}

var ms = RECONNECT_WAIT[self.retries]
debug(
'conn %s closed: will re-add to queue in %sms (attempt %s)',
self.addr, ms, self.retries + 1
)

var reconnectTimeout = setTimeout(function reconnectTimeout () {
var newPeer = torrent._addPeer(self.addr)
if (newPeer) newPeer.retries = self.retries + 1
}, ms)
if (reconnectTimeout.unref) reconnectTimeout.unref()
}

Peer.prototype.handshake = function () {
var self = this
var opts = {
@@ -194,12 +228,12 @@ Peer.prototype.handshake = function () {
self.sentHandshake = true
}

Peer.prototype.startConnectTimeout = function () {
Peer.prototype.startConnectTimeout = function (cb) {
var self = this
clearTimeout(self.connectTimeout)
self.connectTimeout = setTimeout(function () {
self.destroy(new Error('connect timeout'))
}, self.type === 'webrtc' ? CONNECT_TIMEOUT_WEBRTC : CONNECT_TIMEOUT_TCP)
function defaultCB () { self.destroy(new Error('connect timeout')) }

self.connectTimeout = setTimeout(cb || defaultCB, CONNECT_TIMEOUT[self.type])
if (self.connectTimeout.unref) self.connectTimeout.unref()
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.