From e5eb10132b1995240914046b232ae232d77d271f Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 26 Apr 2019 14:14:48 -0600 Subject: [PATCH] [FIXED] Clustering: possible deadlock on sub/conn close There was a wrong assumption that calling raft.Apply() without waiting on the future result would be non blocking. This is not the case. RAFT internally uses go channels and there may be cases when a receiver on a channel is busy and so sending to that channel will block. This is what was happening. RAFT would invoke the streaming server code to apply the replication of a connection close. In this process we try to remove subscriptions of that connection. This requires the subStore lock. However, another go routine may be holding that lock (and possibly subscription's lock, queue state, etc..) and at the same time call raft.Apply() to replicate an aggregate of subscription's sent/ack operations. In some cases this would actually block, causing the deadlock. Reworked handling of replication of sub's sent/ack events so that this does not happen anymore. Resolves #812 Signed-off-by: Ivan Kozlovic --- server/clustering_test.go | 153 +++++++++++++++++++++- server/server.go | 263 +++++++++++++++++++++++++------------- 2 files changed, 325 insertions(+), 91 deletions(-) diff --git a/server/clustering_test.go b/server/clustering_test.go index 10c83d3d..a2fa8f15 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -2391,7 +2391,7 @@ func TestClusteringRaftLogReplay(t *testing.T) { waitForAcks(t, leader, clientName, 1, 1) subs := leader.clients.getSubs(clientName) // Flush the replication of SentMsg - leader.flushReplicatedSentAndAckSeqs(subs[0], true) + leader.replicateSubSentAndAck(subs[0]) atomic.StoreInt32(&doAckMsg, 1) servers = removeServer(servers, leader) @@ -5059,3 +5059,154 @@ func TestClusteringNoIncorrectMaxSubs(t *testing.T) { sc.Close() s1.Shutdown() } + +func TestClusteringDeadlockOnClientClose(t *testing.T) { + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", true) + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + // Configure third server. + s3sOpts := getTestDefaultOptsForClustering("c", false) + s3 := runServerWithOpts(t, s3sOpts, nil) + defer s3.Shutdown() + + getLeader(t, 10*time.Second, s1, s2, s3) + + ch := make(chan bool, 1) + numMsgs := 1000 + numSubs := 20 + numQSubs := 5 + gCount := int32(0) + total := int32((numSubs + numQSubs) * numMsgs) + + cb := func(sc stan.Conn) func(*stan.Msg) { + count := 0 + return func(_ *stan.Msg) { + count++ + if count == numMsgs { + sc.Close() + } + if n := atomic.AddInt32(&gCount, 1); n == total { + ch <- true + } + } + } + + for i := 0; i < numSubs; i++ { + sc, err := stan.Connect(clusterName, fmt.Sprintf("sub%d", i)) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer sc.Close() + + // Create some plain, some durables + if i >= numSubs/2 { + if _, err := sc.Subscribe("foo", cb(sc), stan.DurableName("dur")); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + } else { + if _, err := sc.Subscribe("foo", cb(sc)); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + } + } + // Create queue subs on different groups so they each get a message + for i := 0; i < numQSubs; i++ { + sc, err := stan.Connect(clusterName, fmt.Sprintf("qsub%d", i)) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer sc.Close() + + if _, err := sc.QueueSubscribe("foo", fmt.Sprintf("group%d", i), cb(sc)); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + } + + pubConn := NewDefaultConnection(t) + defer pubConn.Close() + + for i := 0; i < numMsgs; i++ { + if err := pubConn.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + pubConn.Close() + + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive all msgs") + } + + waitForNumClients(t, s1, 0) + s1.Shutdown() +} + +func TestClusteringReplSubSentAckWhileClosing(t *testing.T) { + testSubSentAndAckSlowApply = true + defer func() { testSubSentAndAckSlowApply = false }() + + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", true) + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + getLeader(t, 10*time.Second, s1, s2) + + sc := NewDefaultConnection(t) + defer sc.Close() + + for i := 0; i < 200; i++ { + if _, err := sc.PublishAsync("foo", []byte("hello"), nil); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + count := 0 + cb := func(_ *stan.Msg) { + count++ + if count == 101 { + sc.Close() + } + } + if _, err := sc.Subscribe("foo", + cb, + stan.DurableName("dur"), + stan.DeliverAllAvailable(), + stan.SetManualAckMode()); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + waitForNumClients(t, s1, 0) + s1.Shutdown() +} diff --git a/server/server.go b/server/server.go index 10067da1..792cfa88 100644 --- a/server/server.go +++ b/server/server.go @@ -178,6 +178,7 @@ var ( clientCheckTimeout = defaultClientCheckTimeout lazyReplicationInterval = defaultLazyReplicationInterval testDeleteChannel bool + testSubSentAndAckSlowApply bool ) var ( @@ -653,7 +654,7 @@ type StanServer struct { raft *raftNode raftLogging bool isClustered bool - lazyRepl *lazyReplication + ssarepl *subsSentAndAckReplication snapReqSub *nats.Subscription // Our internal subscriptions @@ -671,9 +672,11 @@ type StanServer struct { pingResponseInvalidClientBytes []byte } -type lazyReplication struct { - sync.Mutex - subs map[*subState]struct{} +type subsSentAndAckReplication struct { + ready *sync.Map + waiting *sync.Map + gates *sync.Map + notifyCh chan struct{} } func (s *StanServer) isLeader() bool { @@ -734,14 +737,9 @@ type subState struct { } type subSentAndAck struct { - sent []uint64 - ack []uint64 - inFlusher bool -} - -func (sa *subSentAndAck) reset() { - sa.sent = sa.sent[:0] - sa.ack = sa.ack[:0] + sent []uint64 + ack []uint64 + applying bool } // Looks up, or create a new channel if it does not exist @@ -1807,9 +1805,14 @@ func (s *StanServer) start(runningState State) error { // If clustered, start Raft group. if s.isClustered { - s.lazyRepl = &lazyReplication{subs: make(map[*subState]struct{})} + s.ssarepl = &subsSentAndAckReplication{ + ready: &sync.Map{}, + waiting: &sync.Map{}, + gates: &sync.Map{}, + notifyCh: make(chan struct{}, 1), + } s.wg.Add(1) - go s.lazyReplicationOfSentAndAck() + go s.subsSentAndAckReplicator() // Default Raft log path to .// if not set. if s.opts.Clustering.RaftLogPath == "" { s.opts.Clustering.RaftLogPath = filepath.Join(s.opts.ID, s.opts.Clustering.NodeID) @@ -2051,6 +2054,7 @@ func (s *StanServer) leadershipLost() { sub.Lock() sub.stopAckSub() sub.clearAckTimer() + s.clearSentAndAck(sub) sub.Unlock() } } @@ -2901,10 +2905,12 @@ func (s *StanServer) checkClientHealth(clientID string) { client.Unlock() // If clustered, thread operations through Raft. if s.isClustered { - if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}); err != nil { - s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v", - clientID, err) - } + s.barrier(func() { + if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}); err != nil { + s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v", + clientID, err) + } + }) } else { s.closeClient(clientID) } @@ -2995,7 +3001,7 @@ func (s *StanServer) replicateConnClose(req *pb.CloseRequest) error { // flush the pending replication of sent/ack. subs := s.clients.getSubs(req.ClientID) for _, sub := range subs { - s.flushReplicatedSentAndAckSeqs(sub, true) + s.endSubSentAndAckReplication(sub, false) } op := &spb.RaftOperation{ @@ -3444,111 +3450,189 @@ func (s *StanServer) getMsgForRedelivery(c *channel, sub *subState, seq uint64) return &mcopy } -func createSubSentAndAck() *subSentAndAck { - return &subSentAndAck{ - sent: make([]uint64, 0), - ack: make([]uint64, 0), +// Little helper to signal a "chan struct{}" without risk of blocking. +func signalCh(c chan struct{}) { + select { + case c <- struct{}{}: + default: } } -// Lazyly replicates the fact that the server sent a message to a given subscription, -// or the subsription ack'ed a message. +// Keep track of sent or ack messages. +// If the number of operations reach a certain threshold, +// the sub is added to list of subs that should be flushed asap. +// This call does not do actual RAFT replication and should not block. // Caller holds the sub's Lock. -func (s *StanServer) replicateSentOrAck(sub *subState, sent bool, sequence uint64) { +func (s *StanServer) collectSentOrAck(sub *subState, sent bool, sequence uint64) { + sr := s.ssarepl if sub.replicate == nil { - sub.replicate = createSubSentAndAck() + sub.replicate = &subSentAndAck{ + sent: make([]uint64, 0, 100), + ack: make([]uint64, 0, 100), + } } - repl := sub.replicate + r := sub.replicate if sent { - repl.sent = append(repl.sent, sequence) + r.sent = append(r.sent, sequence) } else { - repl.ack = append(repl.ack, sequence) + r.ack = append(r.ack, sequence) } - if len(repl.sent)+len(repl.ack) >= 100 { - s.replicateSentAndAckSeqs(sub) - } else if !repl.inFlusher { - s.lazyRepl.Lock() - s.lazyRepl.subs[sub] = struct{}{} - repl.inFlusher = true - s.lazyRepl.Unlock() + // This function is called with exactly one event at a time. + // Use exact count to decide when to add to given map. This + // avoid the need for booleans to not add more than once. + l := len(r.sent) + len(r.ack) + if l == 1 { + sr.waiting.Store(sub, struct{}{}) + } else if l == 100 { + sr.waiting.Delete(sub) + sr.ready.Store(sub, struct{}{}) + signalCh(sr.notifyCh) } } -// Replicates through raft -// Caller holds the sub's Lock. -func (s *StanServer) replicateSentAndAckSeqs(sub *subState) { +// Replicates through RAFT +func (s *StanServer) replicateSubSentAndAck(sub *subState) { + var data []byte + + sr := s.ssarepl + sub.Lock() + r := sub.replicate + if r != nil && len(r.sent)+len(r.ack) > 0 { + data = createSubSentAndAckProto(sub, r) + r.sent = r.sent[:0] + r.ack = r.ack[:0] + r.applying = true + } + sub.Unlock() + + if data != nil { + if testSubSentAndAckSlowApply { + time.Sleep(100 * time.Millisecond) + } + s.raft.Apply(data, 0) + + sub.Lock() + r = sub.replicate + // If r is nil it means either that the leader lost leadrship, + // in which case we don't do anything, or the sub/conn is being + // closed and endSubSentAndAckReplication() is waiting on a + // channel stored in "gates" map. If we find it, signal. + if r == nil { + if c, ok := sr.gates.Load(sub); ok { + sr.gates.Delete(sub) + signalCh(c.(chan struct{})) + } + } else { + r.applying = false + } + sub.Unlock() + } +} + +// Little helper function to create a RaftOperation_SendAndAck protocol +// and serialize it. +func createSubSentAndAckProto(sub *subState, r *subSentAndAck) []byte { op := &spb.RaftOperation{ OpType: spb.RaftOperation_SendAndAck, SubSentAck: &spb.SubSentAndAck{ Channel: sub.subject, AckInbox: sub.AckInbox, - Sent: sub.replicate.sent, - Ack: sub.replicate.ack, + Sent: r.sent, + Ack: r.ack, }, } data, err := op.Marshal() if err != nil { panic(err) } - s.raft.Apply(data, 0) - sub.replicate.reset() + return data } -// Possibly issue a raft replication for the given subscription -// if there are pending sent/ack operations. -// This is called by a go-routine that does lazy replication, or -// when a subscription/connection is closed. In such case, we may -// skip the replication if the subscription is not durable for instance, -// because the subscription is going away anyway. -func (s *StanServer) flushReplicatedSentAndAckSeqs(sub *subState, onClose bool) { +// This is called when a subscription is closed or unsubscribed, or +// a connection is closed, but prior to the RAFT replication of such +// event. +// Depending on the type of event, we want to make sure that we flush +// the possibly remaning sent/ack events. +func (s *StanServer) endSubSentAndAckReplication(sub *subState, unsub bool) { + var ch chan struct{} + var data []byte + sub.Lock() - if sub.replicate != nil { - if len(sub.replicate.sent) > 0 || len(sub.replicate.ack) > 0 { - if !onClose || (sub.IsDurable || sub.qstate != nil) { - s.replicateSentAndAckSeqs(sub) - } - } - // When called from the lazy replication go-routine, onClose is false - // and the sub has already been removed from the map. - if onClose && sub.replicate.inFlusher { - s.lazyRepl.Lock() - delete(s.lazyRepl.subs, sub) - s.lazyRepl.Unlock() - } - sub.replicate.inFlusher = false + r := sub.replicate + if r == nil { + sub.Unlock() + return } + if !unsub && (sub.IsDurable || sub.qstate != nil) && len(r.sent)+len(r.ack) > 0 { + data = createSubSentAndAckProto(sub, r) + } + // If the replicator is about to apply, or in middle of it, we + // want to wait for it to finish regardless if we have to replicate + // something or not. We are not expecting this situation to occur often. + if r.applying { + ch = make(chan struct{}, 1) + s.ssarepl.gates.Store(sub, ch) + } + s.clearSentAndAck(sub) sub.Unlock() + + if ch != nil { + <-ch + } + if data != nil { + s.raft.Apply(data, 0) + } +} + +// Sub lock is held on entry +func (s *StanServer) clearSentAndAck(sub *subState) { + sr := s.ssarepl + sr.waiting.Delete(sub) + sr.ready.Delete(sub) + sub.replicate = nil } -// long-lived go-routine that periodically replicate -// subscriptions' pending Sent and/or Ack operations. -func (s *StanServer) lazyReplicationOfSentAndAck() { +// long-lived go-routine that performs RAFT replication of subscriptions' +// sent/ack operations. +func (s *StanServer) subsSentAndAckReplicator() { defer s.wg.Done() s.mu.Lock() - lr := s.lazyRepl + sr := s.ssarepl + ready := sr.ready + waiting := sr.waiting + notifyCh := sr.notifyCh s.mu.Unlock() ticker := time.NewTicker(lazyReplicationInterval) flush := func() { - lr.Lock() - for sub := range lr.subs { - delete(lr.subs, sub) - lr.Unlock() - s.flushReplicatedSentAndAckSeqs(sub, false) - lr.Lock() - } - lr.Unlock() + ready.Range(func(k, _ interface{}) bool { + ready.Delete(k) + s.replicateSubSentAndAck(k.(*subState)) + return true + }) } for { select { case <-s.shutdownCh: - // Try to flush outstanding before returning - flush() return case <-ticker.C: + addedToReady := false + waiting.Range(func(k, _ interface{}) bool { + waiting.Delete(k) + // Move to ready map + ready.Store(k, struct{}{}) + addedToReady = true + return true + }) + // If some were transferred and nobody has signaled + // to flush the ready ones, do it here + if addedToReady && len(notifyCh) == 0 { + flush() + } + case <-notifyCh: flush() } } @@ -3658,10 +3742,9 @@ func (s *StanServer) sendMsgToSub(sub *subState, m *pb.MsgProto, force bool) (bo return true, true } - // If in cluster mode, trigger replication (but leader does - // not wait on quorum result). + // If in cluster mode, schedule replication of the sent event. if s.isClustered { - s.replicateSentOrAck(sub, replicateSent, m.Sequence) + s.collectSentOrAck(sub, replicateSent, m.Sequence) } // Store in storage @@ -4182,19 +4265,20 @@ func (s *StanServer) replicateRemoveSubscription(req *pb.UnsubscribeRequest) err } func (s *StanServer) replicateCloseSubscription(req *pb.UnsubscribeRequest) error { + return s.replicateUnsubscribe(req, spb.RaftOperation_CloseSubscription) +} + +func (s *StanServer) replicateUnsubscribe(req *pb.UnsubscribeRequest, opType spb.RaftOperation_Type) error { // When closing a subscription, we need to possibly "flush" the // pending sent/ack that need to be replicated c := s.channels.get(req.Subject) if c != nil { sub := c.ss.LookupByAckInbox(req.Inbox) if sub != nil { - s.flushReplicatedSentAndAckSeqs(sub, true) + unsub := opType == spb.RaftOperation_RemoveSubscription + s.endSubSentAndAckReplication(sub, unsub) } } - return s.replicateUnsubscribe(req, spb.RaftOperation_CloseSubscription) -} - -func (s *StanServer) replicateUnsubscribe(req *pb.UnsubscribeRequest, opType spb.RaftOperation_Type) error { op := &spb.RaftOperation{ OpType: opType, Unsub: req, @@ -4835,10 +4919,9 @@ func (s *StanServer) processAck(c *channel, sub *subState, sequence uint64, from } if _, found := sub.acksPending[sequence]; found { - // If in cluster mode, replicate the ack but leader - // does not wait on quorum result. + // If in cluster mode, schedule replication of the ack. if s.isClustered { - s.replicateSentOrAck(sub, replicateAck, sequence) + s.collectSentOrAck(sub, replicateAck, sequence) } if s.trace && fromUser { s.log.Tracef("[Client:%s] Processing ack for subid=%d, subject=%s, seq=%d",