Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add uTP support #1328

Closed
wants to merge 7 commits into from
Closed
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Next

uTP

  • Loading branch information
pldubouilh committed Mar 7, 2018
commit 6c7dec46800924d50b724d2413bdc948c4d76bd4
@@ -19,7 +19,7 @@ var randombytes = require('randombytes')
var speedometer = require('speedometer')
var zeroFill = require('zero-fill')

var TCPPool = require('./lib/tcp-pool') // browser exclude
var ConnPool = require('./lib/conn-pool') // browser exclude
var Torrent = require('./lib/torrent')

/**
@@ -110,8 +110,8 @@ function WebTorrent (opts) {
}
}

if (typeof TCPPool === 'function') {
self._tcpPool = new TCPPool(self)
if (typeof ConnPool === 'function') {
self._connPool = new ConnPool(self)
} else {
process.nextTick(function () {
self._onListening()
@@ -394,8 +394,8 @@ WebTorrent.prototype._remove = function (torrentId, cb) {

WebTorrent.prototype.address = function () {
if (!this.listening) return null
return this._tcpPool
? this._tcpPool.server.address()
return this._connPool
? this._connPool.tcpServer.address()
: { address: '0.0.0.0', family: 'IPv4', port: 0 }
}

@@ -419,9 +419,9 @@ WebTorrent.prototype._destroy = function (err, cb) {
}
})

if (self._tcpPool) {
if (self._connPool) {
tasks.push(function (cb) {
self._tcpPool.destroy(cb)
self._connPool.destroy(cb)
})
}

@@ -436,17 +436,17 @@ WebTorrent.prototype._destroy = function (err, cb) {
if (err) self.emit('error', err)

self.torrents = []
self._tcpPool = null
self._connPool = null
self.dht = null
}

WebTorrent.prototype._onListening = function () {
this._debug('listening')
this.listening = true

if (this._tcpPool) {
if (this._connPool) {
// Sometimes server.address() returns `null` in Docker.
var address = this._tcpPool.server.address()
var address = this._connPool.tcpServer.address()
if (address) this.torrentPort = address.port
}

@@ -1,61 +1,71 @@
module.exports = TCPPool
module.exports = ConnPool

var arrayRemove = require('unordered-array-remove')
var debug = require('debug')('webtorrent:tcp-pool')
var debug = require('debug')('webtorrent:conn-pool')
var net = require('net') // browser exclude
var utp = require('utp') // browser exclude

var Peer = require('./peer')

/**
* TCPPool
* Connection Pool
*
* A "TCP pool" allows multiple swarms to listen on the same TCP port and determines
* A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines
* which swarm incoming connections are intended for by inspecting the bittorrent
* handshake that the remote peer sends.
*
* @param {number} port
*/
function TCPPool (client) {
function ConnPool (client) {
var self = this
debug('create tcp pool (port %s)', client.torrentPort)

self.server = net.createServer()
self._client = client
debug('create connection pool (port %s)', client.torrentPort)

// Temporarily store incoming connections so they can be destroyed if the server is
// closed before the connection is passed off to a Torrent.
self._pendingConns = []

self._onConnectionBound = function (conn) {
self._onConnectionBoundTcp = function (conn) {
self._onConnection(conn)
}

self._onListening = function () {
self._onConnectionBoundUtp = function (socket) {
self._onConnection(socket)
}

self._onListeningTCP = function (e) {
// Start listening UDP when TCP is listening
self.utpServer = utp.createServer(self._onConnectionBoundUtp)
self.utpServer.on('error', self._onError)
self.utpServer.listen(this.address().port)

self._client._onListening()
}

self._onError = function (err) {
self._client._destroy(err)
}

self.server.on('connection', self._onConnectionBound)
self.server.on('listening', self._onListening)
self.server.on('error', self._onError)
self._client = client

self.server.listen(client.torrentPort)
// Start listening TCP
self.tcpServer = net.createServer()
self.tcpServer.on('connection', self._onConnectionBoundTcp)
self.tcpServer.on('listening', self._onListeningTCP)
self.tcpServer.on('error', self._onError)
self.tcpServer.listen(client.torrentPort)
}

/**
* Destroy this TCP pool.
* @param {function} cb
*/
TCPPool.prototype.destroy = function (cb) {
ConnPool.prototype.destroy = function (cb) {
var self = this
debug('destroy tcp pool')
debug('destroy pool')

self.server.removeListener('connection', self._onConnectionBound)
self.server.removeListener('listening', self._onListening)
self.server.removeListener('error', self._onError)
self.tcpServer.removeListener('connection', self._onConnectionBoundTcp)
self.tcpServer.removeListener('listening', self._onListeningTCP)
self.tcpServer.removeListener('error', self._onError)

// Destroy all open connection objects so server can close gracefully without waiting
// for connection timeout or remote peer to disconnect.
@@ -65,12 +75,17 @@ TCPPool.prototype.destroy = function (cb) {
})

try {
self.server.close(cb)
self.utpServer.close()
} catch (e) { }

try {
self.tcpServer.close(cb)
} catch (err) {
if (cb) process.nextTick(cb)
}

self.server = null
self.tcpServer = null
self.utpServer = null
self._client = null
self._pendingConns = null
}
@@ -79,14 +94,15 @@ TCPPool.prototype.destroy = function (cb) {
* On incoming connections, we expect the remote peer to send a handshake first. Based
* on the infoHash in that handshake, route the peer to the right swarm.
*/
TCPPool.prototype._onConnection = function (conn) {
ConnPool.prototype._onConnection = function (conn) {
var self = this

// If the connection has already been closed before the `connect` event is fired,
// then `remoteAddress` will not be available, and we can't use this connection.
// - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
// - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
if (!conn.remoteAddress) {

if (!conn.socket && !conn.remoteAddress) {
conn.on('error', noop)
conn.destroy()
return
@@ -95,7 +111,7 @@ TCPPool.prototype._onConnection = function (conn) {
self._pendingConns.push(conn)
conn.once('close', cleanupPending)

var peer = Peer.createTCPIncomingPeer(conn)
var peer = Peer.createIncomingPeer(conn, conn.socket ? 'utpIncoming' : 'tcpIncoming')

var wire = peer.wire
wire.once('handshake', onHandshake)
@@ -14,7 +14,8 @@ var HANDSHAKE_TIMEOUT = 25000
* that lets you refer to a WebRTC endpoint.
*/
exports.createWebRTCPeer = function (conn, swarm) {
var peer = new Peer(conn.id, 'webrtc')
var peer = new Peer(conn.id)
peer.type = 'webrtc'
peer.conn = conn
peer.swarm = swarm

@@ -34,23 +35,25 @@ exports.createWebRTCPeer = function (conn, swarm) {
* listening port of the TCP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
exports.createTCPIncomingPeer = function (conn) {
var addr = conn.remoteAddress + ':' + conn.remotePort
var peer = new Peer(addr, 'tcpIncoming')
exports.createIncomingPeer = function (conn, type) {
var host = conn.host || conn.remoteAddress
var port = conn.port || conn.remotePort
var addr = host + ':' + port
var peer = new Peer(addr)
peer.type = type
peer.conn = conn
peer.addr = addr

peer.onConnect()

return peer
}

/**
* Outgoing TCP peers start out with just an IP address. At some point (when there is an
* Outgoing peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
exports.createTCPOutgoingPeer = function (addr, swarm) {
var peer = new Peer(addr, 'tcpOutgoing')
exports.createOutgoingPeer = function (addr, swarm) {
var peer = new Peer(addr)
peer.addr = addr
peer.swarm = swarm

@@ -76,13 +79,13 @@ exports.createWebSeedPeer = function (url, swarm) {
* @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds)
* @param {string} type the type of the peer
*/
function Peer (id, type) {
function Peer (id) {
var self = this
self.id = id
self.type = type

debug('new Peer %s', id)

self.type = null
self.addr = null
self.conn = null
self.swarm = null
@@ -105,7 +108,7 @@ Peer.prototype.onConnect = function () {
if (self.destroyed) return
self.connected = true

debug('Peer %s connected', self.id)
debug('Peer %s connected, type %s', self.id, self.type)

clearTimeout(self.connectTimeout)

@@ -30,6 +30,7 @@ var speedometer = require('speedometer')
var uniq = require('uniq')
var utMetadata = require('ut_metadata')
var utPex = require('ut_pex') // browser exclude
var utp = require('utp') // browser exclude

var File = require('./file')
var Peer = require('./peer')
@@ -91,6 +92,7 @@ function Torrent (torrentId, client, opts) {
this.destroyed = false
this.paused = false
this.done = false
this.utp = opts.utp || true

this.metadata = null
this.store = null
@@ -782,7 +784,7 @@ Torrent.prototype._addPeer = function (peer) {
var newPeer
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
newPeer = Peer.createTCPOutgoingPeer(peer, self)
newPeer = Peer.createOutgoingPeer(peer, self)
} else {
// `peer` is a WebRTC connection (simple-peer)
newPeer = Peer.createWebRTCPeer(peer, self)
@@ -1671,15 +1673,47 @@ Torrent.prototype._drain = function () {
var peer = self._queue.shift()
if (!peer) return // queue could be empty

this._debug('tcp connect attempt to %s', peer.addr)

var parts = addrToIPPort(peer.addr)
var opts = {
var connOpts = {
host: parts[0],
port: parts[1]
}

var conn = peer.conn = net.connect(opts)
// Start trying to connect with uTP
self._utpConnect(peer, connOpts)
}

Torrent.prototype._utpConnect = function (peer, connOpts) {
var self = this
if (!self.utp) return self._tcpConnect(peer, connOpts)

this._debug('utp connect attempt to %s', peer.addr)

var conn = peer.conn = utp.connect(connOpts.port, connOpts.host)
peer.type = 'utpOutgoing'

function fallbackTCP (err) {
if (peer.destroyed || peer.connected) return
this._debug('utp timeout/error %s, (error: %s)', peer.addr, err)
conn.once('connect', noop)
conn.once('error', noop)
peer.conn.destroy()
self._tcpConnect(peer, connOpts)
}

setTimeout(fallbackTCP.bind(this), 3000)

conn.once('connect', function () { peer.onConnect() })
conn.once('error', function (e) { fallbackTCP(e).bind(this) })
// conn.once('close', function () { })
}

Torrent.prototype._tcpConnect = function (peer, connOpts) {
var self = this
this._debug('tcp connect attempt to %s', peer.addr)

var conn = peer.conn = net.connect(connOpts)
peer.type = 'tcpOutgoing'

conn.once('connect', function () { peer.onConnect() })
conn.once('error', function (err) { peer.destroy(err) })
@@ -9,7 +9,7 @@
},
"browser": {
"./lib/server.js": false,
"./lib/tcp-pool.js": false,
"./lib/conn-pool.js": false,
"bittorrent-dht/client": false,
"fs-chunk-store": "memory-chunk-store",
"load-ip-set": false,
@@ -66,6 +66,7 @@
"unordered-array-remove": "^1.0.2",
"ut_metadata": "^3.0.8",
"ut_pex": "^1.1.1",
"utp": "0.0.9",
"xtend": "^4.0.1",
"zero-fill": "^2.2.3"
},
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.