Skip to content
This repository has been archived by the owner on Jul 3, 2019. It is now read-only.

Commit

Permalink
feat(get): add sync API for reading
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Nov 5, 2018
1 parent 93b0893 commit db1e094
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 25 deletions.
49 changes: 49 additions & 0 deletions get.js
Expand Up @@ -63,6 +63,55 @@ function getData (byDigest, cache, key, opts) {
})
}

// Synchronous analogue of the promise-returning get(): returns
// { metadata, data, integrity, size } for `key`, or throws NotFoundError.
module.exports.sync = function get (cache, key, opts) {
  return getDataSync(false, cache, key, opts)
}
// Synchronous content-address lookup: returns the raw data for `digest`.
module.exports.sync.byDigest = function getByDigest (cache, digest, opts) {
  return getDataSync(true, cache, digest, opts)
}
// Shared implementation for get.sync and get.sync.byDigest.
//
// byDigest - when true, `key` is actually an integrity digest and the index
//            is bypassed entirely (content is addressed directly).
// Returns the raw data (byDigest) or { metadata, data, integrity, size }.
// Throws index.NotFoundError when a keyed lookup finds no entry.
function getDataSync (byDigest, cache, key, opts) {
  opts = GetOpts(opts)
  // Check the in-memory memoization cache first, unless explicitly disabled.
  const memoized = byDigest
    ? memo.get.byDigest(cache, key, opts)
    : memo.get(cache, key, opts)
  if (memoized && opts.memoize !== false) {
    if (byDigest) {
      return memoized
    }
    return {
      metadata: memoized.entry.metadata,
      data: memoized.data,
      integrity: memoized.entry.integrity,
      size: memoized.entry.size
    }
  }
  // Keyed lookups go through the index; digest lookups skip it.
  const entry = !byDigest && index.find.sync(cache, key, opts)
  if (!entry && !byDigest) {
    throw new index.NotFoundError(cache, key)
  }
  const data = read.sync(cache, byDigest ? key : entry.integrity, {
    integrity: opts.integrity,
    size: opts.size
  })
  if (byDigest) {
    if (opts.memoize) {
      memo.put.byDigest(cache, key, data, opts)
    }
    return data
  }
  const res = {
    metadata: entry.metadata,
    data: data,
    size: entry.size,
    integrity: entry.integrity
  }
  if (opts.memoize) {
    memo.put(cache, entry, res.data, opts)
  }
  return res
}

module.exports.stream = getStream
function getStream (cache, key, opts) {
opts = GetOpts(opts)
Expand Down
107 changes: 82 additions & 25 deletions lib/entry-index.js
Expand Up @@ -75,10 +75,36 @@ function insert (cache, key, integrity, opts) {
})
}

module.exports.insert.sync = insertSync
// Synchronously append an index entry for `key` to its bucket file.
// Returns the formatted entry (via formatEntry).
function insertSync (cache, key, integrity, opts) {
  opts = IndexOpts(opts)
  const bucket = bucketPath(cache, key)
  const newEntry = {
    key,
    integrity: integrity && ssri.stringify(integrity),
    time: Date.now(),
    size: opts.size,
    metadata: opts.metadata
  }
  // Make sure the bucket directory exists (and is owned correctly) before
  // appending.
  fixOwner.mkdirfix.sync(path.dirname(bucket), opts.uid, opts.gid)
  const stringified = JSON.stringify(newEntry)
  // Bucket lines are "<hash>\t<json>"; the hash lets readers detect
  // corrupted/truncated lines.
  fs.appendFileSync(bucket, `\n${hashEntry(stringified)}\t${stringified}`)
  try {
    fixOwner.chownr.sync(bucket, opts.uid, opts.gid)
  } catch (err) {
    // A bucket that vanished between append and chown is not an error.
    if (err.code !== 'ENOENT') {
      throw err
    }
  }
  return formatEntry(cache, newEntry)
}

module.exports.find = find
function find (cache, key) {
const bucket = bucketPath(cache, key)
return bucketEntries(cache, bucket).then(entries => {
return bucketEntries(bucket).then(entries => {
return entries.reduce((latest, next) => {
if (next && next.key === key) {
return formatEntry(cache, next)
Expand All @@ -95,11 +121,36 @@ function find (cache, key) {
})
}

module.exports.find.sync = findSync
// Synchronously look up the most recent index entry for `key`.
// Returns the formatted entry, or null when the bucket or entry is missing.
function findSync (cache, key) {
  const bucket = bucketPath(cache, key)
  try {
    // Bucket files are append-only, so the last matching line wins.
    let latest = null
    for (const candidate of bucketEntriesSync(bucket)) {
      if (candidate && candidate.key === key) {
        latest = formatEntry(cache, candidate)
      }
    }
    return latest
  } catch (err) {
    // A missing bucket file just means "no entry".
    if (err.code === 'ENOENT') {
      return null
    }
    throw err
  }
}

// "Deleting" a key is modeled as appending a tombstone entry whose
// integrity is null; readers treat that as absence.
function del (cache, key, opts) {
  return insert(cache, key, null, opts)
}
module.exports.delete = del

// Synchronous tombstone write: same null-integrity convention as del().
function delSync (cache, key, opts) {
  return insertSync(cache, key, null, opts)
}
module.exports.delete.sync = delSync

module.exports.lsStream = lsStream
function lsStream (cache) {
const indexDir = bucketDir(cache)
Expand All @@ -116,7 +167,6 @@ function lsStream (cache) {
// "/cachename/<bucket 0xFF>/<bucket 0xFF>/*"
return readdirOrEmpty(subbucketPath).map(entry => {
const getKeyToEntry = bucketEntries(
cache,
path.join(subbucketPath, entry)
).reduce((acc, entry) => {
acc.set(entry.key, entry)
Expand Down Expand Up @@ -152,32 +202,39 @@ function ls (cache) {
})
}

// Asynchronously read a bucket file and parse its entries.
// Parsing is shared with bucketEntriesSync via _bucketEntries.
// NOTE(review): this span was a garbled diff scrape — the removed
// pre-commit body (old `(cache, bucket, filter)` signature and inline
// parsing loop) was interleaved with the added post-commit lines, leaving
// two conflicting signatures and unbalanced braces. Reconstructed the
// post-commit version the hunk was adding.
function bucketEntries (bucket, filter) {
  return readFileAsync(
    bucket, 'utf8'
  ).then(data => _bucketEntries(data, filter))
}

// Synchronous twin of bucketEntries(): read the bucket file and hand the
// raw text to the shared parser.
function bucketEntriesSync (bucket, filter) {
  return _bucketEntries(fs.readFileSync(bucket, 'utf8'), filter)
}

// Parse the raw text of a bucket file into an array of entry objects.
// Each line has the shape "<hash>\t<json>"; lines whose hash does not match
// their payload, or whose payload is not valid JSON, are silently dropped —
// buckets are append-only and may contain truncated or corrupted lines.
// (`filter` is accepted for interface compatibility but not used here.)
function _bucketEntries (data, filter) {
  const entries = []
  for (const line of data.split('\n')) {
    if (!line) { continue }
    const pieces = line.split('\t')
    if (!pieces[1] || hashEntry(pieces[1]) !== pieces[0]) {
      // Hash is no good! Corruption or malice? Doesn't matter!
      // EJECT EJECT
      continue
    }
    let parsed
    try {
      parsed = JSON.parse(pieces[1])
    } catch (e) {
      // Entry is corrupted!
      continue
    }
    if (parsed) {
      entries.push(parsed)
    }
  }
  return entries
}

module.exports._bucketDir = bucketDir
Expand Down
46 changes: 46 additions & 0 deletions lib/util/fix-owner.js
Expand Up @@ -31,6 +31,34 @@ function fixOwner (filepath, uid, gid) {
)
}

module.exports.chownr.sync = fixOwnerSync
// Synchronously chown `filepath` (recursively, via chownr) to the requested
// uid/gid. No-ops when the platform has no process.getuid, when no numeric
// override was given, or when the override already matches the current ids.
// Returns null when the target path does not exist.
function fixOwnerSync (filepath, uid, gid) {
  if (!process.getuid) {
    // This platform doesn't need ownership fixing
    return
  }
  if (typeof uid !== 'number' && typeof gid !== 'number') {
    // There's no permissions override. Nothing to do here.
    return
  }
  if ((typeof uid === 'number' && process.getuid() === uid) &&
      (typeof gid === 'number' && process.getgid() === gid)) {
    // No need to override if it's already what we used.
    return
  }
  try {
    chownr.sync(
      filepath,
      typeof uid === 'number' ? uid : process.getuid(),
      typeof gid === 'number' ? gid : process.getgid()
    )
  } catch (err) {
    if (err.code === 'ENOENT') {
      // Missing path: nothing to fix.
      return null
    }
    // Previously all non-ENOENT errors were silently swallowed here.
    // Callers (e.g. insertSync in lib/entry-index.js) wrap this call in a
    // try/catch that rethrows anything other than ENOENT, so they clearly
    // expect real failures (EPERM, etc.) to propagate. Rethrow them.
    throw err
  }
}

module.exports.mkdirfix = mkdirfix
function mkdirfix (p, uid, gid, cb) {
return mkdirp(p).then(made => {
Expand All @@ -42,3 +70,21 @@ function mkdirfix (p, uid, gid, cb) {
return fixOwner(p, uid, gid).then(() => null)
})
}

module.exports.mkdirfix.sync = mkdirfixSync
// Synchronously mkdir -p `p`, then fix ownership of whatever was created.
// Returns the topmost created directory, null when the path already existed
// (EEXIST), and undefined when mkdirp.sync reports nothing was made.
function mkdirfixSync (p, uid, gid) {
  try {
    const made = mkdirp.sync(p)
    if (made) {
      // Chown from the topmost directory mkdirp actually created.
      fixOwnerSync(made, uid, gid)
      return made
    }
  } catch (err) {
    if (err.code !== 'EEXIST') {
      throw err
    }
    // Already there: still make sure ownership is what the caller asked for.
    fixOwnerSync(p, uid, gid)
    return null
  }
}
2 changes: 2 additions & 0 deletions locales/en.js
Expand Up @@ -18,6 +18,8 @@ x.ls.stream = cache => ls.stream(cache)

// English-locale aliases mapping the public `x.get.*` surface onto the
// internal get module.
x.get = (cache, key, opts) => get(cache, key, opts)
x.get.byDigest = (cache, hash, opts) => get.byDigest(cache, hash, opts)
// Synchronous variants, added alongside the sync get API.
x.get.sync = (cache, key, opts) => get.sync(cache, key, opts)
x.get.sync.byDigest = (cache, key, opts) => get.sync.byDigest(cache, key, opts)
x.get.stream = (cache, key, opts) => get.stream(cache, key, opts)
x.get.stream.byDigest = (cache, hash, opts) => get.stream.byDigest(cache, hash, opts)
x.get.copy = (cache, key, dest, opts) => get.copy(cache, key, dest, opts)
Expand Down
2 changes: 2 additions & 0 deletions locales/es.js
Expand Up @@ -18,6 +18,8 @@ x.ls.flujo = cache => ls.stream(cache)

// Spanish-locale aliases mapping the public `x.saca.*` surface onto the
// internal get module.
x.saca = (cache, clave, ops) => get(cache, clave, ops)
x.saca.porHacheo = (cache, hacheo, ops) => get.byDigest(cache, hacheo, ops)
// Variantes síncronas (sync get API).
x.saca.sinc = (cache, clave, ops) => get.sync(cache, clave, ops)
x.saca.sinc.porHacheo = (cache, hacheo, ops) => get.sync.byDigest(cache, hacheo, ops)
x.saca.flujo = (cache, clave, ops) => get.stream(cache, clave, ops)
x.saca.flujo.porHacheo = (cache, hacheo, ops) => get.stream.byDigest(cache, hacheo, ops)
// Fixed typo: was `x.sava.copia` — `x.sava` is never assigned anywhere in
// this file, so that line would throw a TypeError at load, and it is
// inconsistent with every other `x.saca.*` alias above.
x.saca.copia = (cache, clave, destino, ops) => get.copy(cache, clave, destino, ops)
Expand Down
18 changes: 18 additions & 0 deletions test/get.js
Expand Up @@ -80,6 +80,24 @@ test('basic bulk get', t => {
})
})

test('basic sync get', t => {
  // Lay down cache content on disk, then index it synchronously.
  const fixture = new Tacks(CacheContent({
    [INTEGRITY]: CONTENT
  }))
  fixture.create(CACHE)
  index.insert.sync(CACHE, KEY, INTEGRITY, opts())
  // Keyed lookup returns the full bulk-entry shape.
  const byKey = get.sync(CACHE, KEY)
  t.deepEqual(byKey, {
    metadata: METADATA,
    data: CONTENT,
    integrity: INTEGRITY,
    size: SIZE
  }, 'bulk key get returned proper data')
  // Content-address lookup returns just the raw data.
  const byDig = get.sync.byDigest(CACHE, INTEGRITY)
  t.deepEqual(byDig, CONTENT, 'byDigest returned proper data')
  t.done()
})

test('basic stream get', t => {
const fixture = new Tacks(CacheContent({
[INTEGRITY]: CONTENT
Expand Down

0 comments on commit db1e094

Please sign in to comment.