Skip to content

Commit

Permalink
Merge d2f0ea5 into 32f9935
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jan 18, 2022
2 parents 32f9935 + d2f0ea5 commit 1fa94bb
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 48 deletions.
9 changes: 7 additions & 2 deletions server/clustering.go
@@ -1,4 +1,4 @@
// Copyright 2017-2021 The NATS Authors
// Copyright 2017-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -650,10 +650,15 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
case spb.RaftOperation_RemoveSubscription:
fallthrough
case spb.RaftOperation_CloseSubscription:
c, sub := s.getChannelAndSubForSubCloseOrUnsub(op.Unsub)
// Could be that the channel has been removed due to inactivity, etc..
if c == nil || sub == nil {
return nil
}
// Close/Unsub subscription replication.
isSubClose := op.OpType == spb.RaftOperation_CloseSubscription
s.closeMu.Lock()
err := s.unsubscribe(op.Unsub, isSubClose)
err := s.unsubscribeSub(c, op.Unsub.ClientID, sub, isSubClose, true)
s.closeMu.Unlock()
return err
case spb.RaftOperation_SendAndAck:
Expand Down
95 changes: 94 additions & 1 deletion server/clustering_test.go
@@ -1,4 +1,4 @@
// Copyright 2017-2021 The NATS Authors
// Copyright 2017-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -8846,3 +8846,96 @@ func TestClusteringQueueNoPendingCountIfNoMsg(t *testing.T) {
return nil
})
}

type captureSubCloseErrLogger struct {
dummyLogger
errCh chan string
}

func (l *captureSubCloseErrLogger) Errorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if strings.Contains(msg, "sub close request for unknown") {
select {
case l.errCh <- msg:
default:
}
}
}

func TestClusteringDoNotReportSubCloseMissingSubjectOnReplay(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

maxInactivity := 250 * time.Millisecond

s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.Clustering.TrailingLogs = 5
s1sOpts.MaxInactivity = maxInactivity
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.Clustering.TrailingLogs = 5
s2sOpts.MaxInactivity = maxInactivity
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.Clustering.TrailingLogs = 5
s3sOpts.MaxInactivity = maxInactivity
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

servers := []*StanServer{s1, s2, s3}
getLeader(t, 10*time.Second, servers...)

sc := NewDefaultConnection(t)
defer sc.Close()

sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {})
if err != nil {
t.Fatalf("Error on sub: %v", err)
}
if _, err := sc.Subscribe("bar", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Error on sub: %v", err)
}
for i := 0; i < 100; i++ {
sc.Publish("bar", []byte("msg"))
}

// Do snapshot on all servers
for _, s := range servers {
if err := s.raft.Snapshot().Error(); err != nil {
t.Fatalf("Error on snapshot: %v", err)
}
}
// Close sub on "foo", and wait for more than channel expiration
sub.Close()
time.Sleep(2 * maxInactivity)
s3.Shutdown()

// Set a logger that'll collect errors
l := &captureSubCloseErrLogger{errCh: make(chan string, 1)}
s3sOpts.CustomLogger = l
s3 = runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

// Create a sub on a new channel. We will use that as a "marker"
if _, err := sc.Subscribe("baz", func(_ *stan.Msg) {}); err != nil {
// to know that s3 has processed the sub1.Close() request.
t.Fatalf("Error on sub: %v", err)
}
checkChannelsInAllServers(t, []string{"bar", "baz"}, 10*time.Second, s3)

select {
case e := <-l.errCh:
t.Fatalf("Got error: %s", e)
default:
// OK
}
}
85 changes: 42 additions & 43 deletions server/server.go
@@ -1,4 +1,4 @@
// Copyright 2016-2021 The NATS Authors
// Copyright 2016-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -170,6 +170,7 @@ var (
ErrDupDurable = errors.New("stan: duplicate durable registration")
ErrInvalidDurName = errors.New("stan: durable name of a durable queue subscriber can't contain the character ':'")
ErrUnknownClient = errors.New("stan: unknown clientID")
ErrUnknownChannel = errors.New("stan: unknown channel")
ErrNoChannel = errors.New("stan: no configured channel")
ErrClusteredRestart = errors.New("stan: cannot restart server in clustered mode if it was not previously clustered")
ErrChanDelInProgress = errors.New("stan: channel is being deleted")
Expand Down Expand Up @@ -4676,15 +4677,28 @@ func (s *StanServer) performUnsubOrCloseSubscription(m *nats.Msg, req *pb.Unsubs

s.barrier(func() {
var err error
c, sub := s.getChannelAndSubForSubCloseOrUnsub(req)
if c == nil {
s.log.Errorf("[Client:%s] %s request for unknown channel %s",
req.ClientID, getSubUnsubAction(isSubClose), req.Subject)
s.sendSubscriptionResponseErr(m.Reply, ErrUnknownChannel)
return
}
if sub == nil {
s.log.Errorf("[Client:%s] %s request for unknown inbox %s",
req.ClientID, getSubUnsubAction(isSubClose), req.Inbox)
s.sendSubscriptionResponseErr(m.Reply, ErrInvalidSub)
return
}
if s.isClustered {
op := spb.RaftOperation_RemoveSubscription
if isSubClose {
err = s.replicateCloseSubscription(req)
} else {
err = s.replicateRemoveSubscription(req)
op = spb.RaftOperation_CloseSubscription
}
err = s.replicateSubCloseOrUnsubscribe(req, op, sub)
} else {
s.closeMu.Lock()
err = s.unsubscribe(req, isSubClose)
err = s.unsubscribeSub(c, req.ClientID, sub, isSubClose, true)
s.closeMu.Unlock()
}
// If there was an error, it has been already logged.
Expand All @@ -4701,33 +4715,32 @@ func (s *StanServer) performUnsubOrCloseSubscription(m *nats.Msg, req *pb.Unsubs
})
}

func (s *StanServer) unsubscribe(req *pb.UnsubscribeRequest, isSubClose bool) error {
action := "unsub"
if isSubClose {
action = "sub close"
func getSubUnsubAction(close bool) string {
if close {
return "sub close"
}
return "unsub"
}

func (s *StanServer) getChannelAndSubForSubCloseOrUnsub(req *pb.UnsubscribeRequest) (*channel, *subState) {
c := s.channels.get(req.Subject)
if c == nil {
s.log.Errorf("[Client:%s] %s request missing subject %s",
req.ClientID, action, req.Subject)
return ErrInvalidSub
return nil, nil
}
sub := c.ss.LookupByAckInbox(req.Inbox)
if sub == nil {
sub = c.ss.LookupByInbox(req.Inbox)
}
if sub == nil {
s.log.Errorf("[Client:%s] %s request for missing inbox %s",
req.ClientID, action, req.Inbox)
return ErrInvalidSub
return c, nil
}
return s.unsubscribeSub(c, req.ClientID, action, sub, isSubClose, true)
return c, sub
}

func (s *StanServer) unsubscribeSub(c *channel, clientID, action string, sub *subState, isSubClose, shouldFlush bool) error {
func (s *StanServer) unsubscribeSub(c *channel, clientID string, sub *subState, isSubClose, shouldFlush bool) error {
// Remove from Client
if !s.clients.removeSub(clientID, sub) {
s.log.Errorf("[Client:%s] %s request for missing client", clientID, action)
s.log.Errorf("[Client:%s] %s request for unknown client", clientID, getSubUnsubAction(isSubClose))
return ErrUnknownClient
}
// Remove the subscription
Expand All @@ -4743,25 +4756,11 @@ func (s *StanServer) unsubscribeSub(c *channel, clientID, action string, sub *su
return err
}

func (s *StanServer) replicateRemoveSubscription(req *pb.UnsubscribeRequest) error {
return s.replicateUnsubscribe(req, spb.RaftOperation_RemoveSubscription)
}

func (s *StanServer) replicateCloseSubscription(req *pb.UnsubscribeRequest) error {
return s.replicateUnsubscribe(req, spb.RaftOperation_CloseSubscription)
}
func (s *StanServer) replicateSubCloseOrUnsubscribe(req *pb.UnsubscribeRequest,
opType spb.RaftOperation_Type, sub *subState) error {

func (s *StanServer) replicateUnsubscribe(req *pb.UnsubscribeRequest, opType spb.RaftOperation_Type) error {
// When closing a subscription, we need to possibly "flush" the
// pending sent/ack that need to be replicated
c := s.channels.get(req.Subject)
if c != nil {
sub := c.ss.LookupByAckInbox(req.Inbox)
if sub != nil {
unsub := opType == spb.RaftOperation_RemoveSubscription
s.endSubSentAndAckReplication(sub, unsub)
}
}
// Possibly flush sent/ack replication
s.endSubSentAndAckReplication(sub, opType == spb.RaftOperation_RemoveSubscription)
op := &spb.RaftOperation{
OpType: opType,
Unsub: req,
Expand Down Expand Up @@ -5315,17 +5314,17 @@ func (s *StanServer) closeDurableIfDuplicate(c *channel, sr *pb.SubscriptionRequ
if !duplicate {
return nil
}
creq := &pb.UnsubscribeRequest{
ClientID: sr.ClientID,
Subject: sr.Subject,
Inbox: ackInbox,
}
var err error
if s.isClustered {
err = s.replicateCloseSubscription(creq)
creq := &pb.UnsubscribeRequest{
ClientID: sr.ClientID,
Subject: sr.Subject,
Inbox: ackInbox,
}
err = s.replicateSubCloseOrUnsubscribe(creq, spb.RaftOperation_CloseSubscription, sub)
} else {
s.closeMu.Lock()
err = s.unsubscribe(creq, true)
err = s.unsubscribeSub(c, sr.ClientID, sub, true, true)
s.closeMu.Unlock()
}
return err
Expand Down
4 changes: 2 additions & 2 deletions server/snapshot.go
@@ -1,4 +1,4 @@
// Copyright 2017-2020 The NATS Authors
// Copyright 2017-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -297,7 +297,7 @@ func (r *raftFSM) Restore(snapshot io.ReadCloser) (retErr error) {
sub.RLock()
clientID := sub.ClientID
sub.RUnlock()
if err := s.unsubscribeSub(c, clientID, "unsub", sub, false, false); err != nil {
if err := s.unsubscribeSub(c, clientID, sub, false, false); err != nil {
return err
}
}
Expand Down

0 comments on commit 1fa94bb

Please sign in to comment.