Skip to content

Commit

Permalink
feat: Peer and Content Routing Support (DHT) - first round
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and daviddias committed Jul 13, 2017
1 parent cbf4a3a commit 667fe9c
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 134 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@
"greenkeeperio-bot <support@greenkeeper.io>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
53 changes: 30 additions & 23 deletions src/components/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
const debug = require('debug')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const setImmediate = require('async/setImmediate')
const waterfall = require('async/waterfall')
const each = require('async/each')

const Message = require('../../types/message')
const CONSTANTS = require('../../constants')
Expand Down Expand Up @@ -89,34 +90,46 @@ class Network {
if (!this._running) {
return
}

this.bitswap._onPeerConnected(peerInfo.id)
}

_onPeerDisconnect (peerInfo) {
if (!this._running) {
return
}

this.bitswap._onPeerDisconnected(peerInfo.id)
}

// Connect to the given peer
connectTo (peerId, callback) {
const done = (err) => setImmediate(() => callback(err))

if (!this._running) {
return done(new Error('No running network'))
return callback(new Error('No running network'))
}

// NOTE: For now, all this does is ensure that we are
// connected. Once we have Peer Routing, we will be able
// to find the Peer
if (this.libp2p.swarm.muxedConns[peerId.toB58String()]) {
done()
} else {
done(new Error('Could not connect to peer with peerId:', peerId.toB58String()))
}
this.libp2p.dial(peerId, callback)
}

findProviders (cid, maxProviders, callback) {
// TODO
// consider if we want to trickleDown maxProviders, currently this is
// not an exposed option:
// https://github.com/libp2p/js-libp2p-kad-dht/blob/master/src/index.js#L416
this.libp2p.contentRouting.findProviders(cid, CONSTANTS.providerRequestTimeout, callback)
}

findAndConnect (cid, maxProviders, callback) {
waterfall([
(cb) => this.findProviders(cid, maxProviders, cb),
(provs, cb) => each(provs, (p, cb) => this.connectTo(p, cb))
], callback)
}

provide (cid, callback) {
this.libp2p.contentRouting.provide(cid, callback)
}

// Connect to the given peer
// Send the given msg (instance of Message) to the given peer
sendMessage (peerId, msg, callback) {
if (!this._running) {
Expand All @@ -125,14 +138,8 @@ class Network {

const stringId = peerId.toB58String()
log('sendMessage to %s', stringId, msg)
let peerInfo
try {
peerInfo = this.peerBook.get(stringId)
} catch (err) {
return callback(err)
}

this._dialPeer(peerInfo, (err, conn, protocol) => {
this._dialPeer(peerId, (err, conn, protocol) => {
if (err) {
return callback(err)
}
Expand All @@ -157,14 +164,14 @@ class Network {
})
}

_dialPeer (peerInfo, callback) {
_dialPeer (peer, callback) {
// dialByPeerInfo throws if no network is there
try {
// Attempt Bitswap 1.1.0
this.libp2p.dial(peerInfo, BITSWAP110, (err, conn) => {
this.libp2p.dial(peer, BITSWAP110, (err, conn) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dial(peerInfo, BITSWAP100, (err, conn) => {
this.libp2p.dial(peer, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}
Expand Down
1 change: 1 addition & 0 deletions src/components/want-manager/msg-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ module.exports = class MsgQueue {
log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message)
return
}

log('sending message')
this.network.sendMessage(this.peerId, msg, (err) => {
if (err) {
Expand Down
133 changes: 110 additions & 23 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ const reject = require('async/reject')
const each = require('async/each')
const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const series = require('async/series')
const map = require('async/map')
const once = require('once')

const CONSTANTS = require('./constants')
const WantManager = require('./components/want-manager')
Expand Down Expand Up @@ -126,6 +129,12 @@ class Bitswap {
`block:${block.cid.buffer.toString()}`,
block
)
this.network.provide(block.cid, (err) => {
if (err) {
log.error('Failed to provide: %s', err.message)
}
})

this.engine.receivedBlocks([block.cid])
callback()
})
Expand All @@ -150,26 +159,64 @@ class Bitswap {
* @returns {void}
*/
get (cid, callback) {
this.getMany([cid], (err, blocks) => {
if (err) {
return callback(err)
}

if (blocks && blocks.length > 0) {
callback(null, blocks[0])
} else {
// when a unwant happens
callback()
}
})
}

/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Array<CID>} cids
* @param {function(Error, Blocks)} callback
* @returns {void}
*/
getMany (cids, callback) {
callback = once(callback)
const unwantListeners = {}
const blockListeners = {}
const cidStr = cid.buffer.toString()
const unwantEvent = `unwant:${cidStr}`
const blockEvent = `block:${cidStr}`

log('get: %s', cidStr)
const cleanupListener = () => {
const unwantEvent = (c) => `unwant:${c}`
const blockEvent = (c) => `block:${c}`
const retrieved = []
const locals = []
const missing = []

log('getMany', cids.length)
const cleanupListener = (cidStr) => {
if (unwantListeners[cidStr]) {
this.notifications.removeListener(unwantEvent, unwantListeners[cidStr])
this.notifications.removeListener(
unwantEvent(cidStr),
unwantListeners[cidStr]
)
delete unwantListeners[cidStr]
}

if (blockListeners[cidStr]) {
this.notifications.removeListener(blockEvent, blockListeners[cidStr])
this.notifications.removeListener(
blockEvent(cidStr),
blockListeners[cidStr]
)
delete blockListeners[cidStr]
}
}

const addListener = () => {
const addListeners = (cids) => {
cids.forEach((c) => addListener(c))
}

const addListener = (cid) => {
const cidStr = cid.buffer.toString()

unwantListeners[cidStr] = () => {
log(`manual unwant: ${cidStr}`)
cleanupListener()
Expand All @@ -180,26 +227,61 @@ class Bitswap {
blockListeners[cidStr] = (block) => {
this.wm.cancelWants([cid])
cleanupListener(cid)
callback(null, block)
retrieved.push(block)

if (retrieved.length === missing.length) {
finish(callback)
}
}

this.notifications.once(unwantEvent, unwantListeners[cidStr])
this.notifications.once(blockEvent, blockListeners[cidStr])
this.notifications.once(
unwantEvent(cidStr),
unwantListeners[cidStr]
)
this.notifications.once(
blockEvent(cidStr),
blockListeners[cidStr]
)
}

this.blockstore.has(cid, (err, has) => {
if (err) {
return callback(err)
}
const finish = (cb) => {
map(locals, (cid, cb) => {
this.blockstore.get(cid, cb)
}, (err, localBlocks) => {
if (err) {
return callback(err)
}

if (has) {
log('already have block: %s', cidStr)
return this.blockstore.get(cid, callback)
}
callback(null, localBlocks.concat(retrieved))
})
}

addListener()
this.wm.wantBlocks([cid])
})
series([
(cb) => each(cids, (cid, cb) => {
this.blockstore.has(cid, (err, has) => {
if (err) {
return cb(err)
}

if (has) {
locals.push(cid)
} else {
missing.push(cid)
}
cb()
})
}, cb),
(cb) => {
if (missing.length > 0) {
addListeners(missing)
this.wm.wantBlocks(missing)

this.network.findAndConnect(cids[0], CONSTANTS.maxProvidersPerRequest, cb)
} else {
cb()
}
}
], finish)
}

// removes the given cids from the wantlist independent of any ref counts
Expand Down Expand Up @@ -269,6 +351,11 @@ class Bitswap {
block
)
this.engine.receivedBlocks([block.cid])
this.network.provide(block.cid, (err) => {
if (err) {
log.error('Failed to provide: %s', err.message)
}
})
})
cb()
})
Expand Down
Loading

0 comments on commit 667fe9c

Please sign in to comment.