Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add utp support #1486

Open
wants to merge 2 commits into
base: master
from
Open
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Next

utp: add utp support

  • Loading branch information
pldubouilh committed Aug 26, 2018
commit 29b6216fd024b00ea638e99f8a1c57a1c3abe4d2
@@ -19,7 +19,7 @@ var randombytes = require('randombytes')
var speedometer = require('speedometer')
var zeroFill = require('zero-fill')

var TCPPool = require('./lib/tcp-pool') // browser exclude
var ConnPool = require('./lib/conn-pool') // browser exclude
var Torrent = require('./lib/torrent')

/**
@@ -87,6 +87,7 @@ function WebTorrent (opts) {
self.tracker = opts.tracker !== undefined ? opts.tracker : {}
self.torrents = []
self.maxConns = Number(opts.maxConns) || 55
self.utp = opts.utp !== false

self._debug(
'new webtorrent (peerId %s, nodeId %s, port %s)',
@@ -110,8 +111,8 @@ function WebTorrent (opts) {
}
}

if (typeof TCPPool === 'function') {
self._tcpPool = new TCPPool(self)
if (typeof ConnPool === 'function') {
self._connPool = new ConnPool(self)
} else {
process.nextTick(function () {
self._onListening()
@@ -395,8 +396,8 @@ WebTorrent.prototype._remove = function (torrentId, cb) {

WebTorrent.prototype.address = function () {
if (!this.listening) return null
return this._tcpPool
? this._tcpPool.server.address()
return this._connPool
? this._connPool.tcpServer.address()
: { address: '0.0.0.0', family: 'IPv4', port: 0 }
}

@@ -420,9 +421,9 @@ WebTorrent.prototype._destroy = function (err, cb) {
}
})

if (self._tcpPool) {
if (self._connPool) {
tasks.push(function (cb) {
self._tcpPool.destroy(cb)
self._connPool.destroy(cb)
})
}

@@ -437,17 +438,17 @@ WebTorrent.prototype._destroy = function (err, cb) {
if (err) self.emit('error', err)

self.torrents = []
self._tcpPool = null
self._connPool = null
self.dht = null
}

WebTorrent.prototype._onListening = function () {
this._debug('listening')
this.listening = true

if (this._tcpPool) {
if (this._connPool) {
// Sometimes server.address() returns `null` in Docker.
var address = this._tcpPool.server.address()
var address = this._connPool.tcpServer.address()
if (address) this.torrentPort = address.port
}

@@ -1,24 +1,26 @@
const arrayRemove = require('unordered-array-remove')
const debug = require('debug')('webtorrent:tcp-pool')
const debug = require('debug')('webtorrent:conn-pool')
const net = require('net') // browser exclude
const utp = require('utp-native') // browser exclude

const Peer = require('./peer')

/**
* TCPPool
* Connection Pool
*
* A "TCP pool" allows multiple swarms to listen on the same TCP port and determines
* A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines
* which swarm incoming connections are intended for by inspecting the bittorrent
* handshake that the remote peer sends.
*
* @param {number} port
*/
class TCPPool {
class ConnPool {
constructor (client) {
debug('create tcp pool (port %s)', client.torrentPort)

this.server = net.createServer()
debug('create pool (port %s)', client.torrentPort)
const self = this
let i = 0
this._client = client
this.utp = client.utp

// Temporarily store incoming connections so they can be destroyed if the server is
// closed before the connection is passed off to a Torrent.
@@ -29,30 +31,52 @@ class TCPPool {
}

this._onListening = () => {
this._client._onListening()
// Kickoff client onListening when everything's setup
if (!self._client.utp || ++i === 2) {
self._client._onListening()
// Start UTP if needed
} else if (self._client.utp) {
self.utpServer.listen(self.tcpServer.address().port)
}
}

this._onError = err => {
this._client._destroy(err)
this._onError = e => {
this._client._destroy(e)
}

this.server.on('connection', this._onConnectionBound)
this.server.on('listening', this._onListening)
this.server.on('error', this._onError)
// Setup TCP
this.tcpServer = net.createServer()
this.tcpServer.on('connection', this._onConnectionBound)
this.tcpServer.on('error', this._onError)

// Setup uTP
if (this.utp) {
this.utpServer = utp.createServer()
this.utpServer.on('connection', this._onConnectionBound)
this.utpServer.on('error', this._onError)
this.utpServer.on('listening', this._onListening)
}

this.server.listen(client.torrentPort)
// Start TCP
this.tcpServer.listen(client.torrentPort, this._onListening)
}

/**
/**
* Destroy this TCP pool.
* @param {function} cb
*/
destroy (cb) {
debug('destroy tcp pool')
debug('destroy conn pool')

this.server.removeListener('connection', this._onConnectionBound)
this.server.removeListener('listening', this._onListening)
this.server.removeListener('error', this._onError)
if (this._client.utp) {
this.utpServer.removeListener('connection', this._onConnectionBound)
this.utpServer.removeListener('listening', this._onListening)
this.utpServer.removeListener('error', this._onError)
}

this.tcpServer.removeListener('connection', this._onConnectionBound)
this.tcpServer.removeListener('listening', this._onListening)
this.tcpServer.removeListener('error', this._onError)

// Destroy all open connection objects so server can close gracefully without waiting
// for connection timeout or remote peer to disconnect.
@@ -61,13 +85,18 @@ class TCPPool {
conn.destroy()
})

try {
this.utpServer.close()
} catch (e) { }

try {
this.server.close(cb)
} catch (err) {
if (cb) process.nextTick(cb)
}

this.server = null
this.tcpServer = null
this.utpServer = null
this._client = null
this._pendingConns = null
}
@@ -83,17 +112,15 @@ class TCPPool {
// then `remoteAddress` will not be available, and we can't use this connection.
// - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
// - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
if (!conn.remoteAddress) {
conn.on('error', noop)
if (!conn.address().address) {
conn.destroy()
return
}

self._pendingConns.push(conn)
conn.once('close', cleanupPending)

const peer = Peer.createTCPIncomingPeer(conn)

const peer = Peer.createIncomingPeer(conn)
const wire = peer.wire
wire.once('handshake', onHandshake)

@@ -125,4 +152,4 @@ class TCPPool {

function noop () {}

module.exports = TCPPool
module.exports = ConnPool
@@ -4,10 +4,16 @@ var Wire = require('bittorrent-protocol')

var WebConn = require('./webconn')

var CONNECT_TIMEOUT_TCP = 5000
var CONNECT_TIMEOUT_WEBRTC = 25000
var CONNECT_TIMEOUT = {
utpOutgoing: 5000,
tcpOutgoing: 5000,
webrtc: 2500
}

var HANDSHAKE_TIMEOUT = 25000

var RECONNECT_WAIT = [1000, 5000, 15000]

/**
* WebRTC peer connections start out connected, because WebRTC peers require an
* "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address
@@ -34,23 +40,23 @@ exports.createWebRTCPeer = function (conn, swarm) {
* listening port of the TCP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
exports.createTCPIncomingPeer = function (conn) {
var addr = conn.remoteAddress + ':' + conn.remotePort
var peer = new Peer(addr, 'tcpIncoming')
exports.createIncomingPeer = function (conn, type) {
var addr = conn.address()
var addrStr = addr.address + ':' + addr.port
var peer = new Peer(addrStr, conn._utp ? 'utpIncoming' : 'tcpIncoming')
peer.conn = conn
peer.addr = addr
peer.addr = addrStr

peer.onConnect()

return peer
}

/**
* Outgoing TCP peers start out with just an IP address. At some point (when there is an
* Outgoing peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
exports.createTCPOutgoingPeer = function (addr, swarm) {
var peer = new Peer(addr, 'tcpOutgoing')
exports.createOutgoingPeer = function (addr, swarm) {
var peer = new Peer(addr, 'outgoing')
peer.addr = addr
peer.swarm = swarm

@@ -78,11 +84,10 @@ exports.createWebSeedPeer = function (url, swarm) {
*/
function Peer (id, type) {
var self = this
self.id = id
self.type = type

debug('new %s Peer %s', type, id)

self.id = id
self.type = type
self.addr = null
self.conn = null
self.swarm = null
@@ -105,7 +110,7 @@ Peer.prototype.onConnect = function () {
if (self.destroyed) return
self.connected = true

debug('Peer %s connected', self.id)
debug('Peer %s connected, type %s', self.id, self.type)

clearTimeout(self.connectTimeout)

@@ -185,6 +190,32 @@ Peer.prototype.onHandshake = function (infoHash, peerId) {
if (!self.sentHandshake) self.handshake()
}

// When connection closes, attempt reconnect after timeout (with exponential backoff)
Peer.prototype.onClose = function (torrent) {
var self = this
if (torrent.destroyed || torrent.done) return

if (self.retries >= RECONNECT_WAIT.length) {
debug(
'conn %s closed: will not re-add (max %s attempts)',
self.addr, RECONNECT_WAIT.length
)
return
}

var ms = RECONNECT_WAIT[self.retries]
debug(
'conn %s closed: will re-add to queue in %sms (attempt %s)',
self.addr, ms, self.retries + 1
)

var reconnectTimeout = setTimeout(function reconnectTimeout () {
var newPeer = torrent._addPeer(self.addr)
if (newPeer) newPeer.retries = self.retries + 1
}, ms)
if (reconnectTimeout.unref) reconnectTimeout.unref()
}

Peer.prototype.handshake = function () {
var self = this
var opts = {
@@ -194,12 +225,12 @@ Peer.prototype.handshake = function () {
self.sentHandshake = true
}

Peer.prototype.startConnectTimeout = function () {
Peer.prototype.startConnectTimeout = function (cb) {
var self = this
clearTimeout(self.connectTimeout)
self.connectTimeout = setTimeout(function () {
self.destroy(new Error('connect timeout'))
}, self.type === 'webrtc' ? CONNECT_TIMEOUT_WEBRTC : CONNECT_TIMEOUT_TCP)
function defaultCB () { self.destroy(new Error('connect timeout')) }

self.connectTimeout = setTimeout(cb || defaultCB, CONNECT_TIMEOUT[self.type])
if (self.connectTimeout.unref) self.connectTimeout.unref()
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.