Skip to content

Commit

Permalink
Need to make sure we advance sequence correctly when no items
Browse files Browse the repository at this point in the history
Fixes #34
  • Loading branch information
mhart committed Jun 30, 2016
1 parent cf059b4 commit 7384fcc
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
8 changes: 4 additions & 4 deletions actions/getRecords.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module.exports = function getRecords(store, data, cb) {
if (!/^shardId-[\d]{12}$/.test(shardId) || !(shardIx >= 0 && shardIx < 2147483648))
return cb(invalidShardIterator())

if (!(iteratorTime > 0 && iteratorTime <= Date.now()))
if (!(iteratorTime > 0 && iteratorTime <= now))
return cb(invalidShardIterator())

if (!/[a-zA-Z0-9_.-]+/.test(streamName) || !streamName.length || streamName.length > 128)
Expand Down Expand Up @@ -67,7 +67,7 @@ module.exports = function getRecords(store, data, cb) {

cb = once(cb)

var streamDb = store.getStreamDb(streamName), cutoffTime = Date.now() - (stream.RetentionPeriodHours * 60 * 60 * 1000),
var streamDb = store.getStreamDb(streamName), cutoffTime = now - (stream.RetentionPeriodHours * 60 * 60 * 1000),
keysToDelete = [], lastItem, opts

opts = {
Expand All @@ -87,15 +87,15 @@ module.exports = function getRecords(store, data, cb) {
})
.filter(function(item) { return !item._tooOld })
.join(function(items) {
var nextSeq = lastItem ? db.incrementSequence(lastItem._seqObj) : seqNo,
var nextSeq = db.incrementSequence(lastItem ? lastItem._seqObj : seqObj, lastItem ? null : now),
nextShardIterator = db.createShardIterator(streamName, shardId, nextSeq),
millisBehind = 0

if (!items.length && stream.Shards[shardIx].SequenceNumberRange.EndingSequenceNumber) {
var endSeqObj = db.parseSequence(stream.Shards[shardIx].SequenceNumberRange.EndingSequenceNumber)
if (seqObj.seqTime >= endSeqObj.seqTime) {
nextShardIterator = undefined
millisBehind = Math.max(0, Date.now() - endSeqObj.seqTime)
millisBehind = Math.max(0, now - endSeqObj.seqTime)
}
}

Expand Down
4 changes: 2 additions & 2 deletions db/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ function stringifySequence(obj) {
}
}

function incrementSequence(seqObj) {
function incrementSequence(seqObj, seqTime) {
if (typeof seqObj == 'string') seqObj = parseSequence(seqObj)

return stringifySequence({
shardCreateTime: seqObj.shardCreateTime,
seqIx: seqObj.seqIx,
seqTime: seqObj.seqTime + 1000,
seqTime: seqTime || (seqObj.seqTime + 1000),
shardIx: seqObj.shardIx,
})
}
Expand Down

0 comments on commit 7384fcc

Please sign in to comment.