Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer Store Verification #1350

Open
wants to merge 5 commits into
base: master
from
Open
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -100,6 +100,8 @@ function Torrent (torrentId, client, opts) {
this._amInterested = false
this._selections = []
this._critical = []
this._verified = []
this._verifying = []

this.wires = [] // open wires (added *after* handshake)

@@ -532,6 +534,8 @@ Torrent.prototype._onMetadata = function (metadata) {
self._onWireWithMetadata(wire)
})

self.emit('metadata')

self._debug('verifying existing torrent data')
if (self._fileModtimes && self._store === FSChunkStore) {
// don't verify if the files haven't been modified since we last checked
@@ -546,9 +550,11 @@ Torrent.prototype._onMetadata = function (metadata) {

if (unchanged) {
for (var index = 0; index < self.pieces.length; index++) {
self._markVerified(index)
self._markInStore(index)
}
self._onStore()
self._checkDone()

self.updateSelections()
} else {
self._verifyPieces()
}
@@ -557,7 +563,8 @@ Torrent.prototype._onMetadata = function (metadata) {
self._verifyPieces()
}

self.emit('metadata')
self.ready = true
self.emit('ready')
}

/*
@@ -586,55 +593,67 @@ Torrent.prototype._verifyPieces = function () {
var self = this
parallelLimit(self.pieces.map(function (_, index) {
return function (cb) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

self.store.get(index, function (err, buf) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

if (err) return process.nextTick(cb, null) // ignore error
sha1(buf, function (hash) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

if (hash === self._hashes[index]) {
if (!self.pieces[index]) return
self._debug('piece verified %s', index)
self._markVerified(index)
} else {
self._debug('piece invalid %s', index)
}
cb(null)
})
})
self._verify(index, cb)
}
}), FILESYSTEM_CONCURRENCY, function (err) {
if (err) return self._destroy(err)
self._debug('done verifying')
self._onStore()
})
}

Torrent.prototype._markVerified = function (index) {
this.pieces[index] = null
this._reservations[index] = null
this.bitfield.set(index, true)
}

/**
* Called when the metadata, listening server, and underlying chunk store is initialized.
*/
Torrent.prototype._onStore = function () {
Torrent.prototype._verify = function (index, cb) {
var self = this
if (self.destroyed) return
self._debug('on store')

self.ready = true
self.emit('ready')
if (self.destroyed) return cb(new Error('torrent is destroyed'))
if (self._verifying[index] || self._verified[index]) {
// call cb() asynchronously so that parallelLimit() won't block
return process.nextTick(cb, null)
}

// Files may start out done if the file was already in the store
self._checkDone()
self._verifying[index] = true

// In case any selections were made before torrent was ready
self._updateSelections()
self.store.get(index, function (err, buf) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

if (err) {
// interpret any error as not having the chunk
self._verified[index] = true
self._update()
return cb(null)
}

sha1(buf, function (hash) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

if (hash === self._hashes[index]) {
self._debug('piece from store verified %s', index)
self._markInStore(index)

self.wires.forEach(function (wire) {
wire.have(index)
})

// We also check `self.destroyed` since `torrent.destroy()` could have been
// called in the `torrent.on('done')` handler, triggered by `_checkDone()`.
if (self._checkDone() && !self.destroyed) self.discovery.complete()

self._update()

cb(null)
} else {
self._debug('piece from store invalid %s', index)
self._verified[index] = true
self._update()
cb(null)
}
})
})
}

Torrent.prototype._markInStore = function (index) {
this.pieces[index] = null
this._reservations[index] = null
this.bitfield.set(index, true)
}

Torrent.prototype.destroy = function (cb) {
@@ -1470,6 +1489,13 @@ Torrent.prototype._request = function (wire, index, hotswap) {

if (self.bitfield.get(index)) return false

if (!self._verified[index]) {
// verify critical pieces as soon as possible
if (self._critical[index]) self._verify(index, noop)

return false
}

var maxOutstandingRequests = isWebSeed
? Math.min(
getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength),
@@ -1500,9 +1526,6 @@ Torrent.prototype._request = function (wire, index, hotswap) {
wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) {
if (self.destroyed) return

// TODO: what is this for?
if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) })

if (r[i] === wire) r[i] = null

if (piece !== self.pieces[index]) return onUpdateTick()
@@ -1534,7 +1557,7 @@ Torrent.prototype._request = function (wire, index, hotswap) {

if (hash === self._hashes[index]) {
if (!self.pieces[index]) return
self._debug('piece verified %s', index)
self._debug('piece from wire verified %s', index)

self.pieces[index] = null
self._reservations[index] = null
@@ -1551,7 +1574,7 @@ Torrent.prototype._request = function (wire, index, hotswap) {
if (self._checkDone() && !self.destroyed) self.discovery.complete()
} else {
self.pieces[index] = new Piece(piece.length)
self.emit('warning', new Error('Piece ' + index + ' failed verification'))
self.emit('warning', new Error('Piece from wire ' + index + ' failed verification'))
}
onUpdateTick()
})
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.