diff --git a/server/consumer.go b/server/consumer.go
index 8cc502e95ce..d9fa2301708 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -3288,7 +3288,7 @@ func (o *consumer) checkAckFloor() {
 			}
 		}
 	} else if numPending > 0 {
-		// here it shorter to walk pending.
+		// here it is shorter to walk pending.
 		// toTerm is seq, dseq, rcd for each entry.
 		toTerm := make([]uint64, 0, numPending*3)
 		o.mu.RLock()
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 1734f096242..52d498bd037 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2161,18 +2161,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 
 	startMigrationMonitoring := func() {
 		if mmt == nil {
-			mmt = time.NewTicker(10 * time.Millisecond)
-			mmtc = mmt.C
-		}
-	}
-
-	adjustMigrationMonitoring := func() {
-		const delay = 500 * time.Millisecond
-		if mmt == nil {
-			mmt = time.NewTicker(delay)
+			mmt = time.NewTicker(500 * time.Millisecond)
 			mmtc = mmt.C
-		} else {
-			mmt.Reset(delay)
 		}
 	}
 
@@ -2407,9 +2397,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				continue
 			}
 
-			// Adjust to our normal time delay.
-			adjustMigrationMonitoring()
-
 			// Make sure we have correct cluster information on the other peers.
 			ci := js.clusterInfo(rg)
 			mset.checkClusterInfo(ci)
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index 8abbe17c409..d217f84c2b1 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -3953,3 +3953,66 @@ func TestJetStreamSuperClusterGWOfflineSatus(t *testing.T) {
 		return nil
 	})
 }
+
+func TestJetStreamSuperClusterMovingR1Stream(t *testing.T) {
+	// Make C2 have some latency.
+	gwm := gwProxyMap{
+		"C2": &gwProxy{
+			rtt:  10 * time.Millisecond,
+			up:   1 * 1024 * 1024 * 1024, // 1gbit
+			down: 1 * 1024 * 1024 * 1024, // 1gbit
+		},
+	}
+	sc := createJetStreamTaggedSuperClusterWithGWProxy(t, gwm)
+	defer sc.shutdown()
+
+	nc, js := jsClientConnect(t, sc.clusterForName("C1").randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name: "TEST",
+	})
+	require_NoError(t, err)
+
+	toSend := 10_000
+	for i := 0; i < toSend; i++ {
+		_, err := js.PublishAsync("TEST", []byte("HELLO WORLD"))
+		require_NoError(t, err)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Have it move to GCP.
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Placement: &nats.Placement{Tags: []string{"cloud:gcp"}},
+	})
+	require_NoError(t, err)
+
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		sc.waitOnStreamLeader(globalAccountName, "TEST")
+		si, err := js.StreamInfo("TEST")
+		if err != nil {
+			return err
+		}
+		if si.Cluster.Name != "C2" {
+			return fmt.Errorf("Wrong cluster: %q", si.Cluster.Name)
+		}
+		if si.Cluster.Leader == _EMPTY_ {
+			return fmt.Errorf("No leader yet")
+		} else if !strings.HasPrefix(si.Cluster.Leader, "C2") {
+			return fmt.Errorf("Wrong leader: %q", si.Cluster.Leader)
+		}
+		// Now we want to see that we shrink back to original.
+		if len(si.Cluster.Replicas) != 0 {
+			return fmt.Errorf("Expected 0 replicas, got %d", len(si.Cluster.Replicas))
+		}
+		if si.State.Msgs != uint64(toSend) {
+			return fmt.Errorf("Only see %d msgs", si.State.Msgs)
+		}
+		return nil
+	})
+}