Skip to content
This repository has been archived by the owner on Aug 11, 2021. It is now read-only.

Commit

Permalink
refactor: migrate to pullstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Sep 9, 2016
1 parent d8df2c9 commit 83a7e9e
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 145 deletions.
34 changes: 25 additions & 9 deletions README.md
Expand Up @@ -13,17 +13,20 @@
## Table of Contents

- [Install](#install)
- [Usage](#usage)
- [API](#api)
- [`resolve`](#resolve)
- [IPLDService](#ipldservice)
- [`.add(node, cb)`](#addnode-cb)
* [Install](#install)
* [Usage](#usage)
* [API](#api)
+ [`resolve`](#resolve)
+ [IPLDService](#ipldservice)
- [`.put(node, cb)`](#putnode-cb)
- [`.putStream([cb])`](#putstreamcb)
- [`.get(multihash, cb)`](#getmultihash-cb)
- [`.getStream(multihash)`](#getstreammultihash)
- [`.getRecursive(multihash, cb)`](#getrecursivemultihash-cb)
- [`.getRecursiveStream(multihash)`](#getrecursivestreammultihash)
- [`.remove(multihash, cb)`](#removemultihash-cb)
- [Contribute](#contribute)
- [License](#license)
* [Contribute](#contribute)
* [License](#license)

## Install

Expand Down Expand Up @@ -64,18 +67,31 @@ ipldService.add(node, (err) => {
### IPLDService
#### `.add(node, cb)`
#### `.put(node, cb)`
> Store the given node (any JavaScript object).
#### `.putStream([cb])`
Returns a sink pull-stream, to write IPLD objects to.
#### `.get(multihash, cb)`
> Retrieve a node by the given `multihash`.
#### `.getStream(multihash)`
Returns a source pull-stream of the requested IPLD object.
#### `.getRecursive(multihash, cb)`
> Retrieve a node by the given `multihash` and all linked nodes.
#### `.getRecursiveStream(multihash)`
Returns a source pull-stream, which emits the requested node, and
all linked nodes.
#### `.remove(multihash, cb)`
> Remove a node by the given `multihash`
Expand Down
28 changes: 15 additions & 13 deletions package.json
Expand Up @@ -34,30 +34,32 @@
"homepage": "https://github.com/ipfs/js-ipfs-ipld#readme",
"license": "MIT",
"devDependencies": {
"aegir": "^5.0.1",
"async": "^2.0.0-rc.5",
"aegir": "^8.0.1",
"async": "^2.0.1",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-block-service": "^0.4.0",
"ipfs-repo": "^0.8.0",
"lodash": "^4.12.0",
"fs-pull-blob-store": "^0.3.0",
"idb-pull-blob-store": "^0.4.0",
"ipfs-block-service": "^0.5.0",
"ipfs-repo": "^0.9.0",
"lodash": "^4.15.0",
"ncp": "^2.0.0",
"pre-commit": "^1.1.3",
"rimraf": "^2.5.2"
"rimraf": "^2.5.4"
},
"dependencies": {
"babel-runtime": "^6.9.0",
"bs58": "^3.0.0",
"babel-runtime": "^6.11.6",
"ipfs-block": "^0.3.0",
"ipld": "^0.6.0",
"is-ipfs": "^0.2.0",
"lodash.flatten": "^4.2.0",
"lodash.includes": "^4.1.3"
"lodash.flatten": "^4.4.0",
"lodash.includes": "^4.3.0",
"multihashes": "^0.2.2",
"pull-stream": "^3.4.5",
"pull-traverse": "^1.0.3"
},
"contributors": [
"Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"nicola <me@nicola.io>"
]
}
}
147 changes: 78 additions & 69 deletions src/ipld-service.js
Expand Up @@ -3,11 +3,13 @@
const isIPFS = require('is-ipfs')
const Block = require('ipfs-block')
const ipld = require('ipld')
const base58 = require('bs58')
const pull = require('pull-stream')
const traverse = require('pull-traverse')
const mh = require('multihashes')

const utils = require('./utils')

class IPLDService {
module.exports = class IPLDService {
constructor (blockService) {
if (!blockService) {
throw new Error('IPLDService requires a BlockService instance')
Expand All @@ -16,91 +18,98 @@ class IPLDService {
this.bs = blockService
}

add (node, cb) {
if (!(node instanceof Buffer)) {
node = ipld.marshal(node)
}

this.bs.addBlock(new Block(node, 'ipld'), cb)
put (node, cb) {
cb = cb || noop
pull(
pull.values([node]),
this.putStream(cb)
)
}

get (multihash, cb) {
const isMhash = isIPFS.multihash(multihash)
const isPath = isIPFS.path(multihash)

if (!isMhash && !isPath) {
return cb(new Error('Invalid Key'))
}
putStream (cb) {
cb = cb || noop
return pull(
pull.map((node) => {
if (!(node instanceof Buffer)) {
node = ipld.marshal(node)
}

if (isMhash) {
this._getWith(multihash, cb)
}
return new Block(node, 'ipld')
}),
this.bs.putStream(),
pull.onEnd(cb)
)
}

if (isPath) {
const ipfsKey = multihash.replace('/ipfs/', '')
this._getWith(ipfsKey, cb)
}
get (key, cb) {
pull(
this.getStream(key),
pull.collect((err, res) => {
if (err) return cb(err)
cb(null, res[0])
})
)
}

_getWith (key, cb) {
let formatted = key
getStream (key) {
const normalizedKey = normalizeKey(key)

if (typeof key === 'string') {
formatted = new Buffer(base58.decode(key))
if (!normalizedKey) {
return pull.error(new Error('Invalid Key'))
}

this.bs.getBlock(formatted, 'ipld', (err, block) => {
if (err) {
return cb(err)
}

let node

try {
node = ipld.unmarshal(block.data)
} catch (err) {
return cb(err)
}

return cb(null, node)
})
return pull(
this.bs.getStream(normalizedKey, 'ipld'),
pull.map((block) => ipld.unmarshal(block.data))
)
}

getRecursive (multihash, cb) {
const self = this
function getter (multihash, linkStack, nodeStack, cb) {
self.get(multihash, (err, node) => {
if (err && nodeStack.length > 0) {
return cb(new Error('Could not complete the recursive get', nodeStack))
}

if (err) {
return cb(err)
}
getRecursive (key, cb) {
pull(
this.getRecursiveStream(key),
pull.collect(cb)
)
}

nodeStack.push(node)
linkStack = linkStack.concat(utils.getKeys(node))
getRecursiveStream (key) {
return pull(
this.getStream(key),
pull.map((node) => traverse.widthFirst(node, (node) => {
return pull(
pull.values(utils.getKeys(node)),
pull.map((link) => this.getStream(link)),
pull.flatten()
)
})),
pull.flatten()
)
}

const next = linkStack.pop()
remove (keys, cb) {
this.bs.delete(keys, 'ipld', cb)
}
}

if (next) {
return getter(next, linkStack, nodeStack, cb)
}
function noop () {}

cb(null, nodeStack)
})
}
function normalizeKey (key) {
let res
const isMhash = isIPFS.multihash(key)
const isPath = isIPFS.path(key)

getter(multihash, [], [], cb)
if (!isMhash && !isPath) {
return null
}

remove (multihash, cb) {
if (!multihash || !isIPFS.multihash(multihash)) {
return cb(new Error('Invalid multihash'))
}
if (isMhash) {
res = key
} else if (isPath) {
res = key.replace('/ipfs/', '')
}

this.bs.deleteBlock(multihash, 'ipld', cb)
if (typeof res === 'string') {
return mh.fromB58String(res)
}
}

module.exports = IPLDService
return res
}
8 changes: 4 additions & 4 deletions src/resolve.js
Expand Up @@ -8,8 +8,8 @@ const IPLDService = require('./ipld-service')

const LINK_SYMBOL = ipld.LINK_SYMBOL

module.exports = function resolve (is, path, cb) {
if (!(is instanceof IPLDService)) {
module.exports = function resolve (service, path, cb) {
if (!(service instanceof IPLDService)) {
return cb(new Error('Missing IPLDService'))
}

Expand Down Expand Up @@ -40,7 +40,7 @@ module.exports = function resolve (is, path, cb) {
return cb(new Error(`Invalid link: "${link}"`))
}

is.get(blockLink, (err, block) => {
service.get(blockLink, (err, block) => {
if (err) {
return cb(err)
}
Expand All @@ -55,7 +55,7 @@ module.exports = function resolve (is, path, cb) {
if (next === 'ipfs') {
blockLink = parts.shift()
}
is.get(blockLink, (err, block) => {
service.get(blockLink, (err, block) => {
if (err) {
return cb(err)
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils.js
Expand Up @@ -7,7 +7,7 @@ const LINK_SYMBOL = ipld.LINK_SYMBOL

exports = module.exports

// Recursively find all '@link' values in a given node
// Recursively find all LINK_SYMBOL values in a given node
exports.getKeys = (node) => {
return flatten(Object.keys(node).map((key) => {
if (key === LINK_SYMBOL) {
Expand Down
20 changes: 11 additions & 9 deletions test/browser.js
@@ -1,10 +1,11 @@
/* eslint-env mocha */
'use strict'

const async = require('async')
const store = require('idb-plus-blob-store')
const eachSeries = require('async/eachSeries')
const Store = require('idb-pull-blob-store')
const _ = require('lodash')
const IPFSRepo = require('ipfs-repo')
const pull = require('pull-stream')
const repoContext = require.context('buffer!./example-repo', true)

const tests = require('./ipld-tests')
Expand All @@ -29,10 +30,10 @@ describe('ipfs merkle dag browser tests', function () {
})
})

const mainBlob = store('ipfs')
const blocksBlob = store('ipfs/blocks')
const mainBlob = new Store('ipfs')
const blocksBlob = new Store('ipfs/blocks')

async.eachSeries(repoData, (file, cb) => {
eachSeries(repoData, (file, cb) => {
if (_.startsWith(file.key, 'datastore/')) {
return cb()
}
Expand All @@ -41,12 +42,13 @@ describe('ipfs merkle dag browser tests', function () {
const blob = blocks ? blocksBlob : mainBlob
const key = blocks ? file.key.replace(/^blocks\//, '') : file.key

blob.createWriteStream({
key: key
}).end(file.value, cb)
pull(
pull.values([file.value]),
blob.write(key, cb)
)
}, done)
})

const repo = new IPFSRepo('ipfs', {stores: store})
const repo = new IPFSRepo('ipfs', {stores: Store})
tests(repo)
})

0 comments on commit 83a7e9e

Please sign in to comment.