diff --git a/package.json b/package.json
index 64ce31d7..9547d625 100644
--- a/package.json
+++ b/package.json
@@ -59,8 +59,8 @@
     "bs58": "^4.0.1",
     "cids": "~0.5.3",
     "deep-extend": "~0.6.0",
-    "ipfs-unixfs": "~0.1.14",
-    "ipld": "^0.17.2",
+    "ipfs-unixfs": "~0.1.15",
+    "ipld": "~0.17.2",
     "ipld-dag-pb": "~0.14.4",
     "left-pad": "^1.3.0",
     "lodash": "^4.17.10",
diff --git a/src/exporter/file.js b/src/exporter/file.js
index 6a0d801f..0da35e8c 100644
--- a/src/exporter/file.js
+++ b/src/exporter/file.js
@@ -30,7 +30,7 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
   }
 
   if (length === 0) {
-    return pull.empty()
+    return pull.once(Buffer.alloc(0))
   }
 
   if (!offset) {
@@ -56,36 +56,37 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
 
 function streamBytes (dag, node, fileSize, offset, length) {
   if (offset === fileSize || length === 0) {
-    return pull.empty()
+    return pull.once(Buffer.alloc(0))
   }
 
   const end = offset + length
-  let streamPosition = 0
 
   function getData ({ node, start }) {
-    if (!node || !node.data) {
-      return
-    }
-
     try {
       const file = UnixFS.unmarshal(node.data)
 
       if (!file.data) {
-        return
+        return Buffer.alloc(0)
       }
 
-      const block = extractDataFromBlock(file.data, start, offset, end)
-
-      streamPosition += block.length
-
-      return block
+      return extractDataFromBlock(file.data, start, offset, end)
     } catch (error) {
       throw new Error(`Failed to unmarshal node - ${error.message}`)
    }
  }
 
+  // as we step through the children, keep track of where we are in the stream
+  // so we can filter out nodes we're not interested in
+  let streamPosition = 0
+
   function visitor ({ node }) {
     const file = UnixFS.unmarshal(node.data)
+    const nodeHasData = Boolean(file.data && file.data.length)
+
+    // handle case where data is present on leaf nodes and internal nodes
+    if (nodeHasData && node.links.length) {
+      streamPosition += file.data.length
+    }
 
     // work out which child nodes contain the requested data
     const filteredLinks = node.links
@@ -100,7 +101,7 @@ function streamBytes (dag, node, fileSize, offset, length) {
 
         return child
       })
-      .filter((child, index) => {
+      .filter((child) => {
         return (offset >= child.start && offset < child.end) || // child has offset byte
           (end > child.start && end <= child.end) || // child has end byte
           (offset < child.start && end > child.end) // child is between offset and end bytes
@@ -138,6 +139,12 @@ function streamBytes (dag, node, fileSize, offset, length) {
 function extractDataFromBlock (block, streamPosition, begin, end) {
   const blockLength = block.length
 
+  if (begin >= streamPosition + blockLength) {
+    // If begin is after the start of the block, return an empty block
+    // This can happen when internal nodes contain data
+    return Buffer.alloc(0)
+  }
+
   if (end - streamPosition < blockLength) {
     // If the end byte is in the current block, truncate the block to the end byte
     block = block.slice(0, end - streamPosition)
diff --git a/test/exporter.js b/test/exporter.js
index d165e7e4..1bca514c 100644
--- a/test/exporter.js
+++ b/test/exporter.js
@@ -14,6 +14,17 @@ const CID = require('cids')
 const loadFixture = require('aegir/fixtures')
 const doUntil = require('async/doUntil')
 const waterfall = require('async/waterfall')
+const parallel = require('async/parallel')
+const series = require('async/series')
+const fs = require('fs')
+const path = require('path')
+const push = require('pull-pushable')
+const toPull = require('stream-to-pull-stream')
+const toStream = require('pull-stream-to-stream')
+const {
+  DAGNode,
+  DAGLink
+} = require('ipld-dag-pb')
 
 const unixFSEngine = require('./../src')
 const exporter = unixFSEngine.exporter
@@ -64,6 +75,52 @@ module.exports = (repo) => {
     })
   }
 
+  function addTestDirectory ({directory, strategy = 'balanced', maxChunkSize}, callback) {
+    const input = push()
+    const dirName = path.basename(directory)
+
+    pull(
+      input,
+      pull.map((file) => {
+        return {
+          path: path.join(dirName, path.basename(file)),
+          content: toPull.source(fs.createReadStream(file))
+        }
+      }),
+      importer(ipld, {
+        strategy,
+        maxChunkSize
+      }),
+      pull.collect(callback)
+    )
+
+    const listFiles = (directory, depth, stream, cb) => {
+      waterfall([
+        (done) => fs.stat(directory, done),
+        (stats, done) => {
+          if (stats.isDirectory()) {
+            return waterfall([
+              (done) => fs.readdir(directory, done),
+              (children, done) => {
+                series(
+                  children.map(child => (next) => listFiles(path.join(directory, child), depth + 1, stream, next)),
+                  done
+                )
+              }
+            ], done)
+          }
+
+          stream.push(directory)
+          done()
+        }
+      ], cb)
+    }
+
+    listFiles(directory, 0, input, () => {
+      input.end()
+    })
+  }
+
   function checkBytesThatSpanBlocks (strategy, cb) {
     const bytesInABlock = 262144
     const bytes = Buffer.alloc(bytesInABlock + 100, 0)
@@ -517,6 +574,55 @@ module.exports = (repo) => {
       checkBytesThatSpanBlocks('trickle', done)
     })
 
+    it('exports a directory containing an empty file whose content gets turned into a ReadableStream', function (done) {
+      // replicates the behaviour of ipfs.files.get
+      waterfall([
+        (cb) => addTestDirectory({
+          directory: path.join(__dirname, 'fixtures', 'dir-with-empty-files')
+        }, cb),
+        (result, cb) => {
+          const dir = result.pop()
+
+          pull(
+            exporter(dir.multihash, ipld),
+            pull.map((file) => {
+              if (file.content) {
+                file.content = toStream.source(file.content)
+                file.content.pause()
+              }
+
+              return file
+            }),
+            pull.collect((error, files) => {
+              if (error) {
+                return cb(error)
+              }
+
+              series(
+                files
+                  .filter(file => Boolean(file.content))
+                  .map(file => {
+                    return (done) => {
+                      if (file.content) {
+                        file.content
+                          .pipe(toStream.sink(pull.collect((error, bufs) => {
+                            expect(error).to.not.exist()
+                            expect(bufs.length).to.equal(1)
+                            expect(bufs[0].length).to.equal(0)
+
+                            done()
+                          })))
+                      }
+                    }
+                  }),
+                cb
+              )
+            })
+          )
+        }
+      ], done)
+    })
+
     // TODO: This needs for the stores to have timeouts,
     // otherwise it is impossible to predict if a file doesn't
     // really exist
@@ -532,6 +638,100 @@ module.exports = (repo) => {
 
         })
       )
     })
+
+    it('exports file with data on internal and leaf nodes', function (done) {
+      waterfall([
+        (cb) => createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], [], cb),
+        (leaf, cb) => createAndPersistNode(ipld, 'file', [0x00, 0x01, 0x02, 0x03], [
+          leaf
+        ], cb),
+        (file, cb) => {
+          pull(
+            exporter(file.multihash, ipld),
+            pull.asyncMap((file, cb) => readFile(file, cb)),
+            pull.through(buffer => {
+              expect(buffer).to.deep.equal(Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]))
+            }),
+            pull.collect(cb)
+          )
+        }
+      ], done)
+    })
+
+    it('exports file with data on some internal and leaf nodes', function (done) {
+      // create a file node with three children:
+      // where:
+      //   i = internal node without data
+      //   d = internal node with data
+      //   l = leaf node with data
+      //     i
+      //   / | \
+      //  l  d  i
+      //     |   \
+      //     l    l
+      waterfall([
+        (cb) => {
+          // create leaves
+          parallel([
+            (next) => createAndPersistNode(ipld, 'raw', [0x00, 0x01, 0x02, 0x03], [], next),
+            (next) => createAndPersistNode(ipld, 'raw', [0x08, 0x09, 0x10, 0x11], [], next),
+            (next) => createAndPersistNode(ipld, 'raw', [0x12, 0x13, 0x14, 0x15], [], next)
+          ], cb)
+        },
+        (leaves, cb) => {
+          parallel([
+            (next) => createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], [leaves[1]], next),
+            (next) => createAndPersistNode(ipld, 'raw', null, [leaves[2]], next)
+          ], (error, internalNodes) => {
+            if (error) {
+              return cb(error)
+            }
+
+            createAndPersistNode(ipld, 'file', null, [
+              leaves[0],
+              internalNodes[0],
+              internalNodes[1]
+            ], cb)
+          })
+        },
+        (file, cb) => {
+          pull(
+            exporter(file.multihash, ipld),
+            pull.asyncMap((file, cb) => readFile(file, cb)),
+            pull.through(buffer => {
+              expect(buffer).to.deep.equal(
+                Buffer.from([
+                  0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
+                  0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15
+                ])
+              )
+            }),
+            pull.collect(cb)
+          )
+        }
+      ], done)
+    })
+
+    it('exports file with data on internal and leaf nodes with an offset that only fetches data from leaf nodes', function (done) {
+      waterfall([
+        (cb) => createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], [], cb),
+        (leaf, cb) => createAndPersistNode(ipld, 'file', [0x00, 0x01, 0x02, 0x03], [
+          leaf
+        ], cb),
+        (file, cb) => {
+          pull(
+            exporter(file.multihash, ipld, {
+              offset: 4
+            }),
+            pull.asyncMap((file, cb) => readFile(file, cb)),
+            pull.through(buffer => {
+              expect(buffer).to.deep.equal(Buffer.from([0x04, 0x05, 0x06, 0x07]))
+            }),
+            pull.collect(cb)
+          )
+        }
+      ], done)
+    })
   })
 }
@@ -567,3 +767,26 @@ function readFile (file, done) {
     })
   )
 }
+
+function createAndPersistNode (ipld, type, data, children, callback) {
+  const file = new UnixFS(type, data ? Buffer.from(data) : undefined)
+  const links = []
+
+  children.forEach(child => {
+    const leaf = UnixFS.unmarshal(child.data)
+
+    file.addBlockSize(leaf.fileSize())
+
+    links.push(new DAGLink('', child.size, child.multihash))
+  })
+
+  DAGNode.create(file.marshal(), links, (error, node) => {
+    if (error) {
+      return callback(error)
+    }
+
+    ipld.put(node, {
+      cid: new CID(node.multihash)
+    }, (error) => callback(error, node))
+  })
+}
diff --git a/test/fixtures/dir-with-empty-files/empty-file.txt b/test/fixtures/dir-with-empty-files/empty-file.txt
new file mode 100644
index 00000000..e69de29b
diff --git a/test/importer.js b/test/importer.js
index a5685a03..f5b1edba 100644
--- a/test/importer.js
+++ b/test/importer.js
@@ -7,7 +7,7 @@ const extend = require('deep-extend')
 const chai = require('chai')
 chai.use(require('dirty-chai'))
 const expect = chai.expect
-const sinon = require('sinon')
+const spy = require('sinon/lib/sinon/spy')
 const BlockService = require('ipfs-block-service')
 const pull = require('pull-stream')
 const mh = require('multihashes')
@@ -214,7 +214,7 @@ module.exports = (repo) => {
           expect(err).to.not.exist()
           expect(nodes.length).to.be.eql(1)
           // always yield empty node
-          expect(mh.toB58String(nodes[0].multihash)).to.be.eql('QmfJMCvenrj4SKKRc48DYPxwVdS44qCUCqqtbqhJuSTWXP')
+          expect(mh.toB58String(nodes[0].multihash)).to.be.eql('QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH')
           done()
         }))
       })
@@ -456,7 +456,7 @@ module.exports = (repo) => {
       })
 
       it('will call an optional progress function', (done) => {
-        options.progress = sinon.spy()
+        options.progress = spy()
 
         pull(
           pull.values([{
diff --git a/test/node.js b/test/node.js
index 8bc82510..064428e9 100644
--- a/test/node.js
+++ b/test/node.js
@@ -10,8 +10,8 @@ const mkdirp = require('mkdirp')
 const series = require('async/series')
 
 describe('IPFS UnixFS Engine', () => {
-  const repoExample = path.join(process.cwd(), '/test/test-repo')
-  const repoTests = path.join(os.tmpdir(), '/unixfs-tests-' + Date.now())
+  const repoExample = path.join(process.cwd(), 'test', 'test-repo')
+  const repoTests = path.join(os.tmpdir(), 'unixfs-tests-' + Date.now())
 
   const repo = new IPFSRepo(repoTests)
 
diff --git a/test/with-dag-api.js b/test/with-dag-api.js
index e93a9ee2..1f68442c 100644
--- a/test/with-dag-api.js
+++ b/test/with-dag-api.js
@@ -225,7 +225,7 @@ describe('with dag-api', function () {
         expect(err).to.not.exist()
         expect(nodes.length).to.be.eql(1)
         // always yield empty node
-        expect(mh.toB58String(nodes[0].multihash)).to.be.eql('QmfJMCvenrj4SKKRc48DYPxwVdS44qCUCqqtbqhJuSTWXP')
+        expect(mh.toB58String(nodes[0].multihash)).to.be.eql('QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH')
         done()
       }))
     })