-
Notifications
You must be signed in to change notification settings - Fork 20
Builder refactoring, trickle builder and balanced builder #118
Changes from 7 commits
511c746
0e3f158
6af7458
977f41b
c767848
7854682
df48f69
50c5d35
cbe2ce4
f8b9e80
fedfc30
8e8d3d6
7647657
74482f3
2b92345
01d8583
0036314
02cdefd
8ac163c
e723586
b9a01f8
03f49d4
fedbe5f
180b808
937c292
0f706df
0d3602e
973c483
67fbf87
ff6cce5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,6 +150,19 @@ been written into the [DAG Service][]'s storage mechanism. | |
The input's file paths and directory structure will be preserved in the DAG | ||
Nodes. | ||
|
||
### Importer options | ||
|
||
In the second argument of the importer constructor you can specify the following options: | ||
|
||
* `chunker` (string, defaults to `"fixed"`): the chunking strategy. Now only supports `"fixed"` | ||
* `chunkSize` (positive integer, defaults to `262144`): the maximum chunk size for the `fixed` chunker. | ||
* `strategy` (string, defaults to `"balanced"`): the DAG builder strategy name. Supports: | ||
* `flat`: flat list of chunks | ||
* `balanced`: builds a balanced tree | ||
* `trickle`: builds [a trickle tree](https://github.com/ipfs/specs/pull/57#issuecomment-265205384) | ||
* `maxChildrenPerNode` (positive integer, defaults to `172`): the maximum children per node for the `balanced` and `trickle` DAG builder strategies | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now defaults to 174 |
||
* `layerRepeat` (positive integer, defaults to 4): (only applicable to the `trickle` DAG builder strategy). The maximum repetition of parent nodes for each layer of the tree. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dignifiedquire wanna push a commit with jsdoc to this PR so that unixfs-engine gets fresh docs with this refactor? Or perhaps just to a PR to the PR, that would be also good :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can merge this as is and we can add the documentation in a later PR, other wise if @pgte wants to take a stab he can start adding this info as jsdoc as well. It doesn't have to be me to do that. |
||
|
||
### Example Exporter | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
const pull = require('pull-stream') | ||
const pullWrite = require('pull-write') | ||
const pushable = require('pull-pushable') | ||
const batch = require('pull-batch') | ||
|
||
module.exports = function balancedReduceToRoot (reduce, options) { | ||
const source = pushable() | ||
|
||
const sink = pullWrite( | ||
function (item, cb) { | ||
source.push(item) | ||
cb() | ||
}, | ||
null, | ||
1, | ||
function (end) { | ||
source.end(end) | ||
} | ||
) | ||
|
||
const result = pushable() | ||
|
||
reduceToParents(source, function (err, roots) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use arrow functions for unnamed functions, to keep consistent with the rest of our code base |
||
if (err) { | ||
result.emit('error', err) | ||
return // early | ||
} | ||
assert.equal(roots.length, 1, 'no root') | ||
result.push(roots[0]) | ||
result.end() | ||
}) | ||
|
||
function reduceToParents (_chunks, callback) { | ||
let chunks = _chunks | ||
if (Array.isArray(chunks)) { | ||
chunks = pull.values(chunks) | ||
} | ||
|
||
pull( | ||
chunks, | ||
batch(options.maxChildrenPerNode), | ||
pull.asyncMap(reduce), | ||
pull.collect(reduced) | ||
) | ||
|
||
function reduced (err, roots) { | ||
if (err) { | ||
callback(err) | ||
} else if (roots.length > 1) { | ||
reduceToParents(roots, callback) | ||
} else { | ||
callback(null, roots) | ||
} | ||
} | ||
} | ||
|
||
return { | ||
sink: sink, | ||
source: result | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
'use strict' | ||
|
||
const balancedReducer = require('./balanced-reducer') | ||
|
||
const defaultOptions = { | ||
maxChildrenPerNode: 172 | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a comment with a link to https://github.com/ipfs/go-ipfs/blob/master/importer/helpers/helpers.go#L16-L35 so that we remember where this value comes from |
||
|
||
module.exports = function (reduce, _options) { | ||
const options = Object.assign({}, defaultOptions, _options) | ||
return balancedReducer(reduce, options) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
const UnixFS = require('ipfs-unixfs') | ||
const pull = require('pull-stream') | ||
const parallel = require('async/parallel') | ||
const waterfall = require('async/waterfall') | ||
const dagPB = require('ipld-dag-pb') | ||
const CID = require('cids') | ||
|
||
const reduce = require('./reduce') | ||
|
||
const DAGNode = dagPB.DAGNode | ||
|
||
const defaultOptions = { | ||
chunkSize: 262144 | ||
// chunkSize: 26214 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's up with this lonely commented out option? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was used in an experiment and had forgotten to remove it. Removed now. |
||
} | ||
|
||
module.exports = function (Chunker, ipldResolver, Reducer, _options) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use uppercase variables only for constructors, and if they are constructors call them with |
||
const options = Object.assign({}, defaultOptions, _options) | ||
|
||
return function (source, files) { | ||
return function (items, cb) { | ||
parallel(items.map((item) => (cb) => { | ||
if (!item.content) { | ||
// item is a directory | ||
return createAndStoreDir(item, (err, node) => { | ||
if (err) { | ||
return cb(err) | ||
} | ||
source.push(node) | ||
files.push(node) | ||
cb() | ||
}) | ||
} | ||
|
||
// item is a file | ||
createAndStoreFile(item, (err, node) => { | ||
if (err) { | ||
return cb(err) | ||
} | ||
source.push(node) | ||
files.push(node) | ||
cb() | ||
}) | ||
}), cb) | ||
} | ||
} | ||
|
||
function createAndStoreDir (item, callback) { | ||
// 1. create the empty dir dag node | ||
// 2. write it to the dag store | ||
|
||
const d = new UnixFS('directory') | ||
waterfall([ | ||
(cb) => DAGNode.create(d.marshal(), cb), | ||
(node, cb) => { | ||
ipldResolver.put({ | ||
node: node, | ||
cid: new CID(node.multihash) | ||
}, (err) => cb(err, node)) | ||
} | ||
], (err, node) => { | ||
if (err) { | ||
return callback(err) | ||
} | ||
callback(null, { | ||
path: item.path, | ||
multihash: node.multihash, | ||
size: node.size | ||
}) | ||
}) | ||
} | ||
|
||
function createAndStoreFile (file, callback) { | ||
if (Buffer.isBuffer(file.content)) { | ||
file.content = pull.values([file.content]) | ||
} | ||
|
||
if (typeof file.content !== 'function') { | ||
return callback(new Error('invalid content')) | ||
} | ||
|
||
const reducer = Reducer(reduce(file, ipldResolver), options) | ||
|
||
pull( | ||
file.content, | ||
Chunker(options), | ||
pull.map(chunk => new Buffer(chunk)), | ||
pull.map(buffer => new UnixFS('file', buffer)), | ||
pull.asyncMap((fileNode, callback) => { | ||
DAGNode.create(fileNode.marshal(), (err, node) => { | ||
callback(err, { DAGNode: node, fileNode: fileNode }) | ||
}) | ||
}), | ||
pull.asyncMap((leaf, callback) => { | ||
ipldResolver.put( | ||
{ | ||
node: leaf.DAGNode, | ||
cid: new CID(leaf.DAGNode.multihash) | ||
}, | ||
err => callback(err, leaf) | ||
) | ||
}), | ||
pull.map((leaf) => { | ||
return { | ||
path: file.path, | ||
multihash: leaf.DAGNode.multihash, | ||
size: leaf.DAGNode.size, | ||
leafSize: leaf.fileNode.fileSize(), | ||
name: '' | ||
} | ||
}), | ||
reducer, | ||
pull.collect((err, roots) => { | ||
if (err) { | ||
callback(err) | ||
} else { | ||
assert.equal(roots.length, 1, 'should result in exactly one root') | ||
callback(null, roots[0]) | ||
} | ||
}) | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
'use strict' | ||
|
||
const pullPushable = require('pull-pushable') | ||
const pullWrite = require('pull-write') | ||
|
||
module.exports = function createBuildStream (createStrategy, ipldResolver, flushTree, options) { | ||
const files = [] | ||
|
||
const source = pullPushable() | ||
|
||
const sink = pullWrite( | ||
createStrategy(source, files), | ||
null, | ||
options.highWaterMark, | ||
(err) => { | ||
if (err) { | ||
return source.end(err) | ||
} | ||
|
||
flushTree(files, ipldResolver, source, () => { | ||
source.end() | ||
}) | ||
} | ||
) | ||
|
||
return { | ||
source: source, | ||
sink: sink | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
'use strict' | ||
|
||
const pull = require('pull-stream') | ||
const pushable = require('pull-pushable') | ||
const pullWrite = require('pull-write') | ||
const batch = require('pull-batch') | ||
|
||
module.exports = function (reduce, options) { | ||
const source = pushable() | ||
const sink = pullWrite( | ||
function (d, cb) { | ||
source.push(d) | ||
cb() | ||
}, | ||
null, | ||
1, | ||
function (err) { | ||
if (err) { | ||
source.emit('error', err) | ||
} else { | ||
source.end() | ||
} | ||
} | ||
) | ||
|
||
const result = pushable() | ||
|
||
pull( | ||
source, | ||
batch(Infinity), | ||
pull.asyncMap(reduce), | ||
pull.collect(function (err, roots) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. arrow function |
||
if (err) { | ||
result.emit('error', err) | ||
return // early | ||
} | ||
result.push(roots[0]) | ||
result.end() | ||
}) | ||
) | ||
|
||
return { | ||
sink: sink, | ||
source: result | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
const createBuildStream = require('./create-build-stream') | ||
const Builder = require('./builder') | ||
|
||
const reducers = { | ||
flat: require('./flat'), | ||
balanced: require('./balanced'), | ||
trickle: require('./trickle') | ||
} | ||
|
||
const defaultOptions = { | ||
strategy: 'balanced', | ||
highWaterMark: 100 | ||
} | ||
|
||
module.exports = function (Chunker, ipldResolver, flushTree, _options) { | ||
assert(Chunker, 'Missing chunker creator function') | ||
assert(ipldResolver, 'Missing IPLD Resolver') | ||
assert(flushTree, 'Missing flushTree argument') | ||
|
||
const options = Object.assign({}, defaultOptions, _options) | ||
|
||
const strategyName = options.strategy | ||
const reducer = reducers[strategyName] | ||
assert(reducer, 'Unknown importer build strategy name: ' + strategyName) | ||
|
||
const createStrategy = Builder(Chunker, ipldResolver, reducer, options) | ||
|
||
return createBuildStream(createStrategy, ipldResolver, flushTree, options) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to have a specific
chunkerOptions
object, which has in the case for the fixed chunker an option ofsize
, as for other chunkers this might not make sense as an option.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dignifiedquire Implemented your suggestion.