Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.17-RC.5 #5560

Merged
merged 4 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func (s *Server) sendStatsz(subj string) {
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
}

// Limit updates to the heartbeat interval, max one second.
// Limit updates to the heartbeat interval, max one second by default.
func (s *Server) limitStatsz(subj string) bool {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func init() {
lostQuorumCheck = 4 * hbInterval

// For statz and jetstream placement speedups as well.
statszRateLimit = time.Millisecond * 100
statszRateLimit = 0
}

// Used to setup clusters of clusters for tests.
Expand Down
84 changes: 78 additions & 6 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) {
// Test that any overlapping subjects will fail.
expectErr(acc.addStream(&StreamConfig{Name: "foo"}))
expectErr(acc.addStream(&StreamConfig{Name: "a", Subjects: []string{"baz", "bar"}}))
expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}}))
expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}, NoAck: true}))
expectErr(acc.addStream(&StreamConfig{Name: "c", Subjects: []string{"baz.33"}}))
expectErr(acc.addStream(&StreamConfig{Name: "d", Subjects: []string{"*.33"}}))
expectErr(acc.addStream(&StreamConfig{Name: "e", Subjects: []string{"*.>"}}))
Expand All @@ -984,7 +984,7 @@ func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) {

expectErr := func(_ *stream, err error) {
t.Helper()
if err == nil || !strings.Contains(err.Error(), "subjects overlap") {
if err == nil || !strings.Contains(err.Error(), "subjects that overlap with jetstream api") {
t.Fatalf("Expected error but got none")
}
}
Expand Down Expand Up @@ -22616,25 +22616,40 @@ func TestJetStreamAuditStreams(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsOverlap := errors.New("subjects that overlap with jetstream api require no-ack to be true")
sysOverlap := errors.New("subjects that overlap with system api require no-ack to be true")

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JS.API.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JSC.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$SYS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api")))
require_Error(t, err, NewJSStreamInvalidConfigError(sysOverlap))

// These should be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

// These should all be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"$JS.>"},
Expand All @@ -22655,4 +22670,61 @@ func TestJetStreamAuditStreams(t *testing.T) {
NoAck: true,
})
require_NoError(t, err)

// Since prior behavior did allow $JS.EVENT to be captured without no-ack, these might break
// on a server upgrade so make sure they still work ok without --no-ack.

// To avoid overlap error.
err = js.DeleteStream("TEST1")
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST4",
Subjects: []string{"$JS.EVENT.>"},
})
require_NoError(t, err)

// Also allow $SYS.ACCOUNT to be captured without no-ack, these also might break
// on a server upgrade so make sure they still work ok without --no-ack.

// To avoid overlap error.
err = js.DeleteStream("TEST3")
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST5",
Subjects: []string{"$SYS.ACCOUNT.>"},
})
require_NoError(t, err)

// We will test handling of ">" on a cluster here.
// Specific test for capturing everything which will require both no-ack and replicas of 1.
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 3,
NoAck: true,
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires replicas of 1")))

// Ths should work ok.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 1,
NoAck: true,
})
require_NoError(t, err)
}
18 changes: 4 additions & 14 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,9 +1029,6 @@ func (n *raft) InstallSnapshot(data []byte) error {
return errCatchupsRunning
}

var state StreamState
n.wal.FastState(&state)

if n.applied == 0 {
return errNoSnapAvailable
}
Expand Down Expand Up @@ -1077,23 +1074,16 @@ func (n *raft) installSnapshot(snap *snapshot) error {
return err
}

// Delete our previous snapshot file if it exists.
if n.snapfile != _EMPTY_ && n.snapfile != sfile {
os.Remove(n.snapfile)
}
// Remember our latest snapshot file.
n.snapfile = sfile
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
return err
}
// Remove any old snapshots.
// Do this in a go routine.
go func() {
psnaps, _ := os.ReadDir(snapDir)
for _, fi := range psnaps {
pn := fi.Name()
if pn != sn {
os.Remove(filepath.Join(snapDir, pn))
}
}
}()

return nil
}
Expand Down
21 changes: 18 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,7 +1486,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}

// Check for literal duplication of subject interest in config
// and no overlap with any JS API subject space
// and no overlap with any JS or SYS API subject space.
dset := make(map[string]struct{}, len(cfg.Subjects))
for _, subj := range cfg.Subjects {
// Make sure the subject is valid. Check this first.
Expand All @@ -1496,13 +1496,28 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if _, ok := dset[subj]; ok {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected"))
}
// Check for trying to capture everything.
if subj == fwcs {
if !cfg.NoAck {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires no-ack to be true"))
}
// Capturing everything also will require R1.
if cfg.Replicas != 1 {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires replicas of 1"))
}
}
// Also check to make sure we do not overlap with our $JS API subjects.
if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))
// We allow an exception for $JS.EVENT.> since these could have been created in the past.
if !subjectIsSubsetMatch(subj, "$JS.EVENT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with jetstream api require no-ack to be true"))
}
}
// And the $SYS subjects.
if !cfg.NoAck && subjectIsSubsetMatch(subj, "$SYS.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api"))
if !subjectIsSubsetMatch(subj, "$SYS.ACCOUNT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with system api require no-ack to be true"))
}
}
// Mark for duplicate check.
dset[subj] = struct{}{}
Expand Down