feat: upgrade to new block api and async crypto
dignifiedquire committed Nov 11, 2016
1 parent 2fcc538 commit 540af9f
Showing 24 changed files with 1,246 additions and 813 deletions.
19 changes: 0 additions & 19 deletions .aegir.js

This file was deleted.

3 changes: 1 addition & 2 deletions .gitignore
@@ -27,6 +27,5 @@ build/Release
node_modules

dist
lib

test/test-repo-for*
test/test-repo-for*
24 changes: 18 additions & 6 deletions .travis.yml
@@ -1,8 +1,15 @@
sudo: false
language: node_js
node_js:
- 4
- stable
matrix:
include:
- node_js: 4
env: CXX=g++-4.8
- node_js: 6
env:
- SAUCE=true
- CXX=g++-4.8
- node_js: stable
env: CXX=g++-4.8

# Make sure we have new NPM.
before_install:
@@ -13,12 +20,17 @@ script:
- npm test
- npm run coverage

addons:
firefox: 'latest'

before_script:
- export DISPLAY=:99.0
- sh -e /etc/init.d/xvfb start

after_success:
- npm run coverage-publish

addons:
firefox: 'latest'
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- g++-4.8
7 changes: 4 additions & 3 deletions API.md
@@ -59,13 +59,14 @@ Cancel previously requested keys.
### `putStream()`

Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block once it has been stored.
Objects written to the stream should be of the form `{data: Buffer, key: Multihash}`.
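
A minimal usage sketch (the `bitswap` instance, the example `data`, and the way the multihash `key` is derived here are assumptions for illustration, not part of this API doc):

```js
const pull = require('pull-stream')
const crypto = require('crypto')
const mh = require('multihashes')

const data = new Buffer('hello world')
// Derive a sha2-256 multihash to use as the block key.
const key = mh.encode(crypto.createHash('sha256').update(data).digest(), 'sha2-256')

pull(
  // Write `{data, key}` objects into the duplex stream ...
  pull.values([{ data: data, key: key }]),
  bitswap.putStream(),
  // ... and collect the `{key: Multihash}` objects emitted once each block is stored.
  pull.collect((err, stored) => {
    if (err) throw err
    console.log('stored %d block(s)', stored.length)
  })
)
```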

### `put(block, cb)`
### `put(blockAndKey, cb)`

- `block: IpfsBlock`
- `blockAndKey: {data: Buffer, key: Multihash}`
- `cb: Function`

Announce that the current node now has the `block`. This will store it
Announce that the current node now has the block containing `data`. This will store it
in the local database and attempt to serve it to all peers that are known
to have requested it. The callback is called when we are sure that the block
is stored.
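
A minimal sketch of calling `put` (assuming a started `bitswap` instance and a `{data, key}` pair like the one built in the sketch above):

```js
bitswap.put({ data: data, key: key }, (err) => {
  if (err) throw err
  // The block is now stored locally and will be served to peers known to want it.
  console.log('block stored')
})
```
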
4 changes: 4 additions & 0 deletions README.md
@@ -8,6 +8,10 @@
[![Travis CI](https://travis-ci.org/ipfs/js-ipfs-bitswap.svg?branch=master)](https://travis-ci.org/ipfs/js-ipfs-bitswap)
[![Circle CI](https://circleci.com/gh/ipfs/js-ipfs-bitswap.svg?style=svg)](https://circleci.com/gh/ipfs/js-ipfs-bitswap)
[![Dependency Status](https://david-dm.org/ipfs/js-ipfs-bitswap.svg?style=flat-square)](https://david-dm.org/ipfs/js-ipfs-bitswap) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
![](https://img.shields.io/badge/npm-%3E%3D3.0.0-orange.svg?style=flat-square)
![](https://img.shields.io/badge/Node.js-%3E%3D4.0.0-orange.svg?style=flat-square)

[![Sauce Test Status](https://saucelabs.com/browser-matrix/js-ipfs-bitswap.svg)](https://saucelabs.com/u/js-ipfs-bitswap)

> Node.js implementation of the Bitswap 'data exchange' protocol used by IPFS
39 changes: 21 additions & 18 deletions package.json
@@ -2,8 +2,10 @@
"name": "ipfs-bitswap",
"version": "0.7.1",
"description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS",
"main": "lib/index.js",
"jsnext:main": "src/index.js",
"main": "src/index.js",
"browser": {
"libp2p-ipfs": false
},
"scripts": {
"test": "aegir-test",
"test:browser": "aegir-test browser",
@@ -33,38 +35,39 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"aegir": "^8.0.1",
"aegir": "^9.1.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-pull-blob-store": "^0.4.1",
"idb-pull-blob-store": "^0.4.0",
"interface-pull-blob-store": "^0.5.0",
"ipfs-repo": "^0.9.0",
"libp2p-ipfs": "^0.14.1",
"lodash": "^4.16.2",
"idb-pull-blob-store": "^0.5.1",
"interface-pull-blob-store": "^0.6.0",
"ipfs-repo": "^0.11.1",
"libp2p-ipfs": "^0.15.0",
"lodash": "^4.16.6",
"multiaddr": "^2.0.3",
"ncp": "^2.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.1",
"peer-id": "^0.8.0",
"peer-info": "^0.8.0",
"rimraf": "^2.5.4",
"safe-buffer": "^5.0.1"
},
"dependencies": {
"async": "^2.1.0",
"debug": "^2.3.1",
"async": "^2.1.2",
"cids": "^0.2.0",
"debug": "^2.3.2",
"heap": "^0.2.6",
"ipfs-block": "^0.3.0",
"ipfs-block": "^0.5.0",
"lodash.debounce": "^4.0.8",
"lodash.isequalwith": "^4.4.0",
"lodash.isundefined": "^3.0.1",
"multihashes": "^0.2.2",
"protocol-buffers": "^3.1.6",
"protocol-buffers": "^3.1.8",
"pull-defer": "^0.2.2",
"pull-generate": "^2.2.0",
"pull-length-prefixed": "^1.2.0",
"pull-paramap": "^1.1.6",
"pull-paramap": "^1.2.0",
"pull-pushable": "^2.0.1",
"pull-stream": "^3.4.5"
"pull-stream": "^3.5.0"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
@@ -74,4 +77,4 @@
"greenkeeperio-bot <support@greenkeeper.io>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
136 changes: 77 additions & 59 deletions src/decision/engine.js
@@ -3,7 +3,10 @@
const debug = require('debug')
const mh = require('multihashes')
const pull = require('pull-stream')
const generate = require('pull-generate')
const whilst = require('async/whilst')
const setImmediate = require('async/setImmediate')
const each = require('async/each')
const debounce = require('lodash.debounce')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')
@@ -26,37 +29,45 @@ module.exports = class Engine {
this.peerRequestQueue = new PeerRequestQueue()

this._running = false

this._outbox = debounce(this._outboxExec.bind(this), 100)
}

_sendBlock (env, cb) {
const msg = new Message(false)
msg.addBlock(env.block)

log('Sending block to %s', env.peer.toB58String(), env.block.data.toString())

this.network.sendMessage(env.peer, msg, (err) => {
msg.addBlock(env.block, (err) => {
if (err) {
log('sendblock error: %s', err.message)
return cb(err)
}
cb(null, 'done')

log('Sending block to %s', env.peer.toB58String(), env.block.data.toString())

this.network.sendMessage(env.peer, msg, (err) => {
if (err) {
log('sendblock error: %s', err.message)
}
cb(null, 'done')
})
})
}

_outbox () {
if (!this._running) return
_outboxExec () {
let nextTask
log('outbox')

const doIt = (cb) => pull(
generate(null, (state, cb) => {
log('generating', this._running)
whilst(
() => {
if (!this._running) {
return cb(true)
return
}

const nextTask = this.peerRequestQueue.pop()
nextTask = this.peerRequestQueue.pop()
log('check', this._running && nextTask)
return Boolean(nextTask)
},
(next) => {
log('generating')
log('got task', nextTask)
if (!nextTask) {
return cb(true)
}

pull(
this.blockstore.getStream(nextTask.entry.key),
@@ -65,31 +76,20 @@
const block = blocks[0]
if (err || !block) {
nextTask.done()
return cb(null, false)
return next()
}

cb(null, {
this._sendBlock({
peer: nextTask.target,
block: block,
sent: () => {
sent () {
nextTask.done()
}
})
}, next)
})
)
}),
pull.filter(Boolean),
pull.asyncMap(this._sendBlock.bind(this)),
pull.onEnd(cb)
}
)

if (!this._timer) {
this._timer = setTimeout(() => {
doIt(() => {
this._timer = null
})
}, 50)
}
}

wantlistForPeer (peerId) {
@@ -118,20 +118,25 @@
ledger.wantlist = new Wantlist()
}

this._processBlocks(msg.blocks, ledger)
log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))

pull(
pull.values(Array.from(msg.wantlist.values())),
pull.asyncMap((entry, cb) => {
this._processWantlist(ledger, peerId, entry, cb)
}),
pull.onEnd((err) => {
if (err) return cb(err)
this._outbox()
cb()
})
)
this._processBlocks(msg.blocks, ledger, (err) => {
if (err) {
log.error(`failed to process blocks: ${err.message}`)
}

log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString()))

pull(
pull.values(Array.from(msg.wantlist.values())),
pull.asyncMap((entry, cb) => {
this._processWantlist(ledger, peerId, entry, cb)
}),
pull.onEnd((err) => {
if (err) return cb(err)
this._outbox()
cb()
})
)
})
}

receivedBlock (key) {
@@ -173,23 +178,36 @@
}
}

_processBlocks (blocks, ledger) {
for (let block of blocks.values()) {
log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length)
ledger.receivedBytes(block.data.length)
_processBlocks (blocks, ledger, callback) {
each(blocks.values(), (block, cb) => {
block.key((err, key) => {
if (err) {
return cb(err)
}
log('got block %s (%s bytes)', mh.toB58String(key), block.data.length)
ledger.receivedBytes(block.data.length)

this.receivedBlock(block.key)
}
this.receivedBlock(key)
cb()
})
}, callback)
}

// Clear up all accounting things after message was sent
messageSent (peerId, msg) {
messageSent (peerId, msg, callback) {
const ledger = this._findOrCreate(peerId)
for (let block of msg.blocks.values()) {
each(msg.blocks.values(), (block, cb) => {
ledger.sentBytes(block.data.length)
ledger.wantlist.remove(block.key)
this.peerRequestQueue.remove(block.key, peerId)
}
block.key((err, key) => {
if (err) {
return cb(err)
}

ledger.wantlist.remove(key)
this.peerRequestQueue.remove(key, peerId)
cb()
})
}, callback)
}

numBytesSentTo (peerId) {
