Merge pull request #1201 from nats-io/fix_redeliver_race
[FIXED] Possible redelivery from node that lost leadership
kozlovic committed Jul 15, 2021
2 parents 528588f + 3de4990 commit aa11d83
Showing 2 changed files with 99 additions and 8 deletions.
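
Before the diffs, here is a minimal, hypothetical Go sketch of the failure mode named in the title. It is not the nats-streaming-server code; the node type, isLeader flag, and armAckTimer helper are invented for illustration. A per-subscription ack-wait timer redelivers unacknowledged messages and re-arms itself; a node that loses leadership between ticks must check leadership before redelivering and re-arming, which is the guard the server.go change below adds through isStandaloneOrLeader().

package main

import (
	"fmt"
	"sync"
	"time"
)

type node struct {
	mu       sync.Mutex
	isLeader bool
	ackWait  time.Duration
	ackTimer *time.Timer
}

func (n *node) setLeader(leader bool) {
	n.mu.Lock()
	n.isLeader = leader
	n.mu.Unlock()
}

// armAckTimer schedules a redelivery attempt after ackWait and keeps
// re-arming it only while this node still believes it is the leader.
func (n *node) armAckTimer(redeliver func()) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.ackTimer = time.AfterFunc(n.ackWait, func() {
		n.mu.Lock()
		leader := n.isLeader
		n.mu.Unlock()
		if !leader {
			// The node lost leadership: skip redelivery and let the timer die.
			return
		}
		redeliver()
		// Only the leader re-arms the timer for the next attempt.
		n.mu.Lock()
		n.ackTimer.Reset(n.ackWait)
		n.mu.Unlock()
	})
}

func main() {
	n := &node{isLeader: true, ackWait: 100 * time.Millisecond}
	n.armAckTimer(func() { fmt.Println("redelivering unacked message") })

	// Leadership moves away before the next tick; without the check above,
	// this node would keep firing redeliveries even though it is a follower.
	time.Sleep(150 * time.Millisecond)
	n.setLeader(false)
	time.Sleep(300 * time.Millisecond)
}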
88 changes: 88 additions & 0 deletions server/clustering_test.go
@@ -8186,3 +8186,91 @@ func TestClusteringDurableReplaced(t *testing.T) {
 		t.Fatalf("Expected 1 durable, got %v", lenDur)
 	}
 }
+
+func TestClusteringRaceCausesFollowerToRedeliverMsgs(t *testing.T) {
+	testRaceLeaderTransfer = true
+	defer func() { testRaceLeaderTransfer = false }()
+
+	cleanupDatastore(t)
+	defer cleanupDatastore(t)
+	cleanupRaftLog(t)
+	defer cleanupRaftLog(t)
+
+	// For this test, use a central NATS server.
+	ns := natsdTest.RunDefaultServer()
+	defer ns.Shutdown()
+
+	// Configure first server
+	s1sOpts := getTestDefaultOptsForClustering("a", true)
+	s1sOpts.ReplaceDurable = true
+	s1 := runServerWithOpts(t, s1sOpts, nil)
+	defer s1.Shutdown()
+
+	// Wait for it to bootstrap
+	getLeader(t, 10*time.Second, s1)
+
+	// Configure second server.
+	s2sOpts := getTestDefaultOptsForClustering("b", false)
+	s2sOpts.ReplaceDurable = true
+	s2 := runServerWithOpts(t, s2sOpts, nil)
+	defer s2.Shutdown()
+
+	// Configure third server.
+	s3sOpts := getTestDefaultOptsForClustering("c", false)
+	s3sOpts.ReplaceDurable = true
+	s3 := runServerWithOpts(t, s3sOpts, nil)
+	defer s3.Shutdown()
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+
+	ch := make(chan bool, 10)
+	errCh := make(chan error, 1)
+	prev := uint32(0)
+	if _, err := sc.Subscribe("foo",
+		func(m *stan.Msg) {
+			if !m.Redelivered {
+				ch <- true
+			} else {
+				if m.RedeliveryCount != prev+1 {
+					select {
+					case errCh <- fmt.Errorf("Received duplicate redelivered msg: %+v", m):
+					default:
+					}
+				}
+				prev = m.RedeliveryCount
+			}
+		},
+		stan.AckWait(ackWaitInMs(500)),
+		stan.DeliverAllAvailable(),
+		stan.DurableName("dur"),
+		stan.SetManualAckMode()); err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	if err := sc.Publish("foo", []byte("hello")); err != nil {
+		t.Fatalf("Error on publish: %v", err)
+	}
+
+	// Wait for message to be delivered
+	if err := WaitTime(ch, time.Second); err != nil {
+		t.Fatalf("Did not get message: %v", err)
+	}
+
+	// Stop the leader
+	s1.Shutdown()
+
+	// Wait for new leader
+	leader := getLeader(t, 10*time.Second, s2, s3)
+
+	// Stepdown
+	if err := leader.raft.LeadershipTransfer().Error(); err != nil {
+		t.Fatalf("Error stepping down: %v", err)
+	}
+
+	select {
+	case err := <-errCh:
+		t.Fatal(err)
+	case <-time.After(4 * time.Second):
+		// ok
+	}
+}
19 changes: 11 additions & 8 deletions server/server.go
@@ -182,6 +182,7 @@ var (
 	lazyReplicationInterval = defaultLazyReplicationInterval
 	testDeleteChannel bool
 	testSubSentAndAckSlowApply bool
+	testRaceLeaderTransfer bool
 )
 
 var (
@@ -2298,10 +2299,7 @@ func (s *StanServer) leadershipAcquired() error {
 		}
 	}
 	if len(allSubs) > 0 {
-		s.startGoRoutine(func() {
-			s.performRedeliveryOnStartup(allSubs)
-			s.wg.Done()
-		})
+		s.performRedeliveryOnStartup(allSubs)
 	}
 
 	if err := s.nc.Flush(); err != nil {
@@ -2661,6 +2659,9 @@ func (s *StanServer) postRecoveryProcessing(recoveredClients []*stores.Client, r
 // Redelivers unacknowledged messages, releases the hold for new messages delivery,
 // and kicks delivery of available messages.
 func (s *StanServer) performRedeliveryOnStartup(recoveredSubs []*subState) {
+	if testRaceLeaderTransfer {
+		time.Sleep(time.Second)
+	}
 	queues := make(map[*queueState]*channel)
 
 	for _, sub := range recoveredSubs {
@@ -2674,6 +2675,9 @@ func (s *StanServer) performRedeliveryOnStartup(recoveredSubs []*subState) {
 			sub.Unlock()
 			continue
 		}
+		if sub.ackTimer == nil {
+			s.setupAckTimer(sub, sub.ackWait)
+		}
 		// Unlock in order to call function below
 		sub.Unlock()
 		// Send old messages (lock is acquired in that function)
@@ -3689,14 +3693,13 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo
 	qs := sub.qstate
 	clientID := sub.ClientID
 	subID := sub.ID
-	if sub.ackTimer == nil {
-		s.setupAckTimer(sub, sub.ackWait)
-	}
 	if qs == nil {
 		// If the client has some failed heartbeats, ignore this request.
 		if sub.hasFailedHB {
 			// Reset the timer
-			sub.ackTimer.Reset(sub.ackWait)
+			if s.isStandaloneOrLeader() {
+				sub.ackTimer.Reset(sub.ackWait)
+			}
 			sub.Unlock()
 			if s.debug {
 				s.log.Debugf("[Client:%s] Skipping redelivery to subid=%d due to missed client heartbeat", clientID, subID)
