Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

should remove part of memory leaks closures for callback in event listen... #282

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -101,6 +101,8 @@ function Torrent (torrentId, client, opts) {
this._servers = []

if (torrentId !== null) this._onTorrentId(torrentId)

this._destroyers = []
}

Object.defineProperty(Torrent.prototype, 'timeRemaining', {
@@ -223,24 +225,25 @@ Torrent.prototype._onParsedTorrent = function (parsedTorrent) {
},
maxConns: self.client.maxConns
})
self.swarm.on('error', function (err) {

listen(self.swarm, 'error', function (err) {
self._onError(err)
})
self.swarm.on('wire', function (wire, addr) {
}, self)
listen(self.swarm, 'wire', function (wire, addr) {
self._onWire(wire, addr)
})
}, self)

self.swarm.on('download', function (downloaded) {
listen(self.swarm, 'download', function (downloaded) {
self.client._downloadSpeed(downloaded) // update overall client stats
self.client.emit('download', downloaded)
self.emit('download', downloaded)
})
}, self)

self.swarm.on('upload', function (uploaded) {
listen(self.swarm, 'upload', function (uploaded) {
self.client._uploadSpeed(uploaded) // update overall client stats
self.client.emit('upload', uploaded)
self.emit('upload', uploaded)
})
}, self)

// listen for peers (note: in the browser, this is a no-op and callback is called on
// next tick)
@@ -304,17 +307,18 @@ Torrent.prototype._onSwarmListening = function () {
peerId: self.client.peerId,
port: self.client.torrentPort
})
self.discovery.on('error', function (err) {

listen(self.discovery, 'error', function (err) {
self._onError(err)
})
self.discovery.on('peer', function (peer) {
}, self)
listen(self.discovery, 'peer', function (peer) {
// Don't create new outgoing TCP connections when torrent is done
if (typeof peer === 'string' && self.done) return
self.addPeer(peer)
})
}, self)

// expose discovery events
reemit(self.discovery, self, ['trackerAnnounce', 'dhtAnnounce', 'warning'])
addDestroyer(self, reemit(self.discovery, self, ['trackerAnnounce', 'dhtAnnounce', 'warning']))

// if full metadata was included in initial torrent id, use it
if (self.info) self._onMetadata(self)
@@ -622,7 +626,7 @@ Torrent.prototype._onWire = function (wire, addr) {

// When peer sends PORT message, add that DHT node to routing table
if (self.client.dht && self.client.dht.listening) {
wire.on('port', function (port) {
listen(wire, 'port', function (port) {
if (self.destroyed || self.client.dht.destroyed) {
return
}
@@ -635,14 +639,14 @@ Torrent.prototype._onWire = function (wire, addr) {

self._debug('port: %s (from %s)', port, addr)
self.client.dht.addNode({ host: wire.remoteAddress, port: port })
})
}, self)
}

wire.on('timeout', function () {
listen(wire, 'timeout', function () {
self._debug('wire timeout (%s)', addr)
// TODO: this might be destroying wires too eagerly
wire.destroy()
})
}, self)

// Timeout for piece requests to this peer
wire.setTimeout(PIECE_TIMEOUT, true)
@@ -653,30 +657,30 @@ Torrent.prototype._onWire = function (wire, addr) {
// use ut_metadata extension
wire.use(ut_metadata(self.metadata))

wire.ut_metadata.on('warning', function (err) {
listen(wire.ut_metadata, 'warning', function (err) {
self._debug('ut_metadata warning: %s', err.message)
})
}, self)

if (!self.metadata) {
wire.ut_metadata.on('metadata', function (metadata) {
listen(wire.ut_metadata, 'metadata', function (metadata) {
self._debug('got metadata via ut_metadata')
self._onMetadata(metadata)
})
}, self)
wire.ut_metadata.fetch()
}

// use ut_pex extension if the torrent is not flagged as private
if (typeof ut_pex === 'function' && !self.private) {
wire.use(ut_pex())

wire.ut_pex.on('peer', function (peer) {
listen(wire.ut_pex, 'peer', function (peer) {
// Only add potential new peers when we're not seeding
if (self.done) return
self._debug('ut_pex: got peer: %s (from %s)', peer, addr)
self.addPeer(peer)
})
}, self)

wire.ut_pex.on('dropped', function (peer) {
listen(wire.ut_pex, 'dropped', function (peer) {
// the remote peer believes a given peer has been dropped from the swarm.
// if we're not currently connected to it, then remove it from the swarm's queue.
var peerObj = self.swarm._peers[peer]
@@ -686,10 +690,10 @@ Torrent.prototype._onWire = function (wire, addr) {
}
})

wire.once('close', function () {
listen(wire, 'close', function () {
// Stop sending updates to remote peer
wire.ut_pex.reset()
})
}, self, true)
}

// Hook to allow user-defined `bittorrent-protocol` extensions
@@ -731,43 +735,43 @@ Torrent.prototype._onWireWithMetadata = function (wire) {
wire.choke() // always choke seeders
}

wire.on('bitfield', function () {
listen(wire, 'bitfield', function () {
updateSeedStatus()
self._update()
})
}, self)

wire.on('have', function () {
listen(wire, 'have', function () {
updateSeedStatus()
self._update()
})
}, self)

wire.once('interested', function () {
listen(wire, 'interested', function () {
wire.unchoke()
})
}, self, true)

wire.once('close', function () {
listen(wire, 'close', function () {
clearTimeout(timeoutId)
})
}, self, true)

wire.on('choke', function () {
listen(wire, 'choke', function () {
clearTimeout(timeoutId)
timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
})
}, self)

wire.on('unchoke', function () {
listen(wire, 'unchoke', function () {
clearTimeout(timeoutId)
self._update()
})
}, self)

wire.on('request', function (index, offset, length, cb) {
listen(wire, 'request', function (index, offset, length, cb) {
if (length > MAX_BLOCK_LENGTH) {
// Per spec, disconnect from peers that request >128KB
return wire.destroy()
}
if (self.pieces[index]) return
self.store.get(index, { offset: offset, length: length }, cb)
})
}, self)

wire.bitfield(self.bitfield) // always send bitfield (required)
wire.interested() // always start out interested
@@ -1338,3 +1342,16 @@ function randomInt (high) {
}

function noop () {}

function addDestroyer (target, item) {
target._destroyers.push(item)
}

function listen (source, eventName, cb, target, once) {
var destroyer = function () {
source.removeListener(eventName, cb)
}
addDestroyer(target, destroyer)
source[once ? 'once' : 'on'](eventName, cb)
return destroyer
}
Submodule bittorrent-dht added at 06d420
Submodule bittorrent-swarm added at 7dec3e
Submodule re-emitter added at 764fee
Submodule torrent-discovery added at 8de962
@@ -29,7 +29,10 @@
"dependencies": {
"addr-to-ip-port": "^1.0.1",
"bitfield": "^1.0.2",
"//": {
"bittorrent-dht": "^7.0.0",
"//": "nothing"
},
"bittorrent-protocol": "^2.0.0",
"chunk-store-stream": "^2.0.0",
"cpus": "^1.0.0",
@@ -61,7 +64,10 @@
"speedometer": "^1.0.0",
"stream-to-blob-url": "^2.0.0",
"stream-with-known-length-to-buffer": "^1.0.0",
"//": {
"torrent-discovery": "^7.0.0",
"//": "nothing"
},
"torrent-piece": "^1.0.0",
"uniq": "^1.0.1",
"unordered-array-remove": "^1.0.2",
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.