Skip to content

Commit

Permalink
Merge pull request #1224 from nats-io/cluster_queue_rdlv
Browse files Browse the repository at this point in the history
[FIXED] Clustering: possible wrong pending_count and is_stalled on followers
  • Loading branch information
kozlovic committed Nov 17, 2021
2 parents 4f5a913 + 78d6066 commit 0888c7d
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -12,7 +12,7 @@ require (
github.com/nats-io/nats-server/v2 v2.6.4
github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483
github.com/nats-io/nuid v1.0.1
github.com/nats-io/stan.go v0.10.1
github.com/nats-io/stan.go v0.10.2
github.com/prometheus/procfs v0.7.3
go.etcd.io/bbolt v1.3.6
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -69,8 +69,8 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.10.1 h1:a0PPS12JHAC2xA2OaQ3aB9/wi49MnvxBTstgMlgwoFU=
github.com/nats-io/stan.go v0.10.1/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0=
github.com/nats-io/stan.go v0.10.2 h1:gQLd05LhzmhFkHm3/qP/klYHfM/hys45GyHa1Uly/kI=
github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
82 changes: 82 additions & 0 deletions server/clustering_test.go
Expand Up @@ -8596,6 +8596,88 @@ func TestClusteringQueueRedelivery(t *testing.T) {
waitForAcks(t, s2, clientName, 2, 0)
}

func TestClusteringQueueRedeliveryPendingAndStalled(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

servers := []*StanServer{s1, s2}
getLeader(t, 10*time.Second, servers...)

sc := NewDefaultConnection(t)
defer sc.Close()

ch := make(chan bool, 1)
if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {
if m.Redelivered {
m.Ack()
return
}
// Wait for more than AckWait, then ack
time.Sleep(150 * time.Millisecond)
m.Ack()
ch <- true
}, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(100)), stan.MaxInflight(3)); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
// Create second queue member that does not ack.
if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {},
stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(500)), stan.MaxInflight(3)); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

for count := 0; count < 5; {
if err := sc.Publish("foo", []byte("msg")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
select {
case <-ch:
count++
case <-time.After(time.Second):
// Try another message
}
}

// Make sure that state is replicated
time.Sleep(testLazyReplicationInterval * 2)

// Ensure that the pending map and stalled are 0 and false
// on all servers for all subs.
waitFor(t, 2*time.Second, 50*time.Millisecond, func() error {
for _, s := range servers {
subs := s.clients.getSubs(clientName)
for _, sub := range subs {
var err error
sub.RLock()
if len(sub.acksPending) != 0 || sub.stalled {
err = fmt.Errorf("Invalid values: node=%s - acksPending=%v - stalled=%v",
s.opts.Clustering.NodeID, sub.acksPending, sub.stalled)
}
sub.RUnlock()
if err != nil {
return err
}
}
}
return nil
})
}

func TestClusteringQueueRedeliverySentAndAck(t *testing.T) {
// Set this to something very large so we can manually cause the flush.
lazyReplicationInterval = time.Hour
Expand Down
5 changes: 4 additions & 1 deletion server/server.go
Expand Up @@ -47,7 +47,7 @@ import (
// Server defaults.
const (
// VERSION is the current version for the NATS Streaming server.
VERSION = "0.23.1"
VERSION = "0.23.2-beta01"

DefaultClusterID = "test-cluster"
DefaultDiscoverPrefix = "_STAN.discover"
Expand Down Expand Up @@ -5495,6 +5495,9 @@ func (s *StanServer) processAck(c *channel, sub *subState, sequence uint64, from
qsub.Lock()
_, found := qsub.acksPending[sequence]
if found {
if s.isClustered {
s.collectSentOrAck(qsub, replicateAck, sequence)
}
delete(qsub.acksPending, sequence)
persistAck(qsub)
}
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/nats-io/stan.go/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/nats-io/stan.go/go_tests.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 7 additions & 9 deletions vendor/github.com/nats-io/stan.go/go_tests.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 32 additions & 2 deletions vendor/github.com/nats-io/stan.go/stan.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Expand Up @@ -57,7 +57,7 @@ github.com/nats-io/nkeys
# github.com/nats-io/nuid v1.0.1
## explicit
github.com/nats-io/nuid
# github.com/nats-io/stan.go v0.10.1
# github.com/nats-io/stan.go v0.10.2
## explicit
github.com/nats-io/stan.go
github.com/nats-io/stan.go/pb
Expand Down

0 comments on commit 0888c7d

Please sign in to comment.