Skip to content

Commit

Permalink
feat(decision): use ipfs-repo interface
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Apr 27, 2016
1 parent 5e20682 commit abec856
Show file tree
Hide file tree
Showing 15 changed files with 449 additions and 213 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -27,4 +27,6 @@ build/Release
node_modules

dist
lib
lib

test/test-repo-for*
12 changes: 10 additions & 2 deletions package.json
Expand Up @@ -32,14 +32,22 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"aegir": "^2.1.2",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.7.0",
"lodash": "^4.11.1",
"peer-id": "^0.6.6"
"ncp": "^2.0.0",
"peer-id": "^0.6.6",
"rimraf": "^2.5.2"
},
"dependencies": {
"async": "^2.0.0-rc.3",
"debug": "^2.2.0",
"heap": "^0.2.6",
"ipfs-blocks": "^0.2.3",
"highland": "^3.0.0-beta.1",
"ipfs-block": "^0.2.0",
"lodash.isundefined": "^3.0.1",
"protocol-buffers": "^3.1.6"
},
Expand Down
91 changes: 56 additions & 35 deletions src/decision/engine.js
@@ -1,6 +1,9 @@
'use strict'

const debug = require('debug')
const _ = require('highland')
const async = require('async')

const log = debug('engine')
log.error = debug('engine:error')

Expand All @@ -9,8 +12,8 @@ const PeerRequestQueue = require('./peer-request-queue')
const Ledger = require('./ledger')

module.exports = class Engine {
constructor (blockStore) {
this.blockStore = blockStore
constructor (datastore) {
this.datastore = datastore

// A list of of ledgers by their partner id
this.ledgerMap = new Map()
Expand All @@ -20,35 +23,35 @@ module.exports = class Engine {
this.peerRequestQueue = new PeerRequestQueue()

// Can't declare generator functions regularly
this.outbox = function * () {
// eslint-disable-next-line
while (true) {
const nextTask = this.peerRequestQueue.pop()
if (!nextTask) break

const block = this.blockStore.get(nextTask.entry.key)
if (!block) {
this.outbox = _((push, next) => {
const nextTask = this.peerRequestQueue.pop()

if (!nextTask) return push(null, _.nil)

this.datastore.get(nextTask.entry.key, (err, block) => {
if (err || !block) {
nextTask.done()
continue
} else {
push(null, {
peer: nextTask.target,
block: block,
sent: () => {
nextTask.done()
}
})
}

yield {
peer: nextTask.target,
block: block,
sent: () => {
nextTask.done()
}
}
}
}
next()
})
})
}

peers () {
return Array.from(this.ledgerMap.values()).map((l) => l.partner)
}

// Handle incoming messages
messageReceived (peerId, msg) {
messageReceived (peerId, msg, cb) {
if (msg.empty) {
log('received empty message from %s', peerId)
}
Expand All @@ -60,24 +63,42 @@ module.exports = class Engine {
ledger.wantlist = new Wantlist()
}

for (let entry of msg.wantlist.values()) {
const key = entry.entry.key
if (entry.cancel) {
log('cancel %s', key)
ledger.cancelWant(key)
this.peerRequestQueue.remove(key, peerId)
} else {
log('wants %s - %s', key, entry.entry.priority)
ledger.wants(key, entry.entry.priority)

// If we already have the block, serve it
if (this.blockStore.has(key)) {
this._processBlocks(msg.blocks, ledger)

async.eachSeries(
msg.wantlist.values(),
this._processWantlist.bind(this, ledger, peerId),
(err) => {
async.setImmediate(() => cb(err))
})
}

_processWantlist (ledger, peerId, entry, cb) {
const key = entry.entry.key

if (entry.cancel) {
log('cancel %s', key)
ledger.cancelWant(key)
this.peerRequestQueue.remove(key, peerId)
async.setImmediate(() => cb())
} else {
log('wants %s - %s', key, entry.entry.priority)
ledger.wants(key, entry.entry.priority)

// If we already have the block, serve it
this.datastore.has(key, (err, exists) => {
if (err) {
log('failed existence check %s', key)
} else {
this.peerRequestQueue.push(entry.entry, peerId)
}
}
cb()
})
}
}

for (let block of msg.blocks.values()) {
_processBlocks (blocks, ledger) {
for (let block of blocks.values()) {
log('got block %s %s bytes', block.key, block.data.length)
ledger.receivedBytes(block.data.length)

Expand Down
3 changes: 1 addition & 2 deletions src/decision/peer-request-queue.js
Expand Up @@ -112,7 +112,6 @@ module.exports = class PeerRequestQueue {
// Remove a task from the queue
remove (key, peerId) {
const t = this.taskMap.get(taskKey(peerId, key))

if (t) {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
Expand All @@ -126,7 +125,7 @@ module.exports = class PeerRequestQueue {
}

function taskKey (peerId, key) {
return `${peerId.toHexString()}${key.toString('hex')}`
return `${peerId.toHexString()}:${key.toString('hex')}`
}

function partnerCompare (a, b) {
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Expand Up @@ -5,7 +5,7 @@ const log = debug('bitswap')
log.error = debug('bitswap:error')

// const cs = require('./constants')
const WantManager = require('./want-manager')
const WantManager = require('./wantmanager')
const Network = require('./network')
const decision = require('./decision')

Expand Down
2 changes: 1 addition & 1 deletion src/message/index.js
Expand Up @@ -2,7 +2,7 @@

const protobuf = require('protocol-buffers')
const fs = require('fs')
const Block = require('ipfs-blocks').Block
const Block = require('ipfs-block')
const path = require('path')

const WantlistEntry = require('../wantlist/entry')
Expand Down
62 changes: 62 additions & 0 deletions test/browser.js
@@ -0,0 +1,62 @@
'use strict'

const async = require('async')
const store = require('idb-plus-blob-store')
const _ = require('lodash')
const IPFSRepo = require('ipfs-repo')

const repoContext = require.context('buffer!./test-repo', true)

const idb = window.indexedDB ||
window.mozIndexedDB ||
window.webkitIndexedDB ||
window.msIndexedDB

// book keeping
const dbs = []

function createRepo (id, done) {
const repoData = []
repoContext.keys().forEach(function (key) {
repoData.push({
key: key.replace('./', ''),
value: repoContext(key)
})
})

const mainBlob = store(id)
const blocksBlob = store(`${id}/blocks`)

dbs.push(id)

async.eachSeries(repoData, (file, cb) => {
if (_.startsWith(file.key, 'datastore/')) {
return cb()
}

const blocks = _.startsWith(file.key, 'blocks/')
const blob = blocks ? blocksBlob : mainBlob

const key = blocks ? file.key.replace(/^blocks\//, '') : file.key

blob.createWriteStream({
key: key
}).end(file.value, cb)
}, () => {
const repo = new IPFSRepo(id, {stores: store})
done(null, repo)
})
}

function removeRepos (done) {
dbs.forEach((db) => {
idb.deleteDatabase(db)
idb.deleteDatabase(`${db}/blocks`)
})
done()
}

require('./decision/engine-test')({
create: createRepo,
remove: removeRepos
})

0 comments on commit abec856

Please sign in to comment.