Skip to content

Commit

Permalink
Merge 5bead65 into 5452fec
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jul 13, 2018
2 parents 5452fec + 5bead65 commit f102ef5
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
5 changes: 2 additions & 3 deletions stores/sqlstore.go
Expand Up @@ -2033,14 +2033,13 @@ func (ss *SQLSubStore) flush() error {
return err
}
for subid, ap := range ss.cache.subs {
prevLastSent := ap.prevLastSent
ap.prevLastSent = ap.lastSent
if len(ap.msgs) == 0 && len(ap.acks) == 0 {
// Update subscription's lastSent column if it has changed.
if ap.lastSent != prevLastSent {
if ap.lastSent != ap.prevLastSent {
if _, err := tx.Exec(sqlStmts[sqlSubUpdateLastSent], ap.lastSent, ss.channelID, subid); err != nil {
return err
}
ap.prevLastSent = ap.lastSent
}
// Since there was no pending nor ack for this sub, simply continue
// with the next subscription.
Expand Down
61 changes: 61 additions & 0 deletions stores/sqlstore_test.go
Expand Up @@ -1658,3 +1658,64 @@ func TestSQLRecoverWithMaxBytes(t *testing.T) {
}
s.Close()
}

func TestSQLSubStoreFlush(t *testing.T) {
if !doSQL {
t.SkipNow()
}
cleanupSQLDatastore(t)
defer cleanupSQLDatastore(t)

// Create a store with caching enabled
s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLNoCaching(false))
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
defer s.Close()
info := testDefaultServerInfo
if err := s.Init(&info); err != nil {
s.Close()
t.Fatalf("Error on Init: %v", err)
}

channel := "foo"
cs := storeCreateChannel(t, s, channel)
lastSent := uint64(3)
for i := uint64(1); i <= lastSent; i++ {
storeMsg(t, cs, channel, i, []byte("msg"))
}

subID := storeSub(t, cs, channel)
for i := uint64(1); i <= lastSent; i++ {
storeSubPending(t, cs, channel, subID, i)
storeSubFlush(t, cs, channel)
storeSubAck(t, cs, channel, subID, i)
storeSubFlush(t, cs, channel)
}
s.Close()

// Recover
s, err = NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLNoCaching(false))
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
defer s.Close()
state, err := s.Recover()
if err != nil {
t.Fatalf("Error on recovery")
}
if count := len(state.Channels); count != 1 {
t.Fatalf("Expected 1 channel, got %v", count)
}
rc := state.Channels[channel]
if count := len(rc.Subscriptions); count != 1 {
t.Fatalf("Expected 1 subscription, got %v", count)
}
rs := rc.Subscriptions[0]
if count := len(rs.Pending); count != 0 {
t.Fatalf("Expected no pending message, got %v", count)
}
if rs.Sub.LastSent != lastSent {
t.Fatalf("Expected last sent to be %v, got %v", lastSent, rs.Sub.LastSent)
}
}

0 comments on commit f102ef5

Please sign in to comment.