Skip to content

Commit

Permalink
Merge pull request #1204 from nats-io/sql_chan_max_seq_from_sub_last_…
Browse files Browse the repository at this point in the history
…sent

[FIXED] SQL: sub last_sent could end-up higher than store last_seq
  • Loading branch information
kozlovic committed Jul 21, 2021
2 parents 81a1982 + bb3c98a commit f66422d
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 6 deletions.
19 changes: 13 additions & 6 deletions stores/sqlstore.go
Expand Up @@ -909,12 +909,6 @@ func (s *SQLStore) Recover() (*RecoveredState, error) {
msgStore.last = last
msgStore.totalCount = totalCount
msgStore.totalBytes = totalBytes
// Since messages may have been removed due to limits, update first/last
// based on known max sequence.
if maxseq > msgStore.last {
msgStore.first = maxseq + 1
msgStore.last = maxseq
}

subStore := s.newSQLSubStore(channelID, &channelLimits.SubStoreLimits)
// Prevent scheduling to flusher while we are recovering
Expand Down Expand Up @@ -988,11 +982,24 @@ func (s *SQLStore) Recover() (*RecoveredState, error) {
}

// Add to the recovered subscriptions
if sub.LastSent > maxseq {
maxseq = sub.LastSent
}
subscriptions = append(subscriptions, &RecoveredSubscription{Sub: sub, Pending: pendingAcks})
} else if lastSent > maxseq {
maxseq = lastSent
}
}
subRows.Close()

// Since messages may have been removed due to limits, or having a higher
// last_sent in some of the subscription, update first/last based on known
// max sequence.
if maxseq > msgStore.last {
msgStore.first = maxseq + 1
msgStore.last = maxseq
}

if !s.opts.NoCaching {
// Clear but also allow scheduling now that the recovery is complete.
subStore.cache.needsFlush = false
Expand Down
105 changes: 105 additions & 0 deletions stores/sqlstore_test.go
Expand Up @@ -2212,3 +2212,108 @@ func TestSQLBulkInsertLimit(t *testing.T) {
}
}
}

func TestSQLStoreMaxSeqWithExpiredMsgs(t *testing.T) {
if !doSQL {
t.SkipNow()
}

cleanupSQLDatastore(t)
defer cleanupSQLDatastore(t)

// Create store with caching enabled
limits := testDefaultStoreLimits
limits.MaxAge = 250 * time.Millisecond
s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, &limits, SQLNoCaching(false))
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
defer s.Close()

info := testDefaultServerInfo
if err := s.Init(&info); err != nil {
t.Fatalf("Error on init: %v", err)
}

cs := storeCreateChannel(t, s, "foo")

sub := spb.SubState{
ClientID: "me",
AckWaitInSecs: 30,
IsDurable: true,
MaxInFlight: 1024,
QGroup: "dur:queue",
Inbox: "my.inbox",
AckInbox: "ack.inbox",
}
if err := cs.Subs.CreateSub(&sub); err != nil {
t.Fatalf("Error creating sub: %v", err)
}

for seq := uint64(1); seq <= 100; seq++ {
msg := &pb.MsgProto{
Sequence: seq,
Subject: "foo",
Data: []byte(fmt.Sprintf("%v", seq)),
Timestamp: time.Now().UnixNano(),
}
if _, err := cs.Msgs.Store(msg); err != nil {
t.Fatalf("Error storing message: %v", err)
}
}
if err := cs.Msgs.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}

for seq := uint64(1); seq <= 100; seq++ {
cs.Subs.AddSeqPending(sub.ID, seq)
}
for seq := uint64(1); seq <= 98; seq++ {
cs.Subs.AckSeqPending(sub.ID, seq)
}
sub.IsClosed = true
cs.Subs.UpdateSub(&sub)
cs.Subs.Flush()

s.Close()

// Wait for more than max age
time.Sleep(500 * time.Millisecond)

for i := 0; i < 2; i++ {
s, err = NewSQLStore(testLogger, testSQLDriver, testSQLSource, &limits, SQLNoCaching(false))
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
defer s.Close()

// Recover state
rs, err := s.Recover()
if err != nil {
t.Fatalf("Error on recovery: %v", err)
}
c, ok := rs.Channels["foo"]
if !ok {
t.Fatal("Did not recover channel foo")
}
cs = c.Channel
if i > 0 {
first, last, err := cs.Msgs.FirstAndLastSequence()
if err != nil {
t.Fatalf("Error getting first/last seq: %v", err)
}
if first != 101 && last != 100 {
t.Fatalf("Unexpected first/last sequence: %v/%v", first, last)
}

if l := len(c.Subscriptions); l != 1 {
t.Fatalf("Expected 1 sub, got %v", l)
}
ts := c.Subscriptions[0]
if ts.Sub.LastSent != 100 {
t.Fatalf("Unexpected sub's LastSent: %v", ts.Sub.LastSent)
}
}
s.Close()
}
}

0 comments on commit f66422d

Please sign in to comment.