diff --git a/stores/sqlstore.go b/stores/sqlstore.go index cde26161..430efb77 100644 --- a/stores/sqlstore.go +++ b/stores/sqlstore.go @@ -2033,14 +2033,13 @@ func (ss *SQLSubStore) flush() error { return err } for subid, ap := range ss.cache.subs { - prevLastSent := ap.prevLastSent - ap.prevLastSent = ap.lastSent if len(ap.msgs) == 0 && len(ap.acks) == 0 { // Update subscription's lastSent column if it has changed. - if ap.lastSent != prevLastSent { + if ap.lastSent != ap.prevLastSent { if _, err := tx.Exec(sqlStmts[sqlSubUpdateLastSent], ap.lastSent, ss.channelID, subid); err != nil { return err } + ap.prevLastSent = ap.lastSent } // Since there was no pending nor ack for this sub, simply continue // with the next subscription. diff --git a/stores/sqlstore_test.go b/stores/sqlstore_test.go index 598019d9..14d17f38 100644 --- a/stores/sqlstore_test.go +++ b/stores/sqlstore_test.go @@ -1658,3 +1658,64 @@ func TestSQLRecoverWithMaxBytes(t *testing.T) { } s.Close() } + +func TestSQLSubStoreFlush(t *testing.T) { + if !doSQL { + t.SkipNow() + } + cleanupSQLDatastore(t) + defer cleanupSQLDatastore(t) + + // Create a store with caching enabled + s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLNoCaching(false)) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + defer s.Close() + info := testDefaultServerInfo + if err := s.Init(&info); err != nil { + s.Close() + t.Fatalf("Error on Init: %v", err) + } + + channel := "foo" + cs := storeCreateChannel(t, s, channel) + lastSent := uint64(3) + for i := uint64(1); i <= lastSent; i++ { + storeMsg(t, cs, channel, i, []byte("msg")) + } + + subID := storeSub(t, cs, channel) + for i := uint64(1); i <= lastSent; i++ { + storeSubPending(t, cs, channel, subID, i) + storeSubFlush(t, cs, channel) + storeSubAck(t, cs, channel, subID, i) + storeSubFlush(t, cs, channel) + } + s.Close() + + // Recover + s, err = NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLNoCaching(false)) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + defer s.Close() + state, err := s.Recover() + if err != nil { + t.Fatalf("Error on recovery") + } + if count := len(state.Channels); count != 1 { + t.Fatalf("Expected 1 channel, got %v", count) + } + rc := state.Channels[channel] + if count := len(rc.Subscriptions); count != 1 { + t.Fatalf("Expected 1 subscription, got %v", count) + } + rs := rc.Subscriptions[0] + if count := len(rs.Pending); count != 0 { + t.Fatalf("Expected no pending message, got %v", count) + } + if rs.Sub.LastSent != lastSent { + t.Fatalf("Expected last sent to be %v, got %v", lastSent, rs.Sub.LastSent) + } +}