Skip to content

Commit

Permalink
Merge pull request #3539 from nats-io/js_handle_bad_replicas_count
Browse files Browse the repository at this point in the history
[FIXED] JetStream: return error on negative replicas count
  • Loading branch information
kozlovic committed Oct 10, 2022
2 parents cf71adb + 3c7aa55 commit e42260c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 0 deletions.
4 changes: 4 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ func checkConsumerCfg(
if config.Replicas > 0 && config.Replicas > cfg.Replicas {
return NewJSConsumerReplicasExceedsStreamError()
}
// Check that it is not negative
if config.Replicas < 0 {
return NewJSReplicasCountCannotBeNegativeError()
}

// Check if we have a BackOff defined that MaxDeliver is within range etc.
if lbo := len(config.BackOff); lbo > 0 && config.MaxDeliver <= lbo {
Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1308,5 +1308,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSReplicasCountCannotBeNegative",
"code": 400,
"error_code": 10133,
"description": "replicas count cannot be negative",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
59 changes: 59 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,62 @@ func TestJetStreamClusterDeleteConsumerWhileServerDown(t *testing.T) {
t.Fatalf("Expected to not find consumer, but did")
}
}

func TestJetStreamClusterNegativeReplicas(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

testBadReplicas := func(t *testing.T, s *Server, name string) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: name,
Replicas: -1,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())

_, err = js.AddStream(&nats.StreamConfig{
Name: name,
Replicas: 1,
})
require_NoError(t, err)

// Check upadte now.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: name,
Replicas: -11,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())

// Now same for consumers
durName := fmt.Sprintf("%s_dur", name)
_, err = js.AddConsumer(name, &nats.ConsumerConfig{
Durable: durName,
Replicas: -1,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())

_, err = js.AddConsumer(name, &nats.ConsumerConfig{
Durable: durName,
Replicas: 1,
})
require_NoError(t, err)

// Check update now
_, err = js.UpdateConsumer(name, &nats.ConsumerConfig{
Durable: durName,
Replicas: -11,
})
require_Error(t, err, NewJSReplicasCountCannotBeNegativeError())
}

t.Run("Standalone", func(t *testing.T) { testBadReplicas(t, s, "TEST1") })
t.Run("Clustered", func(t *testing.T) { testBadReplicas(t, c.randomServer(), "TEST2") })
}
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ const (
// JSRaftGeneralErrF General RAFT error string ({err})
JSRaftGeneralErrF ErrorIdentifier = 10041

// JSReplicasCountCannotBeNegative replicas count cannot be negative
JSReplicasCountCannotBeNegative ErrorIdentifier = 10133

// JSRestoreSubscribeFailedErrF JetStream unable to subscribe to restore snapshot {subject}: {err}
JSRestoreSubscribeFailedErrF ErrorIdentifier = 10042

Expand Down Expand Up @@ -481,6 +484,7 @@ var (
JSNotEnabledForAccountErr: {Code: 503, ErrCode: 10039, Description: "JetStream not enabled for account"},
JSPeerRemapErr: {Code: 503, ErrCode: 10075, Description: "peer remap failed"},
JSRaftGeneralErrF: {Code: 500, ErrCode: 10041, Description: "{err}"},
JSReplicasCountCannotBeNegative: {Code: 400, ErrCode: 10133, Description: "replicas count cannot be negative"},
JSRestoreSubscribeFailedErrF: {Code: 500, ErrCode: 10042, Description: "JetStream unable to subscribe to restore snapshot {subject}: {err}"},
JSSequenceNotFoundErrF: {Code: 400, ErrCode: 10043, Description: "sequence {seq} not found"},
JSSnapshotDeliverSubjectInvalidErr: {Code: 400, ErrCode: 10015, Description: "deliver subject not valid"},
Expand Down Expand Up @@ -1423,6 +1427,16 @@ func NewJSRaftGeneralError(err error, opts ...ErrorOption) *ApiError {
}
}

// NewJSReplicasCountCannotBeNegativeError creates a new JSReplicasCountCannotBeNegative error: "replicas count cannot be negative"
func NewJSReplicasCountCannotBeNegativeError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSReplicasCountCannotBeNegative]
}

// NewJSRestoreSubscribeFailedError creates a new JSRestoreSubscribeFailedErrF error: "JetStream unable to subscribe to restore snapshot {subject}: {err}"
func NewJSRestoreSubscribeFailedError(err error, subject interface{}, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
3 changes: 3 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if cfg.Replicas > StreamMaxReplicas {
return cfg, NewJSStreamInvalidConfigError(fmt.Errorf("maximum replicas is %d", StreamMaxReplicas))
}
if cfg.Replicas < 0 {
return cfg, NewJSReplicasCountCannotBeNegativeError()
}
if cfg.MaxMsgs == 0 {
cfg.MaxMsgs = -1
}
Expand Down

0 comments on commit e42260c

Please sign in to comment.