Merge pull request #37 from ssb-ngi-pointer/transaction
Transaction support
arj03 committed Sep 22, 2021
2 parents 85cc25b + 36f1140 commit 7a38a2a
Showing 2 changed files with 87 additions and 1 deletion.
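
At a glance, the commit adds an appendTransaction(dataArray, cb) method to the log: every record in dataArray is encoded, placed in the same block, and persisted together, and the callback receives one file offset per record. A minimal usage sketch (hypothetical values; the real API surface is in the diff below):

    db.appendTransaction([{ hello: 'world' }, { hello: 'test' }], function (err, offsets) {
      // offsets[i] is the byte offset in the log file where record i was appended
    })
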
47 changes: 47 additions & 0 deletions index.js
@@ -263,6 +263,52 @@ module.exports = function (filename, opts) {
    cb(null, appendSingle(data))
  }

  function appendTransaction(dataArray, cb) {
    if (!Array.isArray(dataArray))
      return cb(new Error("appendTransaction expects first argument to be an array"))

    let size = 0
    const encodedDataArray = dataArray.map(data => {
      let encodedData = codec.encode(data)
      if (typeof encodedData === 'string')
        encodedData = Buffer.from(encodedData)
      size += recordSize(encodedData)
      return encodedData
    })

    // we always leave 2 bytes free at the end: the last record must be
    // followed by a zero length to signal the end of the records
    size += 2

    if (size > blockSize)
      return cb(new Error("data larger than block size"))

    if (nextWriteBlockOffset + size > blockSize) {
      // the whole transaction doesn't fit in the current block, start a new one
      const buffer = Buffer.alloc(blockSize)
      latestBlock = buffer
      latestBlockIndex += 1
      nextWriteBlockOffset = 0
      debug("data doesn't fit current block, creating new")
    }

    const fileOffsets = []
    encodedDataArray.forEach(encodedData => {
      appendRecord(latestBlock, encodedData, nextWriteBlockOffset)
      cache.set(latestBlockIndex, latestBlock) // update cache
      const fileOffset = nextWriteBlockOffset + latestBlockIndex * blockSize
      fileOffsets.push(fileOffset)
      nextWriteBlockOffset += recordSize(encodedData)
      blocksToBeWritten.set(latestBlockIndex, { block: latestBlock, fileOffset })
      debug("data inserted at offset %d", fileOffset)
    })

    scheduleWrite()

    return cb(null, fileOffsets)
  }
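
  // Editorial note, not part of the original commit: because the fit check
  // above starts a fresh block whenever the whole batch would straddle the
  // current one, every record of a transaction lands in the same block, and
  // that block is persisted by a single write: after a crash, either all of
  // the transaction's records are readable or none of them are.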

  const scheduleWrite = debounce(write, writeTimeout)

  function writeBlock(blockIndex) {
@@ -350,6 +396,7 @@ module.exports = function (filename, opts) {
    get: onLoad(get),
    del: onLoad(del),
    append: onLoad(append),
    appendTransaction: onLoad(appendTransaction),
    close: onLoad(close),
    since,
    onReady,
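
For context, appendTransaction leans on two helpers that already exist elsewhere in index.js and are not shown in this diff: recordSize and appendRecord. A plausible sketch of their contract, assuming the log frames each record with a 2-byte little-endian length prefix (consistent with the size += 2 end-of-block terminator above, but an assumption, not part of this commit):

    function recordSize(encodedData) {
      // assumed framing: 2-byte length prefix + the encoded payload
      return 2 + encodedData.length
    }

    function appendRecord(block, encodedData, offsetInBlock) {
      // sketch: write the length prefix, then copy the payload into the block
      block.writeUInt16LE(encodedData.length, offsetInBlock)
      encodedData.copy(block, offsetInBlock + 2)
    }
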
41 changes: 40 additions & 1 deletion test/basic.js
@@ -78,8 +78,47 @@ tape('basic json re-read', function (t) {
        if(err) throw err
        t.deepEqual(b2, json2)

-       t.end()
+       db.close(t.end)
      })
    })
  })
})

tape('basic transaction', function (t) {
  var file = '/tmp/dsf-test-basic-transaction-json.log'
  try { fs.unlinkSync(file) } catch (_) {}
  var db = Log(file, {
    blockSize: 2*1024,
    codec: require('flumecodec/json')
  })

  db.appendTransaction([json1, json2], function (err, offsets) {
    if(err) throw err
    t.equal(offsets[0], 0)
    db.get(offsets[0], function (err, b) {
      if(err) throw err
      t.deepEqual(b, json1)

      db.get(offsets[1], function (err, b2) {
        if(err) throw err
        t.deepEqual(b2, json2)

        db.close(t.end)
      })
    })
  })
})

tape('transaction fail', function (t) {
  var file = '/tmp/dsf-test-transaction-tail-json.log'
  try { fs.unlinkSync(file) } catch (_) {}
  var db = Log(file, {
    blockSize: 25,
    codec: require('flumecodec/json')
  })

  db.appendTransaction([json1, json2], function (err, offsets) {
    t.equal(err.message, 'data larger than block size', 'fails on too much data')
    db.close(t.end)
  })
})
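
Why the second test fails without touching the disk: appendTransaction refuses any batch whose framed size exceeds blockSize, since a transaction is never split across blocks. A back-of-the-envelope check mirroring that guard, using the 2-byte-per-record framing sketched earlier and hypothetical record contents (the real json1/json2 are defined near the top of test/basic.js and not shown in this diff):

    var records = [Buffer.from('{"hello":"world"}'), Buffer.from('{"test":"testing"}')]
    // framed size: a 2-byte length prefix per record, plus the trailing 2-byte zero length
    var size = records.reduce(function (sum, r) { return sum + 2 + r.length }, 0) + 2
    console.log(size > 25) // true, so the callback gets 'data larger than block size'
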
