From d26cdf9f8fecfb171177ce736f1c7555e314b84b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Sat, 4 Mar 2017 17:02:23 -0800 Subject: [PATCH] fix(perf): bulk-read get+read for massive speed --- get.js | 43 +++++++++++++++++++++--------------------- lib/content/read.js | 35 ++++++++++++++++++++++++++++++++++ test/content.read.js | 33 ++++++++++++++++++++++---------- test/util/bufferise.js | 8 ++++++++ 4 files changed, 88 insertions(+), 31 deletions(-) create mode 100644 test/util/bufferise.js diff --git a/get.js b/get.js index f097382..053d096 100644 --- a/get.js +++ b/get.js @@ -3,7 +3,6 @@ const Promise = require('bluebird') const index = require('./lib/entry-index') -const finished = Promise.promisify(require('mississippi').finished) const memo = require('./lib/memoization') const pipe = require('mississippi').pipe const pipeline = require('mississippi').pipeline @@ -25,33 +24,35 @@ function getData (byDigest, cache, key, opts) { : memo.get(cache, key) ) if (memoized && opts.memoize !== false) { - return Promise.resolve({ + return Promise.resolve(byDigest ? memoized : { metadata: memoized.entry.metadata, data: memoized.data, digest: memoized.entry.digest, hashAlgorithm: memoized.entry.hashAlgorithm }) } - const src = (byDigest ? getStreamDigest : getStream)(cache, key, opts) - let acc = [] - let dataTotal = 0 - let metadata - let digest - let hashAlgorithm - if (!byDigest) { - src.on('digest', d => { - digest = d + return ( + byDigest ? Promise.resolve(null) : index.find(cache, key, opts) + ).then(entry => { + if (!entry && !byDigest) { + throw index.notFoundError(cache, key) + } + return read(cache, byDigest ? key : entry.digest, { + hashAlgorithm: byDigest ? opts.hashAlgorithm : entry.hashAlgorithm, + size: opts.size + }).then(data => byDigest ? data : { + metadata: entry.metadata, + data: data, + digest: entry.digest, + hashAlgorithm: entry.hashAlgorithm + }).then(res => { + if (opts.memoize && byDigest) { + memo.put.byDigest(cache, key, opts.hashAlgorithm, res) + } else if (opts.memoize) { + memo.put(cache, entry, res.data) + } + return res }) - src.on('hashAlgorithm', d => { hashAlgorithm = d }) - src.on('metadata', d => { metadata = d }) - } - src.on('data', d => { - acc.push(d) - dataTotal += d.length - }) - return finished(src).then(() => { - const data = Buffer.concat(acc, dataTotal) - return byDigest ? data : { metadata, data, digest, hashAlgorithm } }) } diff --git a/lib/content/read.js b/lib/content/read.js index e6a102e..96cecd9 100644 --- a/lib/content/read.js +++ b/lib/content/read.js @@ -4,11 +4,30 @@ const Promise = require('bluebird') const checksumStream = require('checksum-stream') const contentPath = require('./path') +const crypto = require('crypto') const fs = require('graceful-fs') const pipeline = require('mississippi').pipeline Promise.promisifyAll(fs) +module.exports = read +function read (cache, address, opts) { + opts = opts || {} + const algo = opts.hashAlgorithm || 'sha512' + const cpath = contentPath(cache, address, algo) + return fs.readFileAsync(cpath, null).then(data => { + const digest = crypto.createHash(algo).update(data).digest('hex') + if (typeof opts.size === 'number' && opts.size !== data.length) { + throw sizeError(opts.size, data.length) + } else if (digest !== address) { + throw checksumError(address, digest) + } else { + return data + } + }) +} + +module.exports.stream = readStream module.exports.readStream = readStream function readStream (cache, address, opts) { opts = opts || {} @@ -37,3 +56,19 @@ function hasContent (cache, address, algorithm) { } }) } + +function sizeError (expected, found) { + var err = new Error('stream data size mismatch') + err.expected = expected + err.found = found + err.code = 'EBADSIZE' + return err +} + +function checksumError (expected, found) { + var err = new Error('checksum failed') + err.code = 'EBADCHECKSUM' + err.expected = expected + err.found = found + return err +} diff --git a/test/content.read.js b/test/content.read.js index acb0615..cb9411d 100644 --- a/test/content.read.js +++ b/test/content.read.js @@ -2,6 +2,7 @@ const Promise = require('bluebird') +const bufferise = require('./util/bufferise') const crypto = require('crypto') const path = require('path') const Tacks = require('tacks') @@ -13,25 +14,37 @@ const CacheContent = require('./util/cache-content') const read = require('../lib/content/read') -test('readStream: returns a stream with cache content data', function (t) { - const CONTENT = 'foobarbaz' +test('read: returns a Promise with cache content data', function (t) { + const CONTENT = bufferise('foobarbaz') const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex') const fixture = new Tacks(CacheContent({ [DIGEST]: CONTENT })) fixture.create(CACHE) - const stream = read.readStream(CACHE, DIGEST) + return read(CACHE, DIGEST).then(data => { + t.deepEqual(data, CONTENT, 'cache contents read correctly') + }) +}) + +test('read.stream: returns a stream with cache content data', function (t) { + const CONTENT = bufferise('foobarbaz') + const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex') + const fixture = new Tacks(CacheContent({ + [DIGEST]: CONTENT + })) + fixture.create(CACHE) + const stream = read.stream(CACHE, DIGEST) stream.on('error', function (e) { throw e }) let buf = '' stream.on('data', function (data) { buf += data }) stream.on('end', function () { t.ok(true, 'stream completed successfully') - t.equal(CONTENT, buf, 'cache contents read correctly') + t.deepEqual(bufferise(buf), CONTENT, 'cache contents read correctly') t.end() }) }) -test('readStream: allows hashAlgorithm configuration', function (t) { +test('read.stream: allows hashAlgorithm configuration', function (t) { const CONTENT = 'foobarbaz' const HASH = 'whirlpool' const DIGEST = crypto.createHash(HASH).update(CONTENT).digest('hex') @@ -39,19 +52,19 @@ test('readStream: allows hashAlgorithm configuration', function (t) { [DIGEST]: CONTENT }, HASH)) fixture.create(CACHE) - const stream = read.readStream(CACHE, DIGEST, { hashAlgorithm: HASH }) + const stream = read.stream(CACHE, DIGEST, { hashAlgorithm: HASH }) stream.on('error', function (e) { throw e }) let buf = '' stream.on('data', function (data) { buf += data }) stream.on('end', function () { t.ok(true, 'stream completed successfully, off a sha512') - t.equal(CONTENT, buf, 'cache contents read correctly') + t.deepEqual(buf, CONTENT, 'cache contents read correctly') t.end() }) }) -test('readStream: errors if content missing', function (t) { - const stream = read.readStream(CACHE, 'whatnot') +test('read.stream: errors if content missing', function (t) { + const stream = read.stream(CACHE, 'whatnot') stream.on('error', function (e) { t.ok(e, 'got an error!') t.equal(e.code, 'ENOENT', 'error uses ENOENT error code') @@ -65,7 +78,7 @@ test('readStream: errors if content missing', function (t) { }) }) -test('readStream: errors if content fails checksum', function (t) { +test('read.stream: errors if content fails checksum', function (t) { const CONTENT = 'foobarbaz' const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex') const fixture = new Tacks(CacheContent({ diff --git a/test/util/bufferise.js b/test/util/bufferise.js new file mode 100644 index 0000000..45253db --- /dev/null +++ b/test/util/bufferise.js @@ -0,0 +1,8 @@ +'use strict' + +module.exports = bufferise +function bufferise (string) { + return Buffer.from + ? Buffer.from(string, 'utf8') + : new Buffer(string, 'utf8') +}