Skip to content

Commit

Permalink
[FIXED] SQLStore: Incorrect first/last seq after recovery and expired…
Browse files Browse the repository at this point in the history
… msgs

There could be an issue with the SQLStore when server is restarted
and messages are expired during the recovery process. The first/last
sequence for a channel could be incorrect which would cause subscriptions
to possibly not receive messages since their last_sent could be higher
than the new last_seq for that channel.

Resolves #655

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Sep 28, 2018
1 parent 76c9471 commit bda8352
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
20 changes: 15 additions & 5 deletions stores/sqlstore.go
Expand Up @@ -817,7 +817,8 @@ func (s *SQLStore) Recover() (*RecoveredState, error) {
var (
channelID int64
name string
maxseq uint64
maxseq uint64 // We get that from the Channels table.
mmseq uint64 // This is the max seq found in the Messages table for given channel.
)
if err := channelRows.Scan(&channelID, &name, &maxseq); err != nil {
return nil, err
Expand All @@ -827,6 +828,16 @@ func (s *SQLStore) Recover() (*RecoveredState, error) {

msgStore := s.newSQLMsgStore(name, channelID, &channelLimits.MsgStoreLimits)

// We need to get the last seq from messages table before possibly expiring messages.
r = s.preparedStmts[sqlGetLastSeq].QueryRow(channelID)
if err := r.Scan(&mmseq); err != nil {
return nil, sqlStmtError(sqlGetLastSeq, err)
}
// If it is more than the one that was updated in the Channel row, then use this one.
if mmseq > maxseq {
maxseq = mmseq
}

if err := s.applyLimitsOnRecovery(msgStore); err != nil {
return nil, err
}
Expand All @@ -846,10 +857,9 @@ func (s *SQLStore) Recover() (*RecoveredState, error) {
msgStore.last = last
msgStore.totalCount = totalCount
msgStore.totalBytes = totalBytes
// If all messages have expired, the above should all be 0, however,
// the Channel table may contain a maxseq that we should use as starting
// point.
if msgStore.last == 0 {
// Since messages may have been removed due to limits, update first/last
// based on known max sequence.
if maxseq > msgStore.last {
msgStore.first = maxseq + 1
msgStore.last = maxseq
}
Expand Down
35 changes: 35 additions & 0 deletions stores/sqlstore_test.go
Expand Up @@ -1716,3 +1716,38 @@ func TestSQLSubStoreFlush(t *testing.T) {
t.Fatalf("Expected last sent to be %v, got %v", lastSent, rs.Sub.LastSent)
}
}

func TestSQLRecoverLastSeqAfterMessagesExpired(t *testing.T) {
if !doSQL {
t.SkipNow()
}
cleanupSQLDatastore(t)
defer cleanupSQLDatastore(t)

s := createDefaultSQLStore(t)
defer s.Close()

cs := storeCreateChannel(t, s, "foo")
payload := make([]byte, 100)
storeMsg(t, cs, "foo", 1, payload)
storeMsg(t, cs, "foo", 2, payload)
storeMsg(t, cs, "foo", 3, payload)
storeMsg(t, cs, "foo", 4, payload)
storeMsg(t, cs, "foo", 5, payload)
s.Close()

// Sleep for more than the maxAge we will then set
time.Sleep(700 * time.Millisecond)

// Restart store with a maxAge
limits := testDefaultStoreLimits
limits.MaxAge = 250 * time.Millisecond
s, state := openDefaultSQLStoreWithLimits(t, &limits)
defer s.Close()
cs = state.Channels["foo"].Channel
first, last := msgStoreFirstAndLastSequence(t, cs.Msgs)
if first != 6 && last != 5 {
t.Fatalf("Should be left with 6..5, got %v..%v", first, last)
}
s.Close()
}

0 comments on commit bda8352

Please sign in to comment.