Skip to content

Commit

Permalink
Fix for a bug that could leave a raft node running when stopping a st…
Browse files Browse the repository at this point in the history
…ream.

This can happen when we reset a stream internally and the stream had a prior snapshot.

Also make sure to always release resources back to the account regardless if the store is no longer present.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 1, 2023
1 parent 1eed0e8 commit f5ac5a4
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
34 changes: 34 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3849,3 +3849,37 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
return nil
})
}

// Make sure that stopping a stream shutdowns down it's raft node.
func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 100; i++ {
sendStreamMsg(t, nc, "foo", "HELLO")
}

s := c.randomServer()
numNodesStart := s.numRaftNodes()
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
node := mset.raftNode()
require_NotNil(t, node)
node.InstallSnapshot(mset.stateSnapshot())
// Stop the stream
mset.stop(false, false)

if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 {
t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes)
}
}
26 changes: 14 additions & 12 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4548,9 +4548,11 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
if deleteFlag {
n.Delete()
sa = mset.sa
} else if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
} else {
if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
}
n.Stop()
}
}
Expand Down Expand Up @@ -4642,23 +4644,23 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
sysc.closeConnection(ClientClosed)
}

if store == nil {
return nil
}

if deleteFlag {
if err := store.Delete(); err != nil {
return err
if store != nil {
if err := store.Delete(); err != nil {
return err
}
}
// Release any resources.
js.releaseStreamResources(&mset.cfg)

// cleanup directories after the stream
accDir := filepath.Join(js.config.StoreDir, accName)
// no op if not empty
os.Remove(filepath.Join(accDir, streamsDir))
os.Remove(accDir)
} else if err := store.Stop(); err != nil {
return err
} else if store != nil {
if err := store.Stop(); err != nil {
return err
}
}

return nil
Expand Down

0 comments on commit f5ac5a4

Please sign in to comment.