Skip to content
This repository has been archived by the owner on Jul 3, 2019. It is now read-only.

Commit

Permalink
fix(perf): bulk-read get+read for massive speed
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Mar 5, 2017
1 parent db18a85 commit d26cdf9
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 31 deletions.
43 changes: 22 additions & 21 deletions get.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const Promise = require('bluebird')

const index = require('./lib/entry-index')
const finished = Promise.promisify(require('mississippi').finished)
const memo = require('./lib/memoization')
const pipe = require('mississippi').pipe
const pipeline = require('mississippi').pipeline
Expand All @@ -25,33 +24,35 @@ function getData (byDigest, cache, key, opts) {
: memo.get(cache, key)
)
if (memoized && opts.memoize !== false) {
return Promise.resolve({
return Promise.resolve(byDigest ? memoized : {
metadata: memoized.entry.metadata,
data: memoized.data,
digest: memoized.entry.digest,
hashAlgorithm: memoized.entry.hashAlgorithm
})
}
const src = (byDigest ? getStreamDigest : getStream)(cache, key, opts)
let acc = []
let dataTotal = 0
let metadata
let digest
let hashAlgorithm
if (!byDigest) {
src.on('digest', d => {
digest = d
return (
byDigest ? Promise.resolve(null) : index.find(cache, key, opts)
).then(entry => {
if (!entry && !byDigest) {
throw index.notFoundError(cache, key)
}
return read(cache, byDigest ? key : entry.digest, {
hashAlgorithm: byDigest ? opts.hashAlgorithm : entry.hashAlgorithm,
size: opts.size
}).then(data => byDigest ? data : {
metadata: entry.metadata,
data: data,
digest: entry.digest,
hashAlgorithm: entry.hashAlgorithm
}).then(res => {
if (opts.memoize && byDigest) {
memo.put.byDigest(cache, key, opts.hashAlgorithm, res)
} else if (opts.memoize) {
memo.put(cache, entry, res.data)
}
return res
})
src.on('hashAlgorithm', d => { hashAlgorithm = d })
src.on('metadata', d => { metadata = d })
}
src.on('data', d => {
acc.push(d)
dataTotal += d.length
})
return finished(src).then(() => {
const data = Buffer.concat(acc, dataTotal)
return byDigest ? data : { metadata, data, digest, hashAlgorithm }
})
}

Expand Down
35 changes: 35 additions & 0 deletions lib/content/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,30 @@ const Promise = require('bluebird')

const checksumStream = require('checksum-stream')
const contentPath = require('./path')
const crypto = require('crypto')
const fs = require('graceful-fs')
const pipeline = require('mississippi').pipeline

Promise.promisifyAll(fs)

module.exports = read
// Bulk-read the content stored at `address`: slurp the whole file in a
// single readFile call, then verify it before resolving.
//
// opts.hashAlgorithm - digest algorithm the address was computed with
//                      (defaults to sha512)
// opts.size          - if a number, the byte length the content must have
//
// Resolves to the raw Buffer; rejects with EBADSIZE on a length mismatch
// or EBADCHECKSUM when the data does not hash back to `address`.
function read (cache, address, opts) {
  opts = opts || {}
  const algo = opts.hashAlgorithm || 'sha512'
  return fs.readFileAsync(contentPath(cache, address, algo), null).then(data => {
    // Size is checked first so truncated/overlong reads surface as
    // EBADSIZE rather than a generic checksum failure.
    if (typeof opts.size === 'number' && opts.size !== data.length) {
      throw sizeError(opts.size, data.length)
    }
    const digest = crypto.createHash(algo).update(data).digest('hex')
    if (digest !== address) {
      throw checksumError(address, digest)
    }
    return data
  })
}

module.exports.stream = readStream
module.exports.readStream = readStream
function readStream (cache, address, opts) {
opts = opts || {}
Expand Down Expand Up @@ -37,3 +56,19 @@ function hasContent (cache, address, algorithm) {
}
})
}

// Build the error thrown when read content is not the byte length the
// caller asked for. Exposes `expected`/`found` lengths and the EBADSIZE
// code so callers can branch on it programmatically.
function sizeError (expected, found) {
  const err = new Error('stream data size mismatch')
  err.expected = expected
  err.found = found
  err.code = 'EBADSIZE'
  return err
}

// Build the error thrown when content fails checksum verification.
// Exposes the `expected` (address) and `found` (computed) digests and
// the EBADCHECKSUM code so callers can branch on it programmatically.
function checksumError (expected, found) {
  const err = new Error('checksum failed')
  err.code = 'EBADCHECKSUM'
  err.expected = expected
  err.found = found
  return err
}
33 changes: 23 additions & 10 deletions test/content.read.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Promise = require('bluebird')

const bufferise = require('./util/bufferise')
const crypto = require('crypto')
const path = require('path')
const Tacks = require('tacks')
Expand All @@ -13,45 +14,57 @@ const CacheContent = require('./util/cache-content')

const read = require('../lib/content/read')

test('readStream: returns a stream with cache content data', function (t) {
const CONTENT = 'foobarbaz'
test('read: returns a Promise with cache content data', function (t) {
const CONTENT = bufferise('foobarbaz')
const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex')
const fixture = new Tacks(CacheContent({
[DIGEST]: CONTENT
}))
fixture.create(CACHE)
const stream = read.readStream(CACHE, DIGEST)
return read(CACHE, DIGEST).then(data => {
t.deepEqual(data, CONTENT, 'cache contents read correctly')
})
})

test('read.stream: returns a stream with cache content data', function (t) {
const CONTENT = bufferise('foobarbaz')
const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex')
const fixture = new Tacks(CacheContent({
[DIGEST]: CONTENT
}))
fixture.create(CACHE)
const stream = read.stream(CACHE, DIGEST)
stream.on('error', function (e) { throw e })
let buf = ''
stream.on('data', function (data) { buf += data })
stream.on('end', function () {
t.ok(true, 'stream completed successfully')
t.equal(CONTENT, buf, 'cache contents read correctly')
t.deepEqual(bufferise(buf), CONTENT, 'cache contents read correctly')
t.end()
})
})

test('readStream: allows hashAlgorithm configuration', function (t) {
test('read.stream: allows hashAlgorithm configuration', function (t) {
const CONTENT = 'foobarbaz'
const HASH = 'whirlpool'
const DIGEST = crypto.createHash(HASH).update(CONTENT).digest('hex')
const fixture = new Tacks(CacheContent({
[DIGEST]: CONTENT
}, HASH))
fixture.create(CACHE)
const stream = read.readStream(CACHE, DIGEST, { hashAlgorithm: HASH })
const stream = read.stream(CACHE, DIGEST, { hashAlgorithm: HASH })
stream.on('error', function (e) { throw e })
let buf = ''
stream.on('data', function (data) { buf += data })
stream.on('end', function () {
t.ok(true, 'stream completed successfully, off a sha512')
t.equal(CONTENT, buf, 'cache contents read correctly')
t.deepEqual(buf, CONTENT, 'cache contents read correctly')
t.end()
})
})

test('readStream: errors if content missing', function (t) {
const stream = read.readStream(CACHE, 'whatnot')
test('read.stream: errors if content missing', function (t) {
const stream = read.stream(CACHE, 'whatnot')
stream.on('error', function (e) {
t.ok(e, 'got an error!')
t.equal(e.code, 'ENOENT', 'error uses ENOENT error code')
Expand All @@ -65,7 +78,7 @@ test('readStream: errors if content missing', function (t) {
})
})

test('readStream: errors if content fails checksum', function (t) {
test('read.stream: errors if content fails checksum', function (t) {
const CONTENT = 'foobarbaz'
const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex')
const fixture = new Tacks(CacheContent({
Expand Down
8 changes: 8 additions & 0 deletions test/util/bufferise.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

module.exports = bufferise
// Turn a UTF-8 string into a Buffer. Prefers the modern Buffer.from
// API and falls back to the legacy constructor on Node versions that
// predate it.
function bufferise (string) {
  if (Buffer.from) {
    return Buffer.from(string, 'utf8')
  }
  return new Buffer(string, 'utf8')
}

0 comments on commit d26cdf9

Please sign in to comment.