[FIXED] R1 stream move would sometimes lose all msgs. (#4413)
When moving streams, we could check too soon and land in a gap where the
replica peer had made contact via the NRG layer but had not yet registered
a catchup request.

This would cause us to incorrectly conclude that the replica was caught up
and to drop our leadership, which would cancel any catchup requests.

Signed-off-by: Derek Collison <derek@nats.io>
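
For context, the stream-move monitor is ticker-driven: the current leader periodically checks whether the peers at the new placement report being current, and only then hands off leadership. Below is a minimal, self-contained sketch of that pattern; `replicasCurrent` and `stepDown` are hypothetical stand-ins, not the server's actual functions, and the 500ms interval simply mirrors the value used after this fix.

```go
package main

import (
	"fmt"
	"time"
)

// monitorMigration sketches a ticker-driven migration check. Polling at a
// coarse interval avoids the race this commit fixes: a check that fires too
// soon can see a peer that has made contact but has not yet registered its
// catchup request, and conclude (wrongly) that it is caught up.
func monitorMigration(replicasCurrent func() bool, stepDown func()) {
	t := time.NewTicker(500 * time.Millisecond)
	defer t.Stop()
	for range t.C {
		if replicasCurrent() {
			stepDown() // hand off leadership only once catchup has truly finished
			return
		}
	}
}

func main() {
	start := time.Now()
	monitorMigration(
		func() bool { return time.Since(start) > time.Second }, // pretend catchup takes ~1s
		func() { fmt.Println("stepping down: replicas are current") },
	)
}
```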
derekcollison authored Aug 22, 2023
2 parents 2fc3f45 + e5d208b commit 90f5371
Showing 3 changed files with 65 additions and 15 deletions.
2 changes: 1 addition & 1 deletion server/consumer.go
@@ -3288,7 +3288,7 @@ func (o *consumer) checkAckFloor() {
 			}
 		}
 	} else if numPending > 0 {
-		// here it shorter to walk pending.
+		// here it is shorter to walk pending.
 		// toTerm is seq, dseq, rcd for each entry.
 		toTerm := make([]uint64, 0, numPending*3)
 		o.mu.RLock()
15 changes: 1 addition & 14 deletions server/jetstream_cluster.go
@@ -2161,18 +2161,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 
 	startMigrationMonitoring := func() {
 		if mmt == nil {
-			mmt = time.NewTicker(10 * time.Millisecond)
-			mmtc = mmt.C
-		}
-	}
-
-	adjustMigrationMonitoring := func() {
-		const delay = 500 * time.Millisecond
-		if mmt == nil {
-			mmt = time.NewTicker(delay)
+			mmt = time.NewTicker(500 * time.Millisecond)
 			mmtc = mmt.C
-		} else {
-			mmt.Reset(delay)
 		}
 	}
 
@@ -2407,9 +2397,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				continue
 			}
 
-			// Adjust to our normal time delay.
-			adjustMigrationMonitoring()
-
 			// Make sure we have correct cluster information on the other peers.
 			ci := js.clusterInfo(rg)
 			mset.checkClusterInfo(ci)
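
With both hunks applied, the fast 10ms bootstrap tick and the separate `adjustMigrationMonitoring` helper are gone: the migration ticker is created lazily and always fires at 500ms, so the first check cannot run before a replica has had time to register its catchup request. A compilable sketch of that lazy-ticker pattern in isolation (the real helper is a closure inside `monitorStream` and uses its own `mmt`/`mmtc` variables):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var (
		mmt  *time.Ticker     // migration-monitor ticker, nil until needed
		mmtc <-chan time.Time // its channel; a nil channel never fires in a select
	)

	// Mirrors the surviving startMigrationMonitoring helper: create the
	// ticker once, at the coarse 500ms interval, only when a move begins.
	startMigrationMonitoring := func() {
		if mmt == nil {
			mmt = time.NewTicker(500 * time.Millisecond)
			mmtc = mmt.C
		}
	}

	startMigrationMonitoring()
	startMigrationMonitoring() // calling again is a no-op; the ticker is reused

	select {
	case <-mmtc:
		fmt.Println("migration check fired")
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
	mmt.Stop()
}
```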
63 changes: 63 additions & 0 deletions server/jetstream_super_cluster_test.go
@@ -3953,3 +3953,66 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
 		return nil
 	})
 }
+
+func TestJetStreamSuperClusterMovingR1Stream(t *testing.T) {
+	// Make C2 have some latency.
+	gwm := gwProxyMap{
+		"C2": &gwProxy{
+			rtt:  10 * time.Millisecond,
+			up:   1 * 1024 * 1024 * 1024, // 1gbit
+			down: 1 * 1024 * 1024 * 1024, // 1gbit
+		},
+	}
+	sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm)
+	defer sc.shutdown()
+
+	nc, js := jsClientConnect(t, sc.clusterForName("C1").randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name: "TEST",
+	})
+	require_NoError(t, err)
+
+	toSend := 10_000
+	for i := 0; i < toSend; i++ {
+		_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
+		require_NoError(t, err)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Have it move to GCP.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
+	})
+	require_NoError(t, err)
+
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		sc.waitOnStreamLeader(globalAccountName, "TEST")
+		si, err := js.StreamInfo("TEST")
+		if err != nil {
+			return err
+		}
+		if si.Cluster.Name != "C2" {
+			return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
+		}
+		if si.Cluster.Leader == _EMPTY_ {
+			return fmt.Errorf("No leader yet")
+		} else if !strings.HasPrefix(si.Cluster.Leader, "C2") {
+			return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
+		}
+		// Now we want to see that we shrink back to original.
+		if len(si.Cluster.Replicas) != 0 {
+			return fmt.Errorf("Expected 0 replicas, got %d", len(si.Cluster.Replicas))
+		}
+		if si.State.Msgs != uint64(toSend) {
+			return fmt.Errorf("Only see %d msgs", si.State.Msgs)
+		}
+		return nil
+	})
+}
