Merge fcc621a into 910d6e1
kozlovic committed Oct 14, 2019
2 parents 910d6e1 + fcc621a · commit 0c8fac1
Showing 4 changed files with 136 additions and 17 deletions.
37 changes: 21 additions & 16 deletions server/server.go
@@ -907,9 +907,7 @@ func (ss *subStore) Store(sub *subState) error {
return err
}

ss.Lock()
ss.updateState(sub)
ss.Unlock()

return nil
}
@@ -938,6 +936,9 @@ func (ss *subStore) updateState(sub *subState) {
// maybe due to upgrades from much older releases that had bugs?).
// So don't panic and use as the shadow the one with the highest LastSent
// value.
if qs.shadow != nil {
ss.stan.log.Warnf("Duplicate shadow durable queue consumer (subid=%v) for group %q", sub.ID, sub.QGroup)
}
if qs.shadow == nil || sub.LastSent > qs.lastSent {
qs.shadow = sub
}
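
The recovery comment above describes the rule now enforced: if more than one shadow durable queue subscriber is found for the same group, the server logs a warning and keeps the one with the highest LastSent. A minimal, self-contained sketch of that selection rule, using simplified hypothetical types rather than the server's actual subState/queueState (in the real code, qs.lastSent is maintained elsewhere):

```go
package main

import "fmt"

// Simplified stand-ins for the server's recovered subscription and queue group.
type recoveredSub struct {
	ID       uint64
	LastSent uint64
}

type queueGroup struct {
	shadow   *recoveredSub
	lastSent uint64
}

// setShadow applies the recovery rule: warn when a shadow already exists,
// and keep the candidate with the highest LastSent as the group's shadow.
func (qs *queueGroup) setShadow(sub *recoveredSub) {
	if qs.shadow != nil {
		fmt.Printf("warn: duplicate shadow durable queue consumer (subid=%v)\n", sub.ID)
	}
	if qs.shadow == nil || sub.LastSent > qs.lastSent {
		qs.shadow = sub
		qs.lastSent = sub.LastSent
	}
}

func main() {
	qs := &queueGroup{}
	qs.setShadow(&recoveredSub{ID: 1, LastSent: 10})
	qs.setShadow(&recoveredSub{ID: 2, LastSent: 5}) // duplicate: warns, sub 1 is kept
	fmt.Println("shadow subid:", qs.shadow.ID)      // -> 1
}
```
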
@@ -2361,17 +2362,21 @@ func (s *StanServer) processRecoveredChannels(channels map[string]*stores.Recove
return nil, err
}
if !s.isClustered {
ss := channel.ss
ss.Lock()
// Get the recovered subscriptions for this channel.
for _, recSub := range recoveredChannel.Subscriptions {
sub := s.recoverOneSub(channel, recSub.Sub, recSub.Pending, nil)
if sub != nil {
// Subscribe to subscription ACKs
if err := sub.startAckSub(s.nca, s.processAckMsg); err != nil {
ss.Unlock()
return nil, err
}
allSubs = append(allSubs, sub)
}
}
ss.Unlock()
// Now that we have recovered possible subscriptions for this channel,
// check if we should start the delete timer.
if channel.activity != nil {
@@ -3013,17 +3018,17 @@ func (s *StanServer) checkClientHealth(clientID string) {
// close the client (connection). This locks the
// client object internally so unlock here.
client.Unlock()
// If clustered, thread operations through Raft.
if s.isClustered {
s.barrier(func() {
s.barrier(func() {
// If clustered, thread operations through Raft.
if s.isClustered {
if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}); err != nil {
s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v",
clientID, err)
}
})
} else {
s.closeClient(clientID)
}
} else {
s.closeClient(clientID)
}
})
return
}
} else {
@@ -4605,15 +4610,13 @@ func (s *StanServer) updateDurable(ss *subStore, sub *subState, clientID string)
if err != nil {
return err
}
ss.Lock()
// Do this only for durable subscribers (not durable queue subscribers).
if sub.isDurableSubscriber() {
// Add back into plain subscribers
ss.psubs = append(ss.psubs, sub)
}
// And in ackInbox lookup map.
ss.acks[sub.AckInbox] = sub
ss.Unlock()

return nil
}
@@ -4633,6 +4636,9 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
sub *subState
ss = c.ss
)

ss.Lock()

// Will be true for durable queue subscribers and durable subscribers alike.
isDurable := false
// Will be set to false for an existing durable subscriber or existing
@@ -4646,6 +4652,7 @@
if strings.Contains(sr.DurableName, ":") {
s.log.Errorf("[Client:%s] Invalid DurableName (%q) for queue subscriber from %s",
sr.ClientID, sr.DurableName, sr.Subject)
ss.Unlock()
return nil, ErrInvalidDurName
}
isDurable = true
@@ -4656,7 +4663,6 @@
}
// Look up an existing group. Only interested in the situation where
// the group exists, but is empty and had a shadow subscriber.
ss.RLock()
qs := ss.qsubs[sr.QGroup]
if qs != nil {
qs.Lock()
@@ -4668,15 +4674,15 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
qs.Unlock()
setStartPos = false
}
ss.RUnlock()
} else if sr.DurableName != "" {
// Check for DurableSubscriber status
if sub = ss.LookupByDurable(durableKey(sr)); sub != nil {
if sub = ss.durables[durableKey(sr)]; sub != nil {
sub.RLock()
clientID := sub.ClientID
sub.RUnlock()
if clientID != "" {
s.log.Errorf("[Client:%s] Duplicate durable subscription registration", sr.ClientID)
ss.Unlock()
return nil, ErrDupDurable
}
setStartPos = false
@@ -4766,14 +4772,13 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
sub.ID = subID
err = s.addSubscription(ss, sub)
if err == nil && subID > 0 {
ss.Lock()
if subID >= c.nextSubID {
c.nextSubID = subID + 1
}
ss.Unlock()
}
}
}
ss.Unlock()
if err == nil && (!s.isClustered || s.isLeader()) {
err = sub.startAckSub(s.nca, s.processAckMsg)
if err == nil {
108 changes: 108 additions & 0 deletions server/server_queue_test.go
@@ -1161,3 +1161,111 @@ func TestQueueNoRedeliveryDuringSubClose(t *testing.T) {
default:
}
}

func TestPersistentStoreDurableQueueSubRaceBetweenCreateAndClose(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)

opts := getTestDefaultOptsForPersistentStore()
s := runServerWithOpts(t, opts, nil)
defer shutdownRestartedServerOnTestExit(&s)

sc := NewDefaultConnection(t)
defer sc.Close()

qsub1, err := sc.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {}, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

cs := channelsGet(t, s.channels, "foo")
ss := &mockedSubStore{SubStore: cs.store.Subs}
cs.store.Subs = ss

// Make the store's CreateSub() pause so that the addition of the new
// queue sub in the go routine is delayed.
ss.Lock()
ss.ch = make(chan bool, 1)
ss.Unlock()
closeErrCh := make(chan error)
createErrCh := make(chan error)

sc2, err := stan.Connect(clusterName, clientName+"2")
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc2.Close()

// Since the close operation uses a barrier that ensures that
// other operations complete, we need processing of
// qsub1.Close() to start before we start the new queue sub,
// but ensure that it stops at one point. We will use the
// closeMu mutex to artificially block it.
s.closeMu.Lock()
go func() {
closeErrCh <- qsub1.Close()
}()
// Let the Close() proceed a bit...
time.Sleep(100 * time.Millisecond)
go func() {
_, err := sc2.QueueSubscribe("foo", "bar", func(_ *stan.Msg) {}, stan.DurableName("dur"))
createErrCh <- err
}()

// Let the QueueSubscribe() proceed a bit..
time.Sleep(100 * time.Millisecond)

// Now release the qsub1 Close()
s.closeMu.Unlock()

// Let the Close() complete, but we have to release the CreateSub()
// blockage from a different go routine, otherwise, with the fix,
// the Close() would still be blocked.
go func() {
time.Sleep(100 * time.Millisecond)
// Now release the addition to the subStore of the second queue sub
ss.ch <- true
}()

// Wait for the close to complete
if err := <-closeErrCh; err != nil {
t.Fatalf("Error on close: %v", err)
}
// Wait for create to complete
if err := <-createErrCh; err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

// Now close 2nd queue sub (connection close will do that for us)
sc2.Close()
// We don't need that connection either.
sc.Close()

// Shutdown and restart the server and make sure that we did not
// introduce a 2nd shadow subscription.
s.Shutdown()
l := &captureWarnLogger{}
opts.CustomLogger = l
s = runServerWithOpts(t, opts, nil)

cs = channelsGet(t, s.channels, "foo")
qs := cs.ss.qsubs["dur:bar"]
qs.RLock()
hasShadow := qs.shadow != nil
qs.RUnlock()
if !hasShadow {
t.Fatalf("Should have a shadow subscription")
}
var hasDuplicate bool
l.Lock()
for _, w := range l.warnings {
if strings.Contains(w, "Duplicate shadow durable") {
hasDuplicate = true
break
}
}
l.Unlock()
if hasDuplicate {
t.Fatalf("Duplicate shadow subscription found!")
}
}
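
The test comments above note that the close operation goes through a barrier that ensures operations already in flight complete first, which is why qsub1.Close() must be started before the new QueueSubscribe() and then artificially held back. A rough, hypothetical sketch of that barrier idea (not the server's actual implementation, which runs through its internal processing loop):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// barrier is a toy stand-in: it tracks in-flight operations with a WaitGroup
// and only runs a callback once all of them have completed.
type barrier struct {
	inflight sync.WaitGroup
}

// startOp registers an in-flight operation; call the returned func when done.
func (b *barrier) startOp() func() {
	b.inflight.Add(1)
	return b.inflight.Done
}

// run invokes f once every operation registered before this call has finished.
func (b *barrier) run(f func()) {
	b.inflight.Wait()
	f()
}

func main() {
	b := &barrier{}
	done := b.startOp()
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate a pending queue sub creation
		fmt.Println("queue sub created")
		done()
	}()
	// The close only proceeds after the pending creation has completed.
	b.run(func() { fmt.Println("subscription closed") })
}
```
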
6 changes: 6 additions & 0 deletions server/server_storefailures_test.go
@@ -41,6 +41,7 @@ type mockedSubStore struct {
sync.RWMutex
fail bool
failFlushOnce bool
ch chan bool
}

func (ms *mockedStore) CreateChannel(name string) (*stores.Channel, error) {
@@ -253,7 +254,12 @@ func TestMsgLookupFailures(t *testing.T) {
func (ss *mockedSubStore) CreateSub(sub *spb.SubState) error {
ss.RLock()
fail := ss.fail
ch := ss.ch
ss.RUnlock()
if ch != nil {
// Wait for notification that we can continue
<-ch
}
if fail {
return fmt.Errorf("On purpose")
}
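
The ch field added to mockedSubStore (see the CreateSub() change above) is what lets the test park the store call until it decides the operation may finish. A stripped-down usage sketch of that pause-and-release pattern, with hypothetical names and outside the server:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pausableSubStore mimics the ch hook: CreateSub parks on the channel until
// the test signals that it may continue.
type pausableSubStore struct {
	sync.RWMutex
	ch chan bool
}

func (ss *pausableSubStore) CreateSub(name string) error {
	ss.RLock()
	ch := ss.ch
	ss.RUnlock()
	if ch != nil {
		<-ch // wait for the test's notification before completing
	}
	fmt.Println("stored sub:", name)
	return nil
}

func main() {
	ss := &pausableSubStore{ch: make(chan bool, 1)}
	done := make(chan error, 1)
	go func() { done <- ss.CreateSub("dur:bar") }()

	time.Sleep(50 * time.Millisecond) // CreateSub is now parked on ss.ch
	ss.ch <- true                     // release it, as the test does
	if err := <-done; err != nil {
		fmt.Println("unexpected error:", err)
	}
}
```
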
2 changes: 1 addition & 1 deletion server/snapshot.go
@@ -429,8 +429,8 @@ func (r *raftFSM) restoreChannelsFromSnapshot(serverSnap *spb.RaftSnapshot, inNe
c.lSeqChecked = false

for _, ss := range sc.Subscriptions {
s.recoverOneSub(c, ss.State, nil, ss.AcksPending)
c.ss.Lock()
s.recoverOneSub(c, ss.State, nil, ss.AcksPending)
if ss.State.ID >= c.nextSubID {
c.nextSubID = ss.State.ID + 1
}