Skip to content

Commit

Permalink
feat(decision-engine): split large block messages
Browse files Browse the repository at this point in the history
also cleanup the network code, to not improperly reuse connections
  • Loading branch information
dignifiedquire committed Feb 21, 2017
1 parent f267826 commit 09d5b2b
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 53 deletions.
35 changes: 33 additions & 2 deletions src/components/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const debug = require('debug')
const pull = require('pull-stream')
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const waterfall = require('async/waterfall')
const map = require('async/map')
const debounce = require('lodash.debounce')
Expand All @@ -19,6 +20,8 @@ const Message = require('../../types/message')
const Wantlist = require('../../types/wantlist')
const Ledger = require('./ledger')

const MAX_MESSAGE_SIZE = 512 * 1024

class DecisionEngine {
constructor (blockstore, network) {
this.blockstore = blockstore
Expand All @@ -35,14 +38,42 @@ class DecisionEngine {
}

_sendBlocks (env, cb) {
// split into messges of max 512 * 1024 bytes
const blocks = env.blocks
const total = blocks.reduce((acc, b) => {
return acc + b.block.data.byteLength
}, 0)

if (total < MAX_MESSAGE_SIZE) {
return this._sendSafeBlocks(env.peer, blocks, cb)
}

let size = 0
let batch = []

eachSeries(blocks, (b, cb) => {
batch.push(b)
size += b.block.data.byteLength

if (size >= MAX_MESSAGE_SIZE) {
const nextBatch = batch.slice()
batch = []
this._sendSafeBlocks(env.peer, nextBatch, cb)
} else {
cb()
}
}, cb)
}

_sendSafeBlocks (peer, blocks, cb) {
const msg = new Message(false)

env.blocks.forEach((b) => {
blocks.forEach((b) => {
msg.addBlock(b.cid, b.block)
})

// console.log('sending %s blocks', msg.blocks.size)
this.network.sendMessage(env.peer, msg, (err) => {
this.network.sendMessage(peer, msg, (err) => {
if (err) {
log('sendblock error: %s', err.message)
}
Expand Down
98 changes: 49 additions & 49 deletions src/components/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const debug = require('debug')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const setImmediate = require('async/setImmediate')

const Message = require('../../types/message')
Expand All @@ -19,7 +18,6 @@ class Network {
this.libp2p = libp2p
this.peerBook = peerBook
this.bitswap = bitswap
this.conns = new Map()
this.b100Only = b100Only || false

// increase event listener max
Expand Down Expand Up @@ -129,69 +127,71 @@ class Network {
}

const stringId = peerId.toB58String()
log('sendMessage to %s', stringId)
log('sendMessage to %s', stringId, msg)
let peerInfo
try {
peerInfo = this.peerBook.getByB58String(stringId)
} catch (err) {
return callback(err)
}

if (this.conns.has(stringId)) {
this.conns.get(stringId)(msg)
return callback()
}

const msgQueue = pushable()

// Attempt Bitswap 1.1.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP110, (err, conn) => {
this._dialPeer(peerInfo, (err, conn, protocol) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}
log('dialed %s on Bitswap 1.0.0', peerInfo.id.toB58String())

this.conns.set(stringId, (msg) => {
msgQueue.push(msg.serializeToBitswap100())
})

this.conns.get(stringId)(msg)

withConn(this.conns, conn)
callback()
})
return
return callback(err)
}
log('dialed %s on Bitswap 1.1.0', peerInfo.id.toB58String())

this.conns.set(stringId, (msg) => {
msgQueue.push(msg.serializeToBitswap110())
let serialized
switch (protocol) {
case BITSWAP100:
serialized = msg.serializeToBitswap100()
break
case BITSWAP110:
serialized = msg.serializeToBitswap110()
break
default:
return callback(new Error('Unkown protocol: ' + protocol))
}
writeMessage(conn, serialized, (err) => {
if (err) {
log(err)
}
})

this.conns.get(stringId)(msg)

withConn(this.conns, conn)
callback()
})
}

function withConn (conns, conn) {
pull(
msgQueue,
lp.encode(),
conn,
pull.onEnd((err) => {
if (err) {
log.error(err)
}
msgQueue.end()
conns.delete(stringId)
})
)
_dialPeer (peerInfo, callback) {
// dialByPeerInfo throws if no network is there
try {
// Attempt Bitswap 1.1.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP110, (err, conn) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}

callback(null, conn, BITSWAP100)
})
return
}

callback(null, conn, BITSWAP110)
})
} catch (err) {
return callback(err)
}
}
}

function writeMessage (conn, msg, callback) {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.onEnd(callback)
)
}

module.exports = Network
48 changes: 46 additions & 2 deletions test/components/decision-engine/index-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function stringifyMessages (messages) {
}

module.exports = (repo) => {
function newEngine (path, done) {
function newEngine (path, done, net) {
parallel([
(cb) => repo.create(path, cb),
(cb) => PeerId.create(cb)
Expand All @@ -38,7 +38,7 @@ module.exports = (repo) => {
return done(err)
}
const blockstore = results[0].blockstore
const engine = new DecisionEngine(blockstore, mockNetwork())
const engine = new DecisionEngine(blockstore, net || mockNetwork())
engine.start()

done(null, { peer: results[1], engine })
Expand Down Expand Up @@ -221,5 +221,49 @@ module.exports = (repo) => {
)
})
})

it('splits large block messages', (done) => {
const data = _.range(10).map((i) => {
const b = new Buffer(1024 * 256)
b.fill(i)
return b
})
const blocks = _.range(10).map((i) => {
return new Block(data[i])
})

const net = mockNetwork(5, (res) => {
expect(res.messages).to.have.length(5)
done()
})

parallel([
(cb) => newEngine('sf', cb, net),
(cb) => map(blocks, (b, cb) => b.key(cb), cb)
], (err, res) => {
expect(err).to.not.exist
const sf = res[0].engine
const cids = res[1].map((c) => new CID(c))
const id = res[0].peer

pull(
pull.values(blocks.map((b, i) => ({
data: b.data, key: cids[i].multihash
}))),
sf.blockstore.putStream(),
pull.onEnd((err) => {
expect(err).to.not.exist
const msg = new Message(false)
cids.forEach((c, i) => {
msg.addEntry(c, Math.pow(2, 32) - 1 - i)
})

sf.messageReceived(id, msg, (err) => {
expect(err).to.not.exist
})
})
)
})
})
})
}

0 comments on commit 09d5b2b

Please sign in to comment.