Skip to content

Commit

Permalink
feat: allow external integrity/size source
Browse files Browse the repository at this point in the history
  • Loading branch information
nlf committed May 16, 2022
1 parent 71d4389 commit f890f7b
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 16 deletions.
9 changes: 9 additions & 0 deletions README.md
Expand Up @@ -413,6 +413,15 @@ with an `EINTEGRITY` error.

`algorithms` has no effect if this option is present.

##### `opts.integrityEmitter`

*Streaming only* If present, uses the provided event emitter as a source of
truth for both integrity and size. This allows use cases where integrity is
already being calculated outside of cacache to reuse that data instead of
calculating it a second time.

The emitter must emit both the `'integrity'` and `'size'` events.

##### `opts.algorithms`

Default: ['sha512']
Expand Down
51 changes: 35 additions & 16 deletions lib/content/write.js
@@ -1,5 +1,6 @@
'use strict'

const events = require('events')
const util = require('util')

const contentPath = require('./path')
Expand Down Expand Up @@ -116,17 +117,31 @@ async function handleContent (inputStream, cache, opts) {
async function pipeToTmp (inputStream, cache, tmpTarget, opts) {
let integrity
let size
const hashStream = ssri.integrityStream({
integrity: opts.integrity,
algorithms: opts.algorithms,
size: opts.size,
})
hashStream.on('integrity', i => {
integrity = i
})
hashStream.on('size', s => {
size = s
})
let hashStream
let integrityEmitted, sizeEmitted
if (opts.integrityEmitter) {
integrityEmitted = events.once(opts.integrityEmitter, 'integrity')
opts.integrityEmitter.on('integrity', i => {
integrity = i
})

sizeEmitted = events.once(opts.integrityEmitter, 'size')
opts.integrityEmitter.on('size', s => {
size = s
})
} else {
hashStream = ssri.integrityStream({
integrity: opts.integrity,
algorithms: opts.algorithms,
size: opts.size,
})
hashStream.on('integrity', i => {
integrity = i
})
hashStream.on('size', s => {
size = s
})
}

const outStream = new fsm.WriteStream(tmpTarget, {
flags: 'wx',
Expand All @@ -135,13 +150,17 @@ async function pipeToTmp (inputStream, cache, tmpTarget, opts) {
// NB: this can throw if the hashStream has a problem with
// it, and the data is fully written. but pipeToTmp is only
// called in promisory contexts where that is handled.
const pipeline = new Pipeline(
inputStream,
hashStream,
outStream
)
const pipeline = opts.integrityEmitter
? new Pipeline(inputStream, outStream)
: new Pipeline(inputStream, hashStream, outStream)

await pipeline.promise()
if (opts.integrityEmitter) {
await Promise.all([
integrityEmitted,
sizeEmitted,
])
}
return { integrity, size }
}

Expand Down
58 changes: 58 additions & 0 deletions test/content/write.js
@@ -1,6 +1,8 @@
'use strict'

const events = require('events')
const fs = require('@npmcli/fs')
const Minipass = require('minipass')
const path = require('path')
const rimraf = require('rimraf')
const ssri = require('ssri')
Expand Down Expand Up @@ -32,6 +34,62 @@ t.test('basic put', (t) => {
})
})

t.test('basic put, providing external integrity emitter', async (t) => {
const CACHE = t.testdir()
const CONTENT = 'foobarbaz'
const INTEGRITY = ssri.fromData(CONTENT)

const write = t.mock('../../lib/content/write.js', {
ssri: {
...ssri,
integrityStream: () => {
throw new Error('Should not be called')
},
},
})

const source = new Minipass().end(CONTENT)

const tee = new Minipass()

const integrityStream = ssri.integrityStream()
// since the integrityStream is not going anywhere, we need to manually resume it
// otherwise it'll get stuck in paused mode and will never process any data events
integrityStream.resume()
const integrityStreamP = Promise.all([
events.once(integrityStream, 'integrity').then((res) => res[0]),
events.once(integrityStream, 'size').then((res) => res[0]),
])

const contentStream = write.stream(CACHE, { integrityEmitter: integrityStream })
const contentStreamP = Promise.all([
events.once(contentStream, 'integrity').then((res) => res[0]),
events.once(contentStream, 'size').then((res) => res[0]),
contentStream.promise(),
])

tee.pipe(integrityStream)
tee.pipe(contentStream)
source.pipe(tee)

const [
[ssriIntegrity, ssriSize],
[contentIntegrity, contentSize],
] = await Promise.all([
integrityStreamP,
contentStreamP,
])

t.equal(ssriSize, CONTENT.length, 'ssri got the right size')
t.equal(contentSize, CONTENT.length, 'content got the right size')
t.same(ssriIntegrity, INTEGRITY, 'ssri got the right integrity')
t.same(contentIntegrity, INTEGRITY, 'content got the right integrity')

const cpath = contentPath(CACHE, ssriIntegrity)
t.ok(fs.lstatSync(cpath).isFile(), 'content inserted as a single file')
t.equal(fs.readFileSync(cpath, 'utf8'), CONTENT, 'contents are identical to inserted content')
})

t.test("checks input digest doesn't match data", (t) => {
const CONTENT = 'foobarbaz'
const integrity = ssri.fromData(CONTENT)
Expand Down

0 comments on commit f890f7b

Please sign in to comment.