Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer Store Verification #1350

Open
wants to merge 5 commits into
base: master
from
Open
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Next

Defer store verification

Allow file access before the entire store has been validated.

Critical pieces are validated immediately instead of waiting in the
parallelLimit() queue.
  • Loading branch information
KayleePop committed Apr 11, 2018
commit 11ced941e39d9f4c6dd76701c2017a05468b9d65
@@ -100,6 +100,8 @@ function Torrent (torrentId, client, opts) {
this._amInterested = false
this._selections = []
this._critical = []
this._verified = []
this._verifying = []

this.wires = [] // open wires (added *after* handshake)

@@ -548,7 +550,6 @@ Torrent.prototype._onMetadata = function (metadata) {
for (var index = 0; index < self.pieces.length; index++) {
self._markVerified(index)
}
self._onStore()
} else {
self._verifyPieces()
}
@@ -558,6 +559,7 @@ Torrent.prototype._onMetadata = function (metadata) {
}

self.emit('metadata')
self._onStore()
}

/*
@@ -586,30 +588,64 @@ Torrent.prototype._verifyPieces = function () {
var self = this
parallelLimit(self.pieces.map(function (_, index) {
return function (cb) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

self.store.get(index, function (err, buf) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

if (err) return process.nextTick(cb, null) // ignore error
sha1(buf, function (hash) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))

if (hash === self._hashes[index]) {
if (!self.pieces[index]) return
self._debug('piece verified %s', index)
self._markVerified(index)
} else {
self._debug('piece invalid %s', index)
}
cb(null)
})
})
self._verify(index, cb)
}
}), FILESYSTEM_CONCURRENCY, function (err) {
if (err) return self._destroy(err)
self._debug('done verifying')
self._onStore()
})
}

Torrent.prototype._verify = function (index, cb) {
var self = this

if (self.destroyed) return cb(new Error('torrent is destroyed'))
if (self._verifying[index] || self.bitfield.get(index) || self._verified[index]) {
// call cb() asynchronously so that parallelLimit() won't block
return process.nextTick(cb, null)
}

self._verifying[index] = true

self.store.get(index, function (err, buf) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))
// a critical piece might have been downloaded before this callback
if (self.bitfield.get(index)) return cb(null)

if (err) {
// interpret any error as not having the chunk
self._verified[index] = true
self._update()
return cb(null)
}

sha1(buf, function (hash) {
if (self.destroyed) return cb(new Error('torrent is destroyed'))
// a critical piece might have been downloaded before this callback
if (self.bitfield.get(index)) return cb(null)

if (hash === self._hashes[index]) {
self._debug('piece verified %s', index)
self._markVerified(index)

self.wires.forEach(function (wire) {
wire.have(index)
})

// We also check `self.destroyed` since `torrent.destroy()` could have been
// called in the `torrent.on('done')` handler, triggered by `_checkDone()`.
if (self._checkDone() && !self.destroyed) self.discovery.complete()

self._update()

cb(null)
} else {
self._debug('piece invalid %s', index)
self._verified[index] = true
self._update()
cb(null)
}
})
})
}

@@ -1470,6 +1506,16 @@ Torrent.prototype._request = function (wire, index, hotswap) {

if (self.bitfield.get(index)) return false

if (!self._verified[index]) {
if (self._critical[index]) {
// if the piece is critical then start its verification immediately
self._verify(index, noop)
} else {
// cancel the request unless the piece is critical
return false
}
}

var maxOutstandingRequests = isWebSeed
? Math.min(
getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength),
@@ -1498,7 +1544,8 @@ Torrent.prototype._request = function (wire, index, hotswap) {
var chunkLength = isWebSeed ? piece.chunkLengthRemaining(reservation) : piece.chunkLength(reservation)

wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) {
if (self.destroyed) return
// A critical piece may have been verified as in the store before the callback
if (self.destroyed || self.bitfield.get(index)) return

// TODO: what is this for?
if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) })
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.