From 83a7e9ebdeb3e142f1754c204e5c2a3ec156c9fa Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Fri, 9 Sep 2016 11:18:56 +0200 Subject: [PATCH] refactor: migrate to pullstreams --- README.md | 34 +++++++--- package.json | 28 +++++---- src/ipld-service.js | 147 +++++++++++++++++++++++--------------------- src/resolve.js | 8 +-- src/utils.js | 2 +- test/browser.js | 20 +++--- test/ipld-tests.js | 98 +++++++++++++++++------------ test/node.js | 4 +- 8 files changed, 196 insertions(+), 145 deletions(-) diff --git a/README.md b/README.md index 60dd0fc..ab9f493 100644 --- a/README.md +++ b/README.md @@ -13,17 +13,20 @@ ## Table of Contents -- [Install](#install) -- [Usage](#usage) -- [API](#api) - - [`resolve`](#resolve) - - [IPLDService](#ipldservice) - - [`.add(node, cb)`](#addnode-cb) +* [Install](#install) +* [Usage](#usage) +* [API](#api) + + [`resolve`](#resolve) + + [IPLDService](#ipldservice) + - [`.put(node, cb)`](#putnode-cb) + - [`.putStream([cb])`](#putstreamcb) - [`.get(multihash, cb)`](#getmultihash-cb) + - [`.getStream(multihash)`](#getstreammultihash) - [`.getRecursive(multihash, cb)`](#getrecursivemultihash-cb) + - [`.getRecursiveStream(multihash)`](#getrecursivestreammultihash) - [`.remove(multihash, cb)`](#removemultihash-cb) -- [Contribute](#contribute) -- [License](#license) +* [Contribute](#contribute) +* [License](#license) ## Install @@ -64,18 +67,31 @@ ipldService.add(node, (err) => { ### IPLDService -#### `.add(node, cb)` +#### `.put(node, cb)` > Store the given node (any JavaScript object). +#### `.putStream([cb])` + +Returns a sink pull-stream, to write IPLD objects to. + #### `.get(multihash, cb)` > Retrieve a node by the given `multihash`. +#### `.getStream(multihash)` + +Returns a source pull-stream of the requested IPLD object. + #### `.getRecursive(multihash, cb)` > Retrieve a node by the given `multihash` and all linked nodes. +#### `.getRecursiveStream(multihash)` + +Returns a source pull-stream, which emits the requested node, and +all linked nodes. + #### `.remove(multihash, cb)` > Remove a node by the given `multihash` diff --git a/package.json b/package.json index 5ae98c3..08e4b85 100644 --- a/package.json +++ b/package.json @@ -34,30 +34,32 @@ "homepage": "https://github.com/ipfs/js-ipfs-ipld#readme", "license": "MIT", "devDependencies": { - "aegir": "^5.0.1", - "async": "^2.0.0-rc.5", + "aegir": "^8.0.1", + "async": "^2.0.1", "buffer-loader": "0.0.1", "chai": "^3.5.0", - "fs-blob-store": "^5.2.1", - "idb-plus-blob-store": "^1.1.2", - "ipfs-block-service": "^0.4.0", - "ipfs-repo": "^0.8.0", - "lodash": "^4.12.0", + "fs-pull-blob-store": "^0.3.0", + "idb-pull-blob-store": "^0.4.0", + "ipfs-block-service": "^0.5.0", + "ipfs-repo": "^0.9.0", + "lodash": "^4.15.0", "ncp": "^2.0.0", "pre-commit": "^1.1.3", - "rimraf": "^2.5.2" + "rimraf": "^2.5.4" }, "dependencies": { - "babel-runtime": "^6.9.0", - "bs58": "^3.0.0", + "babel-runtime": "^6.11.6", "ipfs-block": "^0.3.0", "ipld": "^0.6.0", "is-ipfs": "^0.2.0", - "lodash.flatten": "^4.2.0", - "lodash.includes": "^4.1.3" + "lodash.flatten": "^4.4.0", + "lodash.includes": "^4.3.0", + "multihashes": "^0.2.2", + "pull-stream": "^3.4.5", + "pull-traverse": "^1.0.3" }, "contributors": [ "Friedel Ziegelmayer ", "nicola " ] -} \ No newline at end of file +} diff --git a/src/ipld-service.js b/src/ipld-service.js index e8eef7a..75f3395 100644 --- a/src/ipld-service.js +++ b/src/ipld-service.js @@ -3,11 +3,13 @@ const isIPFS = require('is-ipfs') const Block = require('ipfs-block') const ipld = require('ipld') -const base58 = require('bs58') +const pull = require('pull-stream') +const traverse = require('pull-traverse') +const mh = require('multihashes') const utils = require('./utils') -class IPLDService { +module.exports = class IPLDService { constructor (blockService) { if (!blockService) { throw new Error('IPLDService requires a BlockService instance') @@ -16,91 +18,98 @@ class IPLDService { this.bs = blockService } - add (node, cb) { - if (!(node instanceof Buffer)) { - node = ipld.marshal(node) - } - - this.bs.addBlock(new Block(node, 'ipld'), cb) + put (node, cb) { + cb = cb || noop + pull( + pull.values([node]), + this.putStream(cb) + ) } - get (multihash, cb) { - const isMhash = isIPFS.multihash(multihash) - const isPath = isIPFS.path(multihash) - - if (!isMhash && !isPath) { - return cb(new Error('Invalid Key')) - } + putStream (cb) { + cb = cb || noop + return pull( + pull.map((node) => { + if (!(node instanceof Buffer)) { + node = ipld.marshal(node) + } - if (isMhash) { - this._getWith(multihash, cb) - } + return new Block(node, 'ipld') + }), + this.bs.putStream(), + pull.onEnd(cb) + ) + } - if (isPath) { - const ipfsKey = multihash.replace('/ipfs/', '') - this._getWith(ipfsKey, cb) - } + get (key, cb) { + pull( + this.getStream(key), + pull.collect((err, res) => { + if (err) return cb(err) + cb(null, res[0]) + }) + ) } - _getWith (key, cb) { - let formatted = key + getStream (key) { + const normalizedKey = normalizeKey(key) - if (typeof key === 'string') { - formatted = new Buffer(base58.decode(key)) + if (!normalizedKey) { + return pull.error(new Error('Invalid Key')) } - this.bs.getBlock(formatted, 'ipld', (err, block) => { - if (err) { - return cb(err) - } - - let node - - try { - node = ipld.unmarshal(block.data) - } catch (err) { - return cb(err) - } - - return cb(null, node) - }) + return pull( + this.bs.getStream(normalizedKey, 'ipld'), + pull.map((block) => ipld.unmarshal(block.data)) + ) } - getRecursive (multihash, cb) { - const self = this - function getter (multihash, linkStack, nodeStack, cb) { - self.get(multihash, (err, node) => { - if (err && nodeStack.length > 0) { - return cb(new Error('Could not complete the recursive get', nodeStack)) - } - - if (err) { - return cb(err) - } + getRecursive (key, cb) { + pull( + this.getRecursiveStream(key), + pull.collect(cb) + ) + } - nodeStack.push(node) - linkStack = linkStack.concat(utils.getKeys(node)) + getRecursiveStream (key) { + return pull( + this.getStream(key), + pull.map((node) => traverse.widthFirst(node, (node) => { + return pull( + pull.values(utils.getKeys(node)), + pull.map((link) => this.getStream(link)), + pull.flatten() + ) + })), + pull.flatten() + ) + } - const next = linkStack.pop() + remove (keys, cb) { + this.bs.delete(keys, 'ipld', cb) + } +} - if (next) { - return getter(next, linkStack, nodeStack, cb) - } +function noop () {} - cb(null, nodeStack) - }) - } +function normalizeKey (key) { + let res + const isMhash = isIPFS.multihash(key) + const isPath = isIPFS.path(key) - getter(multihash, [], [], cb) + if (!isMhash && !isPath) { + return null } - remove (multihash, cb) { - if (!multihash || !isIPFS.multihash(multihash)) { - return cb(new Error('Invalid multihash')) - } + if (isMhash) { + res = key + } else if (isPath) { + res = key.replace('/ipfs/', '') + } - this.bs.deleteBlock(multihash, 'ipld', cb) + if (typeof res === 'string') { + return mh.fromB58String(res) } -} -module.exports = IPLDService + return res +} diff --git a/src/resolve.js b/src/resolve.js index dbecb96..581c6dc 100644 --- a/src/resolve.js +++ b/src/resolve.js @@ -8,8 +8,8 @@ const IPLDService = require('./ipld-service') const LINK_SYMBOL = ipld.LINK_SYMBOL -module.exports = function resolve (is, path, cb) { - if (!(is instanceof IPLDService)) { +module.exports = function resolve (service, path, cb) { + if (!(service instanceof IPLDService)) { return cb(new Error('Missing IPLDService')) } @@ -40,7 +40,7 @@ module.exports = function resolve (is, path, cb) { return cb(new Error(`Invalid link: "${link}"`)) } - is.get(blockLink, (err, block) => { + service.get(blockLink, (err, block) => { if (err) { return cb(err) } @@ -55,7 +55,7 @@ module.exports = function resolve (is, path, cb) { if (next === 'ipfs') { blockLink = parts.shift() } - is.get(blockLink, (err, block) => { + service.get(blockLink, (err, block) => { if (err) { return cb(err) } diff --git a/src/utils.js b/src/utils.js index dfdeb03..f28266d 100644 --- a/src/utils.js +++ b/src/utils.js @@ -7,7 +7,7 @@ const LINK_SYMBOL = ipld.LINK_SYMBOL exports = module.exports -// Recursively find all '@link' values in a given node +// Recursively find all LINK_SYMBOL values in a given node exports.getKeys = (node) => { return flatten(Object.keys(node).map((key) => { if (key === LINK_SYMBOL) { diff --git a/test/browser.js b/test/browser.js index d2109ad..02576cf 100644 --- a/test/browser.js +++ b/test/browser.js @@ -1,10 +1,11 @@ /* eslint-env mocha */ 'use strict' -const async = require('async') -const store = require('idb-plus-blob-store') +const eachSeries = require('async/eachSeries') +const Store = require('idb-pull-blob-store') const _ = require('lodash') const IPFSRepo = require('ipfs-repo') +const pull = require('pull-stream') const repoContext = require.context('buffer!./example-repo', true) const tests = require('./ipld-tests') @@ -29,10 +30,10 @@ describe('ipfs merkle dag browser tests', function () { }) }) - const mainBlob = store('ipfs') - const blocksBlob = store('ipfs/blocks') + const mainBlob = new Store('ipfs') + const blocksBlob = new Store('ipfs/blocks') - async.eachSeries(repoData, (file, cb) => { + eachSeries(repoData, (file, cb) => { if (_.startsWith(file.key, 'datastore/')) { return cb() } @@ -41,12 +42,13 @@ describe('ipfs merkle dag browser tests', function () { const blob = blocks ? blocksBlob : mainBlob const key = blocks ? file.key.replace(/^blocks\//, '') : file.key - blob.createWriteStream({ - key: key - }).end(file.value, cb) + pull( + pull.values([file.value]), + blob.write(key, cb) + ) }, done) }) - const repo = new IPFSRepo('ipfs', {stores: store}) + const repo = new IPFSRepo('ipfs', {stores: Store}) tests(repo) }) diff --git a/test/ipld-tests.js b/test/ipld-tests.js index c5df36d..ed7fbac 100644 --- a/test/ipld-tests.js +++ b/test/ipld-tests.js @@ -5,7 +5,8 @@ const expect = require('chai').expect const BlockService = require('ipfs-block-service') const ipld = require('ipld') const multihash = require('multihashing') -const async = require('async') +const series = require('async/series') +const pull = require('pull-stream') const IPLDService = require('../src').IPLDService const resolve = require('../src').resolve @@ -25,10 +26,14 @@ module.exports = (repo) => { size: 11 } - ipldService.add(node, (err) => { - expect(err).to.not.exist - done() - }) + ipldService.put(node, done) + }) + + it('putStream', (done) => { + pull( + pull.values([{name: 'pull.txt', size: 12}]), + ipldService.putStream(done) + ) }) it('gets an ipld node', (done) => { @@ -37,7 +42,7 @@ module.exports = (repo) => { size: 11 } - ipldService.add(node, (err) => { + ipldService.put(node, (err) => { expect(err).to.not.exist const mh = multihash(ipld.marshal(node), 'sha2-256') @@ -50,6 +55,30 @@ module.exports = (repo) => { }) }) + it('getStream', (done) => { + const node = { + name: 'put.txt', + size: 15 + } + const mh = multihash(ipld.marshal(node), 'sha2-256') + pull( + pull.values([node]), + ipldService.putStream(read) + ) + + function read (err) { + expect(err).to.not.exist + pull( + ipldService.getStream(mh), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res[0]).to.be.eql(node) + done() + }) + ) + } + }) + it('get ipld nodes recursively', (done) => { // 1 -> 2 -> 3 const node1 = {data: '1'} @@ -64,15 +93,15 @@ module.exports = (repo) => { '/': ipld.multihash(ipld.marshal(node2)) } - async.series([ - (cb) => ipldService.add(node1, cb), - (cb) => ipldService.add(node2, cb), - (cb) => ipldService.add(node3, cb), + series([ + (cb) => ipldService.put(node1, cb), + (cb) => ipldService.put(node2, cb), + (cb) => ipldService.put(node3, cb), (cb) => { const mh = multihash(ipld.marshal(node1), 'sha2-256') ipldService.getRecursive(mh, (err, nodes) => { expect(err).to.not.exist - expect(nodes.length).to.equal(3) + expect(nodes).to.have.length(3) cb() }) } @@ -84,24 +113,17 @@ module.exports = (repo) => { it('removes and ipld node', (done) => { const node = {data: 'short lived node'} - - ipldService.add(node, (err) => { - expect(err).to.not.exist - const mh = multihash(ipld.marshal(node), 'sha2-256') - - ipldService.get(mh, (err, fetchedNode) => { - expect(err).to.not.exist - - ipldService.remove(mh, (err) => { - expect(err).to.not.exist - - ipldService.get(mh, (err) => { - expect(err).to.exist - done() - }) - }) + const mh = multihash(ipld.marshal(node), 'sha2-256') + + series([ + (cb) => ipldService.put(node, cb), + (cb) => ipldService.get(mh, cb), + (cb) => ipldService.remove(mh, cb), + (cb) => ipldService.get(mh, (err) => { + expect(err).to.exist + cb() }) - }) + ], done) }) }) @@ -126,7 +148,7 @@ module.exports = (repo) => { const mh = ipld.multihash(ipld.marshal(node)) before((done) => { - ipldService.add(node, done) + ipldService.put(node, done) }) it('resolves direct leaves of type string', (done) => { @@ -199,10 +221,10 @@ module.exports = (repo) => { const mh = ipld.multihash(ipld.marshal(alice)) before((done) => { - async.series([ - (cb) => ipldService.add(aliceAbout, cb), - (cb) => ipldService.add(alice, cb), - (cb) => ipldService.add(bob, cb) + series([ + (cb) => ipldService.put(aliceAbout, cb), + (cb) => ipldService.put(alice, cb), + (cb) => ipldService.put(bob, cb) ], done) }) @@ -266,11 +288,11 @@ module.exports = (repo) => { const mh = ipld.multihash(ipld.marshal(blogpost)) before((done) => { - async.series([ - (cb) => ipldService.add(draft, cb), - (cb) => ipldService.add(alice, cb), - (cb) => ipldService.add(author, cb), - (cb) => ipldService.add(blogpost, cb) + series([ + (cb) => ipldService.put(draft, cb), + (cb) => ipldService.put(alice, cb), + (cb) => ipldService.put(author, cb), + (cb) => ipldService.put(blogpost, cb) ], done) }) diff --git a/test/node.js b/test/node.js index 750b017..d316e9d 100644 --- a/test/node.js +++ b/test/node.js @@ -5,7 +5,7 @@ const ncp = require('ncp').ncp const rimraf = require('rimraf') const expect = require('chai').expect const IPFSRepo = require('ipfs-repo') -const fsb = require('fs-blob-store') +const Store = require('fs-pull-blob-store') const tests = require('./ipld-tests') @@ -28,7 +28,7 @@ describe('node test blocks', () => { }) }) - const repo = new IPFSRepo(repoTests, {stores: fsb}) + const repo = new IPFSRepo(repoTests, {stores: Store}) tests(repo) })