Skip to content

Commit

Permalink
Merge pull request #17 from ipfs/stronger-faster-better
Browse files Browse the repository at this point in the history
Stronger faster better
  • Loading branch information
dignifiedquire committed Jun 6, 2016
2 parents 7987ec7 + 6563398 commit fc128b0
Show file tree
Hide file tree
Showing 18 changed files with 380 additions and 187 deletions.
15 changes: 9 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,30 @@
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.8.0",
"libp2p-ipfs": "^0.10.0",
"lodash": "^4.11.2",
"libp2p-ipfs": "^0.11.0",
"lodash": "^4.13.1",
"multiaddr": "^2.0.2",
"ncp": "^2.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.0",
"rimraf": "^2.5.2"
"rimraf": "^2.5.2",
"safe-buffer": "^5.0.1"
},
"dependencies": {
"async": "^2.0.0-rc.4",
"async": "^2.0.0-rc.5",
"bl": "^1.1.2",
"debug": "^2.2.0",
"heap": "^0.2.6",
"highland": "^3.0.0-beta.1",
"ipfs-block": "^0.3.0",
"lodash.isequal": "^4.1.4",
"lodash.isequalwith": "^4.2.0",
"lodash.isundefined": "^3.0.1",
"multihashes": "^0.2.2",
"protocol-buffers": "^3.1.6"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
"Friedel Ziegelmayer <dignifiedquire@gmail.com>"
]
}
}
2 changes: 1 addition & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const second = 1000

module.exports = {
maxProvidersPerRequest: 3,
provierRequestTimeout: 10 * second,
providerRequestTimeout: 10 * second,
hasBlockTimeout: 15 * second,
provideTimeout: 15 * second,
kMaxPriority: Math.pow(2, 31) - 1,
Expand Down
43 changes: 29 additions & 14 deletions src/decision/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const debug = require('debug')
const _ = require('highland')
const async = require('async')
const mh = require('multihashes')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
Expand All @@ -23,6 +24,8 @@ module.exports = class Engine {
// A priority queue of requests received from different
// peers.
this.peerRequestQueue = new PeerRequestQueue()

this._running = false
}

_sendBlock (env, cb) {
Expand All @@ -40,16 +43,11 @@ module.exports = class Engine {
}

_outbox () {
if (!this._timer) {
this._timer = setTimeout(() => {
doIt(() => {
this._timer = null
})
}, 100)
}
if (!this._running) return

const doIt = (cb) => {
_((push, next) => {
if (!this._running) return push(null, _.nil)
const nextTask = this.peerRequestQueue.pop()

if (!nextTask) return push(null, _.nil)
Expand All @@ -75,6 +73,14 @@ module.exports = class Engine {
})
.done(cb)
}

if (!this._timer) {
this._timer = setTimeout(() => {
doIt(() => {
this._timer = null
})
}, 50)
}
}

wantlistForPeer (peerId) {
Expand Down Expand Up @@ -103,7 +109,7 @@ module.exports = class Engine {
}

this._processBlocks(msg.blocks, ledger)
log('wantlist', Array.from(msg.wantlist.values()))
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))
async.eachSeries(
msg.wantlist.values(),
this._processWantlist.bind(this, ledger, peerId),
Expand All @@ -112,7 +118,8 @@ module.exports = class Engine {
if (err) return done(err)
this._outbox()
done()
})
}
)
}

receivedBlock (block) {
Expand All @@ -133,20 +140,20 @@ module.exports = class Engine {

_processWantlist (ledger, peerId, entry, cb) {
if (entry.cancel) {
log('cancel %s', entry.key.toString('hex'))
log('cancel %s', mh.toB58String(entry.key))
ledger.cancelWant(entry.key)
this.peerRequestQueue.remove(entry.key, peerId)
async.setImmediate(() => cb())
} else {
log('wants %s - %s', entry.key.toString('hex'), entry.priority)
log('wants %s - %s', mh.toB58String(entry.key), entry.priority)
ledger.wants(entry.key, entry.priority)

// If we already have the block, serve it
this.datastore.has(entry.key, (err, exists) => {
if (err) {
log('failed existence check %s', entry.key.toString('hex'))
log('failed existence check %s', mh.toB58String(entry.key))
} else if (exists) {
log('has want %s', entry.key.toString('hex'))
log('has want %s', mh.toB58String(entry.key))
this.peerRequestQueue.push(entry.entry, peerId)
}
cb()
Expand All @@ -156,7 +163,7 @@ module.exports = class Engine {

_processBlocks (blocks, ledger) {
for (let block of blocks.values()) {
log('got block %s %s bytes', block.key.toString('hex'), block.data.length)
log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(block)
Expand Down Expand Up @@ -200,4 +207,12 @@ module.exports = class Engine {

return l
}

start () {
this._running = true
}

stop () {
this._running = false
}
}
41 changes: 32 additions & 9 deletions src/decision/peer-request-queue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
'use strict'

const mh = require('multihashes')
const debug = require('debug')
const assert = require('assert')

const PriorityQueue = require('./pq')

const log = debug('bitswap:peer-request-queue')

class PeerRequestTask {
constructor (entry, target, done) {
this.entry = entry
Expand All @@ -13,10 +19,16 @@ class PeerRequestTask {
get key () {
return taskKey(this.target, this.entry.key)
}

get [Symbol.toStringTag] () {
return `PeerRequestTask <target: ${this.target.toB58String()}, entry: ${this.entry.toString()}>`
}
}

class ActivePartner {
constructor () {
constructor (id) {
this.id = id

// The number of blocks this peer is currently being sent.
this.active = 0

Expand All @@ -30,17 +42,18 @@ class ActivePartner {
}

startTask (key) {
this.activeBlocks.set(key, {})
this.activeBlocks.set(mh.toB58String(key), 1)
this.active ++
}

taskDone (key) {
this.activeBlocks.delete(key)
const k = mh.toB58String(key)
assert(this.activeBlocks.has(k), 'finishing non existent task')

this.activeBlocks.delete()
this.active --

if (this.active < 0) {
throw new Error('more tasks finished than started')
}
assert(this.active >= 0, 'more tasks finished than started')
}
}

Expand All @@ -53,21 +66,24 @@ module.exports = class PeerRequestQueue {

// Add a new entry to the queue
push (entry, to) {
log('push, to: %s', to.toB58String())
let partner = this.partners.get(to.toB58String())

if (!partner) {
partner = new ActivePartner()
partner = new ActivePartner(to)
this.pQueue.push(partner)
this.partners.set(to.toB58String(), partner)
}

if (partner.activeBlocks.has(entry.key)) {
log('has activeBlocks', entry.key)
return
}

let task = this.taskMap.get(taskKey(to, entry.key))

if (task) {
log('updating task', task.toString())
task.entry.priority = entry.priority
partner.taskQueue.update(task)
return
Expand All @@ -79,13 +95,16 @@ module.exports = class PeerRequestQueue {
})

partner.taskQueue.push(task)
log('taskMap.set', task.key, task.toString())
this.taskMap.set(task.key, task)
partner.requests ++
partner.taskQueue.update(task)
}

// Get the task with the hightest priority from the queue
pop () {
// log('pop, empty? %s', this.pQueue.isEmpty())
// log('partners', Array.from(this.partners.values()).map((val) => [val.requests, val.taskQueue.size()]))
if (this.pQueue.isEmpty()) return

let partner = this.pQueue.pop()
Expand All @@ -103,7 +122,7 @@ module.exports = class PeerRequestQueue {
partner.requests --
break
}

// log('pop, out', partner.taskQueue.isEmpty(), out)
this.pQueue.push(partner)
return out
}
Expand All @@ -120,11 +139,15 @@ module.exports = class PeerRequestQueue {
// having canceled a block, we now account for that in the given partner
this.partners.get(peerId.toB58String()).requests --
}

log('taskMap', Array.from(this.taskMap.values()).map((v) => {
return v.toString()
}))
}
}

function taskKey (peerId, key) {
return `${peerId.toB58String()}:${key.toString('hex')}`
return `${peerId.toB58String()}:${mh.toB58String(key)}`
}

function partnerCompare (a, b) {
Expand Down
Loading

0 comments on commit fc128b0

Please sign in to comment.