Skip to content

Commit

Permalink
Merge pull request #620 from nats-io/fix_sql_no_caching_on_recovery
Browse files Browse the repository at this point in the history
[FIXED] SQL: Possible panic on recovery with -sql_no_caching
  • Loading branch information
kozlovic committed Jul 12, 2018
2 parents 823db0a + 8a6b2fa commit e49798c
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 17 deletions.
41 changes: 24 additions & 17 deletions stores/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1948,22 +1948,27 @@ func (ss *SQLSubStore) recoverPendingRow(rows *sql.Rows, sub *spb.SubState, ap *
}
pendingAcks[seq] = struct{}{}
} else {
row := &sqlSubsPendingRow{
ID: ss.curRow,
msgs: sqlSeqMapPool.Get().(map[uint64]struct{}),
var row *sqlSubsPendingRow
if ap != nil {
row = &sqlSubsPendingRow{
ID: ss.curRow,
msgs: sqlSeqMapPool.Get().(map[uint64]struct{}),
}
ap.lastSent = lastSent
ap.prevLastSent = lastSent
}
ap.lastSent = lastSent
ap.prevLastSent = lastSent

if lastSent > sub.LastSent {
sub.LastSent = lastSent
}
if len(pendingBytes) > 0 {
if err := sqlDecodeSeqs(pendingBytes, func(seq uint64) {
pendingAcks[seq] = struct{}{}
row.msgsRefs++
row.msgs[seq] = struct{}{}
ap.msgToRow[seq] = row
if ap != nil {
row.msgsRefs++
row.msgs[seq] = struct{}{}
ap.msgToRow[seq] = row
}
}); err != nil {
return err
}
Expand All @@ -1972,15 +1977,17 @@ func (ss *SQLSubStore) recoverPendingRow(rows *sql.Rows, sub *spb.SubState, ap *
if err := sqlDecodeSeqs(acksBytes, func(seq uint64) {
if _, exists := pendingAcks[seq]; exists {
delete(pendingAcks, seq)
row.acksRefs++
ap.ackToRow[seq] = row

seqRow := ap.msgToRow[seq]
if seqRow != nil {
delete(ap.msgToRow, seq)
seqRow.msgsRefs--
if seqRow.msgsRefs == 0 && seqRow.acksRefs == 0 {
gcedRows[seqRow.ID] = struct{}{}
if ap != nil {
row.acksRefs++
ap.ackToRow[seq] = row

seqRow := ap.msgToRow[seq]
if seqRow != nil {
delete(ap.msgToRow, seq)
seqRow.msgsRefs--
if seqRow.msgsRefs == 0 && seqRow.acksRefs == 0 {
gcedRows[seqRow.ID] = struct{}{}
}
}
}
}
Expand Down
44 changes: 44 additions & 0 deletions stores/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,10 @@ func TestSQLSubStoreCaching(t *testing.T) {
t.Fatalf("Error creating store: %v", err)
}
defer s.Close()
info := testDefaultServerInfo
if err := s.Init(&info); err != nil {
t.Fatalf("Error on init: %v", err)
}

channel := "foo"
cs := storeCreateChannel(t, s, channel)
Expand Down Expand Up @@ -1004,6 +1008,46 @@ func TestSQLSubStoreCaching(t *testing.T) {
// Delete sub
storeSubDelete(t, cs, channel, subID)
testSQLCheckPendingRow(t, db, subID)

// Start with fresh subscription
subID = storeSub(t, cs, channel)
// Store some pending messages
storeSubPending(t, cs, channel, subID, 1, 2, 3)
storeSubAck(t, cs, channel, subID, 2)
storeSubFlush(t, cs, channel)

// Recover with NoCaching enabled
s.Close()
s, err = NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLNoCaching(true), SQLMaxOpenConns(5))
if err != nil {
t.Fatalf("Error opening store: %v", err)
}
defer s.Close()
state, err := s.Recover()
if err != nil {
t.Fatalf("Error on recovery: %v", err)
}
rc := state.Channels[channel]
if rc == nil {
t.Fatalf("Expected channel %q to be recovered", channel)
}
if count := len(rc.Subscriptions); count != 1 {
t.Fatalf("Expected 1 subscription to be recovered, got %v", count)
}
rs := rc.Subscriptions[0]
if count := len(rs.Pending); count != 2 {
t.Fatalf("Expected 2 pending messages, got %v", count)
}
if _, ok := rs.Pending[1]; !ok {
t.Fatalf("Expected seq 1 to be in pending")
}
if _, ok := rs.Pending[3]; !ok {
t.Fatalf("Expected seq 1 to be in pending")
}
last := rs.Sub.LastSent
if last != 3 {
t.Fatalf("Expected last sent to be 3, got %v", last)
}
}

func TestSQLSubStoreCachingFlushInterval(t *testing.T) {
Expand Down

0 comments on commit e49798c

Please sign in to comment.