Skip to content

Commit

Permalink
feat: more perf stuffs
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and daviddias committed Dec 23, 2016
1 parent dbe80cc commit c65e722
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 170 deletions.
18 changes: 8 additions & 10 deletions README.md
Expand Up @@ -104,24 +104,24 @@ pull(

- `cids: CID|[]CID`

Cancel previously requested keys, forcefully. That means they are removed from the
wantlist independent of how many other resources requested these keys. Callbacks
attached to `getBlock` are errored with `Error('manual unwant: key')`.
Cancel previously requested cids, forcefully. That means they are removed from the
wantlist independent of how many other resources requested these cids. Callbacks
attached to `getBlock` are errored with `Error('manual unwant: cid)`.

#### `cancelWants(cids)`

- `cid: CID|[]CID`

Cancel previously requested keys.
Cancel previously requested cids.

#### `putStream()`

Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored.
Objects passed into here should be of the form `{data: Buffer, key: Multihash}`
Returns a duplex `pull-stream` that emits an object `{cid: CID}` for every written block when it was stored.
Objects passed into here should be of the form `{data: Buffer, cid: CID}`

#### `put(blockAndCid, callback)`

- `blockAndKey: {data: Buffer, cid: CID}`
- `blockAndCid: {data: Buffer, cid: CID}`
- `callback: Function`

Announce that the current node now has the block containing `data`. This will store it
Expand Down Expand Up @@ -152,9 +152,7 @@ src
│   ├── decision
│   │   ├── engine.js
│   │   ├── index.js
│   │   ├── ledger.js
│   │   ├── peer-request-queue.js
│   │   └── pq.js
│   │   └── ledger.js
│   ├── network # Handles peerSet and open new conns
│   │   └── index.js
│   └── want-manager # Keeps track of all blocks the peer wants (not the others which it is connected)
Expand Down
9 changes: 4 additions & 5 deletions package.json
Expand Up @@ -44,22 +44,22 @@
"idb-pull-blob-store": "^0.5.1",
"interface-pull-blob-store": "^0.6.0",
"ipfs-repo": "^0.11.2",
"libp2p-ipfs-nodejs": "^0.17.0",
"libp2p-ipfs-nodejs": "^0.17.1",
"lodash": "^4.17.2",
"multiaddr": "^2.1.1",
"ncp": "^2.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.8.0",
"peer-id": "^0.8.1",
"peer-info": "^0.8.1",
"rimraf": "^2.5.4",
"safe-buffer": "^5.0.1"
},
"dependencies": {
"async": "^2.1.4",
"cids": "^0.3.5",
"debug": "^2.4.4",
"debug": "^2.5.1",
"heap": "^0.2.6",
"ipfs-block": "^0.5.3",
"ipfs-block": "^0.5.4",
"lodash.debounce": "^4.0.8",
"lodash.find": "^4.6.0",
"lodash.groupby": "^4.6.0",
Expand All @@ -68,7 +68,6 @@
"lodash.pullallwith": "^4.7.0",
"lodash.uniqwith": "^4.5.0",
"lodash.values": "^4.3.0",
"multihashes": "^0.3.1",
"protocol-buffers": "^3.2.1",
"pull-defer": "^0.2.2",
"pull-length-prefixed": "^1.2.0",
Expand Down
74 changes: 29 additions & 45 deletions src/components/decision-engine/index.js
Expand Up @@ -11,7 +11,6 @@ const find = require('lodash.find')
const values = require('lodash.values')
const groupBy = require('lodash.groupby')
const pullAllWith = require('lodash.pullallwith')
const CID = require('cids')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
Expand Down Expand Up @@ -124,18 +123,17 @@ class DecisionEngine {
return
}
// Check all connected peers if they want the block we received
for (let l of this.ledgerMap.values()) {
this.ledgerMap.forEach((ledger) => {
cids
.map((k) => l.wantlistContains(k))
.map((cid) => ledger.wantlistContains(cid))
.filter(Boolean)
.forEach((e) => {
// this.peerRequestQueue.push(e, l.partner)
.forEach((entry) => {
this._tasks.push({
entry: e,
target: l.partner
entry: entry,
target: ledger.partner
})
})
}
})
this._outbox()
}

Expand All @@ -152,30 +150,26 @@ class DecisionEngine {
ledger.wantlist = new Wantlist()
}

this._processBlocks(msg.blocks, ledger, (err) => {
if (err) {
log.error(`failed to process blocks: ${err.message}`)
}
this._processBlocks(msg.blocks, ledger)

if (msg.wantlist.size === 0) {
return cb()
}
if (msg.wantlist.size === 0) {
return cb()
}

let cancels = []
let wants = []
for (let entry of msg.wantlist.values()) {
if (entry.cancel) {
ledger.cancelWant(entry.cid)
cancels.push(entry)
} else {
ledger.wants(entry.cid, entry.priority)
wants.push(entry)
}
let cancels = []
let wants = []
msg.wantlist.forEach((entry) => {
if (entry.cancel) {
ledger.cancelWant(entry.cid)
cancels.push(entry)
} else {
ledger.wants(entry.cid, entry.priority)
wants.push(entry)
}

this._cancelWants(ledger, peerId, cancels)
this._addWants(ledger, peerId, wants, cb)
})

this._cancelWants(ledger, peerId, cancels)
this._addWants(ledger, peerId, wants, cb)
}

_cancelWants (ledger, peerId, entries) {
Expand Down Expand Up @@ -209,24 +203,14 @@ class DecisionEngine {
}

_processBlocks (blocks, ledger, callback) {
map(blocks.values(), (block, cb) => {
block.key((err, key) => {
if (err) {
return cb(err)
}
log('got block (%s bytes)', block.data.length)
ledger.receivedBytes(block.data.length)

cb(null, new CID(key))
})
}, (err, cids) => {
if (err) {
return callback(err)
}

this.receivedBlocks(cids)
callback()
const cids = []
blocks.forEach((b, cidStr) => {
log('got block (%s bytes)', b.block.data.length)
ledger.receivedBytes(b.block.data.length)
cids.push(b.cid)
})

this.receivedBlocks(cids)
}

// Clear up all accounting things after message was sent
Expand Down
143 changes: 62 additions & 81 deletions src/index.js
@@ -1,21 +1,21 @@
'use strict'

const series = require('async/series')
const debug = require('debug')

const log = debug('bitswap')
log.error = debug('bitswap:error')
const waterfall = require('async/waterfall')
const each = require('async/each')
const EventEmitter = require('events').EventEmitter
const pull = require('pull-stream')
const paramap = require('pull-paramap')
const defer = require('pull-defer/source')
const CID = require('cids')
const debug = require('debug')

const CONSTANTS = require('./constants')
const WantManager = require('./components/want-manager')
const Network = require('./components/network')
const DecisionEngine = require('./components/decision-engine')

const log = debug('bitswap')
log.error = debug('bitswap:error')

class Bitswap {
constructor (libp2p, blockstore, peerBook) {
this.libp2p = libp2p
Expand Down Expand Up @@ -46,83 +46,53 @@ class Bitswap {
log('failed to receive message', incoming)
}

const cidsAndBlocks = Array
.from(incoming.blocks.entries())
.map((entry) => {
return { cid: new CID(entry[0]), block: entry[1] }
})

if (cidsAndBlocks.length === 0) {
if (incoming.blocks.size === 0) {
return cb()
}

const cidsAndBlocks = Array.from(incoming.blocks.values())

// quickly send out cancels, reduces chances of duplicate block receives
pull(
pull.values(cidsAndBlocks),
pull.filter((cidAndBlock) => this.wm.wantlist.contains(cidAndBlock.cid)),
pull.collect((err, cidsAndBlocks) => {
if (err) {
return log.error(err)
}
const cids = cidsAndBlocks.map((entry) => entry.cid)
const toCancel = cidsAndBlocks
.filter((b) => this.wm.wantlist.contains(b.cid))
.map((b) => b.cid)

this.wm.cancelWants(cids)
})
)
this.wm.cancelWants(toCancel)

pull(
pull.values(cidsAndBlocks),
paramap(this._handleReceivedBlock.bind(this, peerId), 10),
pull.onEnd(cb)
each(
cidsAndBlocks,
this._handleReceivedBlock.bind(this, peerId),
cb
)
})
}

_handleReceivedBlock (peerId, cidAndBlock, callback) {
series([
(cb) => this._updateReceiveCounters(cidAndBlock.block, (err) => {
if (err) {
// ignore, as these have been handled
// in _updateReceiveCounters
const cid = cidAndBlock.cid
const block = cidAndBlock.block

waterfall([
(cb) => this.blockstore.has(cid.multihash, cb),
(exists, cb) => {
this._updateReceiveCounters(block, exists)
log('got block')

if (exists) {
return cb()
}

log('got block from %s', peerId.toB58String(), cidAndBlock.block.data.length)
cb()
}),
(cb) => {
this.put(cidAndBlock, (err) => {
if (err) {
log.error('receiveMessage put error: %s', err.message)
}
cb()
})
this._putBlockStore(cidAndBlock, cb)
}
], callback)
}

_updateReceiveCounters (block, callback) {
_updateReceiveCounters (block, exists) {
this.blocksRecvd++
block.key((err, key) => {
if (err) {
return callback(err)
}

this.blockstore.has(key, (err, has) => {
if (err) {
log('blockstore.has error: %s', err.message)
return callback(err)
}

if (has) {
this.dupBlocksRecvd ++
this.dupDataRecvd += block.data.length
return callback(new Error('Already have block'))
}

callback()
})
})
if (exists) {
this.dupBlocksRecvd ++
this.dupDataRecvd += block.data.length
}
}

// handle errors on the receiving channel
Expand Down Expand Up @@ -250,24 +220,35 @@ class Bitswap {
})
}),
pull.filter((val) => !val[1]),
pull.map((val) => {
const block = val[0].block
const cid = val[0].cid
log('putting block')
return pull(
pull.values([{
data: block.data,
key: cid.multihash
}]),
this.blockstore.putStream(),
pull.through(() => {
log('put block')
this.notifications.emit(`block:${cid.buffer.toString()}`, block)
this.engine.receivedBlocks([cid])
})
)
}),
pull.flatten()
pull.asyncMap((val, cb) => {
this._putBlockStore(val[0], cb)
})
)
}

_putBlockStore (blockAndCid, callback) {
const block = blockAndCid.block
const cid = blockAndCid.cid
const cidStr = cid.buffer.toString()

log('putting block')

pull(
pull.values([{
data: block.data,
key: cid.multihash
}]),
this.blockstore.putStream(),
pull.collect((err, meta) => {
if (err) {
return callback(err)
}

log('put block')
this.notifications.emit(`block:${cidStr}`, block)
this.engine.receivedBlocks([cid])
callback(null, meta)
})
)
}

Expand Down

0 comments on commit c65e722

Please sign in to comment.