From 77bb7ea41e0915fb4cac8af00d8358a6e3857a59 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Mon, 17 Jun 2024 17:25:50 +0800 Subject: [PATCH 1/4] refactor(raft): remove unused code Signed-off-by: Jay Chung --- server/raft.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/raft.go b/server/raft.go index 16369981905..dd487baa3ac 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1029,9 +1029,6 @@ func (n *raft) InstallSnapshot(data []byte) error { return errCatchupsRunning } - var state StreamState - n.wal.FastState(&state) - if n.applied == 0 { return errNoSnapAvailable } From 1d85de606a61f83c744047c2930c25ae39a85003 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 16 Jun 2024 18:25:04 -0700 Subject: [PATCH 2/4] Remove statszRateLimit by setting to 0 for tests, which emulates prior behavior. Signed-off-by: Derek Collison --- server/events.go | 2 +- server/jetstream_helpers_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/events.go b/server/events.go index 67a4c527b6c..3ab75c56d7d 100644 --- a/server/events.go +++ b/server/events.go @@ -959,7 +959,7 @@ func (s *Server) sendStatsz(subj string) { s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m) } -// Limit updates to the heartbeat interval, max one second. +// Limit updates to the heartbeat interval, max one second by default. func (s *Server) limitStatsz(subj string) bool { s.mu.Lock() defer s.mu.Unlock() diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 3d63b0b5e37..da2814e1452 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -44,7 +44,7 @@ func init() { lostQuorumCheck = 4 * hbInterval // For statz and jetstream placement speedups as well. - statszRateLimit = time.Millisecond * 100 + statszRateLimit = 0 } // Used to setup clusters of clusters for tests. From 6d12e6cf7a88de40b9ec5cb086e2f005ed345152 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 17 Jun 2024 20:43:09 -0700 Subject: [PATCH 3/4] Allow for audit exception for `$JS.EVENT.>` and `$SYS.ACCOUNT.>` which may already exist. (#5556) Signed-off-by: Derek Collison --------- Signed-off-by: Derek Collison --- server/jetstream_test.go | 84 +++++++++++++++++++++++++++++++++++++--- server/stream.go | 21 ++++++++-- 2 files changed, 96 insertions(+), 9 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index dcf910dd639..d02ff5fdf7c 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -969,7 +969,7 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) { // Test that any overlapping subjects will fail. expectErr(acc.addStream(&StreamConfig{Name: "foo"})) expectErr(acc.addStream(&StreamConfig{Name: "a", Subjects: []string{"baz", "bar"}})) - expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}})) + expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}, NoAck: true})) expectErr(acc.addStream(&StreamConfig{Name: "c", Subjects: []string{"baz.33"}})) expectErr(acc.addStream(&StreamConfig{Name: "d", Subjects: []string{"*.33"}})) expectErr(acc.addStream(&StreamConfig{Name: "e", Subjects: []string{"*.>"}})) @@ -984,7 +984,7 @@ func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) { expectErr := func(_ *stream, err error) { t.Helper() - if err == nil || !strings.Contains(err.Error(), "subjects overlap") { + if err == nil || !strings.Contains(err.Error(), "subjects that overlap with jetstream api") { t.Fatalf("Expected error but got none") } } @@ -22616,25 +22616,40 @@ func TestJetStreamAuditStreams(t *testing.T) { nc, js := jsClientConnect(t, s) defer nc.Close() + jsOverlap := errors.New("subjects that overlap with jetstream api require no-ack to be true") + sysOverlap := errors.New("subjects that overlap with system api require no-ack to be true") + _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"$JS.>"}, }) - require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))) + require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap)) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"$JS.API.>"}, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap)) _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"$JSC.>"}, }) - require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))) + require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap)) _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"$SYS.>"}, }) - require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api"))) + require_Error(t, err, NewJSStreamInvalidConfigError(sysOverlap)) - // These should be ok if no pub ack. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true"))) + + // These should all be ok if no pub ack. _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST1", Subjects: []string{"$JS.>"}, @@ -22655,4 +22670,61 @@ func TestJetStreamAuditStreams(t *testing.T) { NoAck: true, }) require_NoError(t, err) + + // Since prior behavior did allow $JS.EVENT to be captured without no-ack, these might break + // on a server upgrade so make sure they still work ok without --no-ack. + + // To avoid overlap error. + err = js.DeleteStream("TEST1") + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST4", + Subjects: []string{"$JS.EVENT.>"}, + }) + require_NoError(t, err) + + // Also allow $SYS.ACCOUNT to be captured without no-ack, these also might break + // on a server upgrade so make sure they still work ok without --no-ack. + + // To avoid overlap error. + err = js.DeleteStream("TEST3") + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST5", + Subjects: []string{"$SYS.ACCOUNT.>"}, + }) + require_NoError(t, err) + + // We will test handling of ">" on a cluster here. + // Specific test for capturing everything which will require both no-ack and replicas of 1. + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true"))) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + Replicas: 3, + NoAck: true, + }) + require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires replicas of 1"))) + + // Ths should work ok. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{">"}, + Replicas: 1, + NoAck: true, + }) + require_NoError(t, err) } diff --git a/server/stream.go b/server/stream.go index 2e05ee0b369..716409bb1f9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1486,7 +1486,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } // Check for literal duplication of subject interest in config - // and no overlap with any JS API subject space + // and no overlap with any JS or SYS API subject space. dset := make(map[string]struct{}, len(cfg.Subjects)) for _, subj := range cfg.Subjects { // Make sure the subject is valid. Check this first. @@ -1496,13 +1496,28 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if _, ok := dset[subj]; ok { return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected")) } + // Check for trying to capture everything. + if subj == fwcs { + if !cfg.NoAck { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires no-ack to be true")) + } + // Capturing everything also will require R1. + if cfg.Replicas != 1 { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires replicas of 1")) + } + } // Also check to make sure we do not overlap with our $JS API subjects. if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) { - return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")) + // We allow an exception for $JS.EVENT.> since these could have been created in the past. + if !subjectIsSubsetMatch(subj, "$JS.EVENT.>") { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with jetstream api require no-ack to be true")) + } } // And the $SYS subjects. if !cfg.NoAck && subjectIsSubsetMatch(subj, "$SYS.>") { - return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api")) + if !subjectIsSubsetMatch(subj, "$SYS.ACCOUNT.>") { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with system api require no-ack to be true")) + } } // Mark for duplicate check. dset[subj] = struct{}{} From a526e6a705db9b8b61ee50fa6227a1022ccf4dc2 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 18 Jun 2024 17:04:23 +0800 Subject: [PATCH 4/4] When installing new snapshot remove last inline if present. Previously we were cleaning up all old ones and doing it in a go routine. However this could cause multiple go routines to race and delete the wrong snapshot leaving none available. Signed-off-by: Derek Collison --- server/raft.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/server/raft.go b/server/raft.go index dd487baa3ac..2e72f71d431 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1074,23 +1074,16 @@ func (n *raft) installSnapshot(snap *snapshot) error { return err } + // Delete our previous snapshot file if it exists. + if n.snapfile != _EMPTY_ && n.snapfile != sfile { + os.Remove(n.snapfile) + } // Remember our latest snapshot file. n.snapfile = sfile if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) return err } - // Remove any old snapshots. - // Do this in a go routine. - go func() { - psnaps, _ := os.ReadDir(snapDir) - for _, fi := range psnaps { - pn := fi.Name() - if pn != sn { - os.Remove(filepath.Join(snapDir, pn)) - } - } - }() return nil }