Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize prefixes for queries #95

Merged
merged 5 commits into from Jan 23, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Expand Up @@ -384,6 +384,12 @@ _Rule of thumb_: use prefix indexes in an EQUAL operation only when
the target `value` of your EQUAL can dynamically assume many (more
than a dozen) possible values.

An additional option `useMap` can be provided that will store the
prefix as a map instead of an array. The map can be seen as an
inverted index that allows for faster queries at the cost of extra
space. Maps don't store empty values, meaning they are also a good fit
for sparse indexes such as vote links.

## Low-level API

First some terminology: offset refers to the byte position in the log
Expand Down
98 changes: 97 additions & 1 deletion benchmark/index.js
Expand Up @@ -18,7 +18,7 @@ const {
toPullStream,
paginate,
} = require('../operators')
const { seekType, seekAuthor } = require('../test/helpers')
const { seekType, seekAuthor, seekVoteLink } = require('../test/helpers')
const copy = require('../copy-json-to-bipf-async')

const dir = '/tmp/jitdb-benchmark'
Expand Down Expand Up @@ -213,3 +213,99 @@ test('paginate one huge index', (t) => {
)
})
})

// Benchmark: times a paginated EQUAL query that uses a prefix map index on
// vote links ("first run" per the test name) and appends the duration to the
// report file.
test('query a prefix map (first run)', (t) => {
  db.onReady(() => {
    // Fetch a single message; its key is used as the vote-link target below.
    query(
      fromDB(db),
      paginate(1),
      toCallback((err, answer) => {
        // The answer is taken as a plain parameter on purpose: destructuring
        // it in the parameter list throws a TypeError when the callback is
        // invoked with only an error, which would mask the real failure.
        if (err) {
          t.fail(err)
          t.end()
          return
        }
        const rootKey = answer.results[0].key

        db.onReady(() => {
          const start = Date.now()
          let i = 0 // number of pages drained from the stream
          pull(
            query(
              fromDB(db),
              and(
                equal(seekVoteLink, rootKey, {
                  indexType: 'value_content_vote_link',
                  useMap: true,
                  prefix: 32,
                })
              ),
              paginate(5),
              toPullStream()
            ),
            pull.drain(
              () => {
                i++
              },
              (err) => {
                if (err) t.fail(err)
                const duration = Date.now() - start
                if (i !== 92) t.fail('wrong number of pages read: ' + i)
                t.pass(`duration: ${duration}ms`)
                fs.appendFileSync(
                  reportPath,
                  `| Query a prefix map (1st run) | ${duration}ms |\n`
                )
                t.end()
              }
            )
          )
        })
      })
    )
  })
})

// Benchmark: times the same paginated prefix-map query on vote links again
// ("second run" per the test name) and appends the duration to the report
// file.
test('query a prefix map (second run)', (t) => {
  db.onReady(() => {
    // Fetch a single message; its key is used as the vote-link target below.
    query(
      fromDB(db),
      paginate(1),
      toCallback((err, answer) => {
        // The answer is taken as a plain parameter on purpose: destructuring
        // it in the parameter list throws a TypeError when the callback is
        // invoked with only an error, which would mask the real failure.
        if (err) {
          t.fail(err)
          t.end()
          return
        }
        const rootKey = answer.results[0].key

        db.onReady(() => {
          const start = Date.now()
          let i = 0 // number of pages drained from the stream
          pull(
            query(
              fromDB(db),
              and(
                equal(seekVoteLink, rootKey, {
                  indexType: 'value_content_vote_link',
                  useMap: true,
                  prefix: 32,
                })
              ),
              paginate(5),
              toPullStream()
            ),
            pull.drain(
              () => {
                i++
              },
              (err) => {
                if (err) t.fail(err)
                const duration = Date.now() - start
                if (i !== 92) t.fail('wrong number of pages read: ' + i)
                t.pass(`duration: ${duration}ms`)
                fs.appendFileSync(
                  reportPath,
                  `| Query a prefix map (2nd run) | ${duration}ms |\n`
                )
                t.end()
              }
            )
          )
        })
      })
    )
  })
})
39 changes: 39 additions & 0 deletions files.js
Expand Up @@ -67,6 +67,43 @@ function loadTypedArrayFile(filename, Type, cb) {
.catch(cb)
}

/**
 * Write a prefix map index to `filename`.
 *
 * File layout: a header of four FIELD_SIZE-wide slots followed by the map
 * serialized as JSON. The first three slots hold `version`, `offset` and
 * `count` as little-endian uint32; the fourth slot is left zero-filled.
 *
 * @param {string} filename - destination path
 * @param {number} version - index format version
 * @param {number} offset - offset stored in the header
 * @param {number} count - record count stored in the header
 * @param {Object} map - the prefix map; must be JSON-serializable
 * @param {Function} [cb] - called with an error or with no arguments on
 *   success; when omitted, errors are logged with console.error
 */
function savePrefixMapFile(filename, version, offset, count, map, cb) {
  if (!cb)
    cb = (err) => {
      if (err) console.error(err)
    }

  // Encode the JSON first and size the buffer from the encoded byte length.
  // A string's `.length` counts UTF-16 code units, which is smaller than the
  // UTF-8 byte length for non-ASCII text and would truncate the body.
  const body = Buffer.from(JSON.stringify(map))
  const b = Buffer.alloc(4 * FIELD_SIZE + body.length)
  b.writeUInt32LE(version, 0)
  b.writeUInt32LE(offset, FIELD_SIZE)
  b.writeUInt32LE(count, 2 * FIELD_SIZE)
  body.copy(b, 4 * FIELD_SIZE)

  writeFile(filename, b)
    .then(() => cb())
    .catch(cb)
}

/**
 * Read a prefix map index file and hand its parsed contents to `cb`.
 *
 * The header is read as little-endian uint32 values at byte positions 0,
 * FIELD_SIZE and 2 * FIELD_SIZE (version, offset, count). Everything from
 * 4 * FIELD_SIZE onwards is parsed as JSON to obtain the map.
 *
 * @param {string} filename - path of the file to read
 * @param {Function} cb - called as cb(err) on failure, or as
 *   cb(null, { version, offset, count, map }) on success
 */
function loadPrefixMapFile(filename, cb) {
  const headerBytes = 4 * FIELD_SIZE

  const decode = (buf) => ({
    version: buf.readUInt32LE(0),
    offset: buf.readUInt32LE(FIELD_SIZE),
    count: buf.readUInt32LE(2 * FIELD_SIZE),
    map: JSON.parse(buf.slice(headerBytes)),
  })

  readFile(filename)
    .then((buf) => {
      cb(null, decode(buf))
    })
    .catch(cb)
}

function saveBitsetFile(filename, version, offset, bitset, cb) {
bitset.trim()
const count = bitset.words.length
Expand Down Expand Up @@ -120,6 +157,8 @@ function safeFilename(filename) {
module.exports = {
saveTypedArrayFile,
loadTypedArrayFile,
savePrefixMapFile,
loadPrefixMapFile,
saveBitsetFile,
loadBitsetFile,
listFilesIDB,
Expand Down
117 changes: 106 additions & 11 deletions index.js
Expand Up @@ -11,6 +11,8 @@ const debug = require('debug')('jitdb')
const {
saveTypedArrayFile,
loadTypedArrayFile,
savePrefixMapFile,
loadPrefixMapFile,
saveBitsetFile,
loadBitsetFile,
safeFilename,
Expand Down Expand Up @@ -113,6 +115,17 @@ module.exports = function (log, indexesPath) {
filepath: path.join(indexesPath, file),
}
cb()
} else if (file.endsWith('.32prefixmap')) {
// Don't load it yet, just tag it `lazy`
indexes[indexName] = {
offset: -1,
count: 0,
map: {},
lazy: true,
prefix: 32,
filepath: path.join(indexesPath, file),
}
cb()
} else if (file.endsWith('.index')) {
// Don't load it yet, just tag it `lazy`
indexes[indexName] = {
Expand Down Expand Up @@ -181,6 +194,21 @@ module.exports = function (log, indexesPath) {
)
}

/**
 * Persist a prefix map index under `indexesPath` as
 * `<name>.<prefix>prefixmap`.
 *
 * Returns without saving, and without calling `cb`, when the index offset
 * is negative. A missing or falsy `version` is written as 1.
 *
 * @param {string} name - index name, used as the file's base name
 * @param {Object} prefixIndex - index with `offset`, `prefix`, `map` and
 *   optionally `version`
 * @param {number} count - record count to store in the file header
 * @param {Function} [cb] - forwarded unchanged to savePrefixMapFile
 */
function savePrefixMapIndex(name, prefixIndex, count, cb) {
  const { offset, prefix, version, map } = prefixIndex
  if (offset < 0) return

  debug('saving prefix map index: %s', name)
  const target = path.join(indexesPath, `${name}.${prefix}prefixmap`)
  savePrefixMapFile(target, version || 1, offset, count, map, cb)
}

function growTarrIndex(index, Type) {
debug('growing index')
const newArray = new Type(index.tarr.length * 2)
Expand Down Expand Up @@ -284,6 +312,29 @@ module.exports = function (log, indexesPath) {
}
}

/**
 * Record `seq` under the key `value` in `map`.
 *
 * A `value` of exactly 0 is skipped, so the map never gets a 0 bucket.
 * The bucket array is created on first use for a given value.
 *
 * @param {Object} map - prefix map, mutated in place
 * @param {number} seq - sequence number to append
 * @param {number} value - prefix value used as the key
 */
function addToPrefixMap(map, seq, value) {
  if (value === 0) return

  if (!map[value]) map[value] = []
  map[value].push(seq)
}

function updatePrefixMapIndex(opData, index, buffer, seq, offset) {
if (seq > index.count - 1) {
const fieldStart = opData.seek(buffer)
if (fieldStart) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arj03 Thanks. Did you also see my comment about ~fieldStart? Or is this intentionally testing for "field must not be at the beginning of the buffer"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was keeping it in line with normal prefix indexes. It seems you are correct in that both have a bug if it's at position 0 in the buffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't realize that normal prefix indexes had that too! Oops

const buf = bipf.slice(buffer, fieldStart)
addToPrefixMap(index.map, seq, buf.length ? safeReadUint32(buf) : 0)
} else {
addToPrefixMap(index.map, seq, 0)
staltz marked this conversation as resolved.
Show resolved Hide resolved
}

index.offset = offset
index.count = seq + 1
}
}

function updatePrefixIndex(opData, index, buffer, seq, offset) {
if (seq > index.count - 1) {
if (seq > index.tarr.length - 1) growTarrIndex(index, Uint32Array)
Expand Down Expand Up @@ -368,7 +419,9 @@ module.exports = function (log, indexesPath) {
updatedSequenceIndex = true

if (indexNeedsUpdate) {
if (op.data.prefix)
if (op.data.prefix && op.data.useMap)
updatePrefixMapIndex(op.data, index, buffer, seq, offset)
else if (op.data.prefix)
updatePrefixIndex(op.data, index, buffer, seq, offset)
else updateIndexValue(op, index, buffer, seq)
}
Expand All @@ -389,7 +442,10 @@ module.exports = function (log, indexesPath) {

index.offset = indexes['seq'].offset
if (indexNeedsUpdate) {
if (index.prefix) savePrefixIndex(op.data.indexName, index, count)
if (index.prefix && index.map)
savePrefixMapIndex(op.data.indexName, index, count)
else if (index.prefix)
savePrefixIndex(op.data.indexName, index, count)
else saveIndex(op.data.indexName, index)
}

Expand All @@ -401,7 +457,14 @@ module.exports = function (log, indexesPath) {
function createIndexes(opsMissingIndexes, cb) {
const newIndexes = {}
opsMissingIndexes.forEach((op) => {
if (op.data.prefix)
if (op.data.prefix && op.data.useMap) {
newIndexes[op.data.indexName] = {
offset: 0,
count: 0,
map: {},
prefix: typeof op.data.prefix === 'number' ? op.data.prefix : 32,
}
} else if (op.data.prefix)
newIndexes[op.data.indexName] = {
offset: 0,
count: 0,
Expand Down Expand Up @@ -443,6 +506,14 @@ module.exports = function (log, indexesPath) {
updatedSequenceIndex = true

opsMissingIndexes.forEach((op) => {
if (op.data.prefix && op.data.useMap)
updatePrefixMapIndex(
op.data,
newIndexes[op.data.indexName],
buffer,
seq,
offset
)
if (op.data.prefix)
updatePrefixIndex(
op.data,
Expand Down Expand Up @@ -473,7 +544,9 @@ module.exports = function (log, indexesPath) {
for (var indexName in newIndexes) {
const index = (indexes[indexName] = newIndexes[indexName])
index.offset = indexes['seq'].offset
if (index.prefix) savePrefixIndex(indexName, index, count)
if (index.prefix && index.map)
savePrefixMapIndex(indexName, index, count)
else if (index.prefix) savePrefixIndex(indexName, index, count)
else saveIndex(indexName, index)
}

Expand All @@ -485,7 +558,18 @@ module.exports = function (log, indexesPath) {
function loadLazyIndex(indexName, cb) {
debug('lazy loading %s', indexName)
let index = indexes[indexName]
if (index.prefix) {
if (index.prefix && index.map) {
loadPrefixMapFile(index.filepath, (err, data) => {
if (err) return cb(err)
const { version, offset, count, map } = data
index.version = version
index.offset = offset
index.count = count
index.map = map
index.lazy = false
cb()
})
} else if (index.prefix) {
loadTypedArrayFile(index.filepath, Uint32Array, (err, data) => {
if (err) return cb(err)
const { version, offset, count, tarr } = data
Expand Down Expand Up @@ -576,16 +660,27 @@ module.exports = function (log, indexesPath) {
function matchAgainstPrefix(op, prefixIndex, cb) {
const target = op.data.value
const targetPrefix = target ? safeReadUint32(target) : 0
const count = prefixIndex.count
const tarr = prefixIndex.tarr
const bitset = new TypedFastBitSet()
const done = multicb({ pluck: 1 })
for (let seq = 0; seq < count; ++seq) {
if (tarr[seq] === targetPrefix) {
bitset.add(seq)
getRecord(seq, done())

if (prefixIndex.map) {
if (prefixIndex.map[targetPrefix]) {
prefixIndex.map[targetPrefix].forEach((seq) => {
bitset.add(seq)
getRecord(seq, done())
})
}
} else {
const count = prefixIndex.count
const tarr = prefixIndex.tarr
for (let seq = 0; seq < count; ++seq) {
if (tarr[seq] === targetPrefix) {
bitset.add(seq)
getRecord(seq, done())
}
}
}

done((err, recs) => {
// FIXME: handle error better, this cb() should support 2 args
if (err) return console.error(err)
Expand Down
11 changes: 8 additions & 3 deletions operators.js
Expand Up @@ -107,16 +107,21 @@ function equal(seek, target, opts) {
const value = toBufferOrFalsy(target)
const valueName = !value ? '' : value.toString()
const indexType = opts.indexType
const indexName = opts.prefix
? safeFilename(indexType)
: safeFilename(indexType + '_' + valueName)
const indexName = safeFilename(
opts.prefix
? opts.useMap
? indexType + '_map'
: indexType
staltz marked this conversation as resolved.
Show resolved Hide resolved
: indexType + '_' + valueName
)
return {
type: 'EQUAL',
data: {
seek,
value,
indexType,
indexName,
useMap: opts.useMap,
indexAll: opts.indexAll,
prefix: opts.prefix,
},
Expand Down