diff --git a/README.md b/README.md index 6dc11ba..73ecd03 100644 --- a/README.md +++ b/README.md @@ -413,6 +413,15 @@ with an `EINTEGRITY` error. `algorithms` has no effect if this option is present. +##### `opts.integrityEmitter` + +*Streaming only* If present, uses the provided event emitter as a source of +truth for both integrity and size. This allows use cases where integrity is +already being calculated outside of cacache to reuse that data instead of +calculating it a second time. + +The emitter must emit both the `'integrity'` and `'size'` events. + ##### `opts.algorithms` Default: ['sha512'] diff --git a/lib/content/write.js b/lib/content/write.js index a877710..568d8ad 100644 --- a/lib/content/write.js +++ b/lib/content/write.js @@ -1,5 +1,6 @@ 'use strict' +const events = require('events') const util = require('util') const contentPath = require('./path') @@ -116,17 +117,31 @@ async function handleContent (inputStream, cache, opts) { async function pipeToTmp (inputStream, cache, tmpTarget, opts) { let integrity let size - const hashStream = ssri.integrityStream({ - integrity: opts.integrity, - algorithms: opts.algorithms, - size: opts.size, - }) - hashStream.on('integrity', i => { - integrity = i - }) - hashStream.on('size', s => { - size = s - }) + let hashStream + let integrityEmitted, sizeEmitted + if (opts.integrityEmitter) { + integrityEmitted = events.once(opts.integrityEmitter, 'integrity') + opts.integrityEmitter.on('integrity', i => { + integrity = i + }) + + sizeEmitted = events.once(opts.integrityEmitter, 'size') + opts.integrityEmitter.on('size', s => { + size = s + }) + } else { + hashStream = ssri.integrityStream({ + integrity: opts.integrity, + algorithms: opts.algorithms, + size: opts.size, + }) + hashStream.on('integrity', i => { + integrity = i + }) + hashStream.on('size', s => { + size = s + }) + } const outStream = new fsm.WriteStream(tmpTarget, { flags: 'wx', @@ -135,13 +150,17 @@ async function pipeToTmp (inputStream, cache, tmpTarget, opts) { // NB: this can throw if the hashStream has a problem with // it, and the data is fully written. but pipeToTmp is only // called in promisory contexts where that is handled. - const pipeline = new Pipeline( - inputStream, - hashStream, - outStream - ) + const pipeline = opts.integrityEmitter + ? new Pipeline(inputStream, outStream) + : new Pipeline(inputStream, hashStream, outStream) await pipeline.promise() + if (opts.integrityEmitter) { + await Promise.all([ + integrityEmitted, + sizeEmitted, + ]) + } return { integrity, size } } diff --git a/test/content/write.js b/test/content/write.js index f565767..787fb65 100644 --- a/test/content/write.js +++ b/test/content/write.js @@ -1,6 +1,8 @@ 'use strict' +const events = require('events') const fs = require('@npmcli/fs') +const Minipass = require('minipass') const path = require('path') const rimraf = require('rimraf') const ssri = require('ssri') @@ -32,6 +34,62 @@ t.test('basic put', (t) => { }) }) +t.test('basic put, providing external integrity emitter', async (t) => { + const CACHE = t.testdir() + const CONTENT = 'foobarbaz' + const INTEGRITY = ssri.fromData(CONTENT) + + const write = t.mock('../../lib/content/write.js', { + ssri: { + ...ssri, + integrityStream: () => { + throw new Error('Should not be called') + }, + }, + }) + + const source = new Minipass().end(CONTENT) + + const tee = new Minipass() + + const integrityStream = ssri.integrityStream() + // since the integrityStream is not going anywhere, we need to manually resume it + // otherwise it'll get stuck in paused mode and will never process any data events + integrityStream.resume() + const integrityStreamP = Promise.all([ + events.once(integrityStream, 'integrity').then((res) => res[0]), + events.once(integrityStream, 'size').then((res) => res[0]), + ]) + + const contentStream = write.stream(CACHE, { integrityEmitter: integrityStream }) + const contentStreamP = Promise.all([ + events.once(contentStream, 'integrity').then((res) => res[0]), + events.once(contentStream, 'size').then((res) => res[0]), + contentStream.promise(), + ]) + + tee.pipe(integrityStream) + tee.pipe(contentStream) + source.pipe(tee) + + const [ + [ssriIntegrity, ssriSize], + [contentIntegrity, contentSize], + ] = await Promise.all([ + integrityStreamP, + contentStreamP, + ]) + + t.equal(ssriSize, CONTENT.length, 'ssri got the right size') + t.equal(contentSize, CONTENT.length, 'content got the right size') + t.same(ssriIntegrity, INTEGRITY, 'ssri got the right integrity') + t.same(contentIntegrity, INTEGRITY, 'content got the right integrity') + + const cpath = contentPath(CACHE, ssriIntegrity) + t.ok(fs.lstatSync(cpath).isFile(), 'content inserted as a single file') + t.equal(fs.readFileSync(cpath, 'utf8'), CONTENT, 'contents are identical to inserted content') +}) + t.test("checks input digest doesn't match data", (t) => { const CONTENT = 'foobarbaz' const integrity = ssri.fromData(CONTENT)