diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f273064d81..4d4dfc9d48 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1220,6 +1220,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error { // Now walk the ones to check and process consumers. var caAdd, caDel []*consumerAssignment for _, sa := range saChk { + // Make sure to add in all the new ones from sa. + for _, ca := range sa.consumers { + caAdd = append(caAdd, ca) + } + if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil { for _, ca := range osa.consumers { if sa.consumers[ca.Name] == nil { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 891c96c906..dc737a4a3e 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2369,3 +2369,92 @@ func TestJetStreamClusterMemLeaderRestart(t *testing.T) { t.Fatalf("expected a current leader after old leader restarted") } } + +// Customer reported R1 consumers that seemed to be ghosted after server restart. +func TestJetStreamClusterLostConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "GHOST", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + for j := 0; j < 10; j++ { + _, err := js.Publish(fmt.Sprintf("events.%d.%d", i, j), []byte("test")) + require_NoError(t, err) + } + } + + s := c.randomServer() + s.Shutdown() + + c.waitOnLeader() + c.waitOnStreamLeader(globalAccountName, "TEST") + + nc, _ = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cc := CreateConsumerRequest{ + Stream: "TEST", + Config: ConsumerConfig{ + AckPolicy: AckExplicit, + }, + } + req, err := json.Marshal(cc) + require_NoError(t, err) + + reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST") + + // Now create 50 consumers. We do not wait for the answer. + for i := 0; i < 50; i++ { + nc.Publish(reqSubj, req) + } + nc.Flush() + + // Grab the meta leader. + ml := c.leader() + require_NoError(t, ml.JetStreamSnapshotMeta()) + + numConsumerAssignments := func(s *Server) int { + t.Helper() + js := s.getJetStream() + js.mu.RLock() + defer js.mu.RUnlock() + cc := js.cluster + for _, asa := range cc.streams { + for _, sa := range asa { + return len(sa.consumers) + } + } + return 0 + } + + checkFor(t, time.Second, 100*time.Millisecond, func() error { + num := numConsumerAssignments(ml) + if num == 50 { + return nil + } + return fmt.Errorf("Consumers is only %d", num) + }) + + // Restart the server we shutdown. We snapshotted to the snapshot + // has to fill in the new consumers. + // The bug would fail to add them to the meta state since the stream + // existed. + s = c.restartServer(s) + + checkFor(t, time.Second, 100*time.Millisecond, func() error { + num := numConsumerAssignments(s) + if num == 50 { + return nil + } + return fmt.Errorf("Consumers is only %d", num) + }) +}