diff --git a/packages/ipfs-unixfs-exporter/package.json b/packages/ipfs-unixfs-exporter/package.json index 1db85149..55a4e048 100644 --- a/packages/ipfs-unixfs-exporter/package.json +++ b/packages/ipfs-unixfs-exporter/package.json @@ -163,6 +163,7 @@ "it-pipe": "^2.0.4", "it-pushable": "^3.1.0", "it-map": "^1.0.6", + "p-queue": "^7.3.0", "multiformats": "^9.4.2", "uint8arrays": "^3.0.0" }, diff --git a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.js b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.js index 30ce718d..4f11cb44 100644 --- a/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.js +++ b/packages/ipfs-unixfs-exporter/src/resolvers/unixfs-v1/content/file.js @@ -8,6 +8,7 @@ import { pushable } from 'it-pushable' import parallel from 'it-parallel' import { pipe } from 'it-pipe' import map from 'it-map' +import PQueue from 'p-queue' /** * @typedef {import('../../../types').ExporterOptions} ExporterOptions @@ -19,14 +20,15 @@ import map from 'it-map' /** * @param {Blockstore} blockstore * @param {PBNode | Uint8Array} node - * @param {import('it-pushable').Pushable} queue + * @param {import('it-pushable').Pushable} queue * @param {number} streamPosition * @param {number} start * @param {number} end + * @param {PQueue} walkQueue * @param {ExporterOptions} options * @returns {Promise} */ -async function walkDAG (blockstore, node, queue, streamPosition, start, end, options) { +async function walkDAG (blockstore, node, queue, streamPosition, start, end, walkQueue, options) { // a `raw` node if (node instanceof Uint8Array) { queue.push(extractDataFromBlock(node, streamPosition, start, end)) @@ -100,19 +102,23 @@ async function walkDAG (blockstore, node, queue, streamPosition, start, end, opt }), async (source) => { for await (const { link, block, blockStart } of source) { + /** @type {PBNode | Uint8Array} */ let child switch (link.Hash.code) { case dagPb.code: - child = await dagPb.decode(block) + child = dagPb.decode(block) break case raw.code: child = block break default: - throw errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS') + queue.end(errCode(new Error(`Unsupported codec: ${link.Hash.code}`), 'ERR_NOT_UNIXFS')) + return } - await walkDAG(blockstore, child, queue, blockStart, start, end, options) + walkQueue.add(async () => { + await walkDAG(blockstore, child, queue, blockStart, start, end, walkQueue, options) + }) } } ) @@ -141,14 +147,20 @@ const fileContent = (cid, node, unixfs, path, resolve, depth, blockstore) => { return } - const queue = pushable({ - objectMode: true + // use a queue to walk the DAG instead of recursion to ensure very deep DAGs + // don't overflow the stack + const walkQueue = new PQueue({ + concurrency: 1 }) + const queue = pushable() - walkDAG(blockstore, node, queue, 0, offset, offset + length, options) - .catch(err => { - queue.end(err) - }) + walkQueue.add(async () => { + await walkDAG(blockstore, node, queue, 0, offset, offset + length, walkQueue, options) + }) + + walkQueue.on('error', error => { + queue.end(error) + }) let read = 0