Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for offline durables and for possible error on sub create #823

Merged
merged 1 commit into from May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 11 additions & 7 deletions server/server.go
Expand Up @@ -2292,6 +2292,9 @@ func (s *StanServer) recoverOneSub(c *channel, recSub *spb.SubState, pendingAcks
// durable would not be able to be restarted.
sub.savedClientID = sub.ClientID
sub.ClientID = ""
s.monMu.Lock()
s.numSubs++
s.monMu.Unlock()
}

// Create a subState
Expand Down Expand Up @@ -4642,10 +4645,17 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
s.nca.Flush()
}
}
// Do this before so that if we have to remove, count will be ok.
if subIsNew {
s.monMu.Lock()
s.numSubs++
s.monMu.Unlock()
}
if err != nil {
// Try to undo what has been done.
s.clients.removeSub(sr.ClientID, sub)
s.closeMu.Lock()
ss.Remove(c, sub, false)
ss.Remove(c, sub, subIsNew)
s.closeMu.Unlock()
s.log.Errorf("Unable to add subscription for %s: %v", sr.Subject, err)
return nil, err
Expand All @@ -4655,12 +4665,6 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
traceSubState(s.log, sub, &traceCtx)
}

if subIsNew {
s.monMu.Lock()
s.numSubs++
s.monMu.Unlock()
}

return sub, nil
}

Expand Down
10 changes: 10 additions & 0 deletions server/server_storefailures_test.go
Expand Up @@ -250,6 +250,16 @@ func TestMsgLookupFailures(t *testing.T) {
sub.Unsubscribe()
}

func (ss *mockedSubStore) CreateSub(sub *spb.SubState) error {
ss.RLock()
fail := ss.fail
ss.RUnlock()
if fail {
return fmt.Errorf("On purpose")
}
return ss.SubStore.CreateSub(sub)
}

func (ss *mockedSubStore) AddSeqPending(subid, seq uint64) error {
ss.RLock()
fail := ss.fail
Expand Down
139 changes: 139 additions & 0 deletions server/server_sub_test.go
Expand Up @@ -1144,3 +1144,142 @@ func TestSubAckInboxFromOlderStore(t *testing.T) {
// ok...
}
}

func checkNumSubs(t *testing.T, s *StanServer, expected int) {
t.Helper()
waitFor(t, 2*time.Second, 15*time.Millisecond, func() error {
s.monMu.Lock()
count := s.numSubs
s.monMu.Unlock()
if count != expected {
return fmt.Errorf("Expected %v subscriptions, got %v", expected, count)
}
return nil
})
}

func TestNumSubs(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)

ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

opts := getTestDefaultOptsForPersistentStore()
opts.NATSServerURL = "nats://127.0.0.1:4222"
s := runServerWithOpts(t, opts, nil)
defer shutdownRestartedServerOnTestExit(&s)

sc := NewDefaultConnection(t)
defer sc.Close()

dur, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, s, 1)

s.Shutdown()
s = runServerWithOpts(t, opts, nil)

checkNumSubs(t, s, 1)
dur.Close()
checkNumSubs(t, s, 1)

s.Shutdown()
s = runServerWithOpts(t, opts, nil)

checkNumSubs(t, s, 1)

// Resume the durable
dur, err = sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, s, 1)
// Now unsubscribe
dur.Unsubscribe()
checkNumSubs(t, s, 0)
sc.Close()

s.Shutdown()
s = runServerWithOpts(t, opts, nil)
checkNumSubs(t, s, 0)
}

func TestNumSubsOnSubFailure(t *testing.T) {
s := runServer(t, clusterName)
defer s.Shutdown()

s.channels.Lock()
s.channels.store = &mockedStore{Store: s.channels.store}
s.channels.Unlock()

cs, _ := s.lookupOrCreateChannel("foo")
mss := cs.store.Subs.(*mockedSubStore)
mss.Lock()
mss.fail = true
mss.Unlock()

sc := NewDefaultConnection(t)
defer sc.Close()

checkNumSubs(t, s, 0)
// Create a plain sub that fails
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err == nil {
t.Fatalf("Expected failure")
}
checkNumSubs(t, s, 0)
// Try with durable
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur")); err == nil {
t.Fatalf("Expected failure")
}
checkNumSubs(t, s, 0)
// And queue durable
if _, err := sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur")); err == nil {
t.Fatalf("Expected failure")
}

mss.Lock()
mss.fail = false
mss.Unlock()

// Now create a durable and durable queue group.
dur, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on sub: %v", err)
}
qdur, err := sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on sub: %v", err)
}
checkNumSubs(t, s, 2)
// Close them
dur.Close()
checkNumSubs(t, s, 2)
qdur.Close()
checkNumSubs(t, s, 2)

// Enable failure again
mss.Lock()
mss.fail = true
mss.Unlock()
// Try to resume, which should fail
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur")); err == nil {
t.Fatalf("Expected failure")
}
checkNumSubs(t, s, 2)
if _, err := sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur")); err == nil {
t.Fatalf("Expected failure")
}
checkNumSubs(t, s, 2)

// Now add a plain sub which will fail, then close connection
// and make sure count is not decremented.
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur2")); err == nil {
t.Fatalf("Expected error")
}
checkNumSubs(t, s, 2)
sc.Close()
checkNumSubs(t, s, 2)
}