Skip to content

Commit

Permalink
Merge branch 'main' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Jan 5, 2023
2 parents 2520468 + b9d10ad commit 55352f2
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
5 changes: 5 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
// Now walk the ones to check and process consumers.
var caAdd, caDel []*consumerAssignment
for _, sa := range saChk {
// Make sure to add in all the new ones from sa.
for _, ca := range sa.consumers {
caAdd = append(caAdd, ca)
}

if osa := js.streamAssignment(sa.Client.serviceAccount(), sa.Config.Name); osa != nil {
for _, ca := range osa.consumers {
if sa.consumers[ca.Name] == nil {
Expand Down
89 changes: 89 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2369,3 +2369,92 @@ func TestJetStreamClusterMemLeaderRestart(t *testing.T) {
t.Fatalf("expected a current leader after old leader restarted")
}
}

// Customer reported R1 consumers that seemed to be ghosted after server restart.
func TestJetStreamClusterLostConsumers(t *testing.T) {
c := createJetStreamClusterExplicit(t, "GHOST", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"events.>"},
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ {
_, err := js.Publish(fmt.Sprintf("events.%d.%d", i, j), []byte("test"))
require_NoError(t, err)
}
}

s := c.randomServer()
s.Shutdown()

c.waitOnLeader()
c.waitOnStreamLeader(globalAccountName, "TEST")

nc, _ = jsClientConnect(t, c.randomServer())
defer nc.Close()

cc := CreateConsumerRequest{
Stream: "TEST",
Config: ConsumerConfig{
AckPolicy: AckExplicit,
},
}
req, err := json.Marshal(cc)
require_NoError(t, err)

reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST")

// Now create 50 consumers. We do not wait for the answer.
for i := 0; i < 50; i++ {
nc.Publish(reqSubj, req)
}
nc.Flush()

// Grab the meta leader.
ml := c.leader()
require_NoError(t, ml.JetStreamSnapshotMeta())

numConsumerAssignments := func(s *Server) int {
t.Helper()
js := s.getJetStream()
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
for _, asa := range cc.streams {
for _, sa := range asa {
return len(sa.consumers)
}
}
return 0
}

checkFor(t, time.Second, 100*time.Millisecond, func() error {
num := numConsumerAssignments(ml)
if num == 50 {
return nil
}
return fmt.Errorf("Consumers is only %d", num)
})

// Restart the server we shutdown. We snapshotted to the snapshot
// has to fill in the new consumers.
// The bug would fail to add them to the meta state since the stream
// existed.
s = c.restartServer(s)

checkFor(t, time.Second, 100*time.Millisecond, func() error {
num := numConsumerAssignments(s)
if num == 50 {
return nil
}
return fmt.Errorf("Consumers is only %d", num)
})
}

0 comments on commit 55352f2

Please sign in to comment.