Skip to content

Commit

Permalink
[ADDED] JetStream: UpdateConsumer and new consumer config's options
Browse files Browse the repository at this point in the history
Namely: MaxRequestBatch, MaxRequestExpires and InactiveThreshold

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jan 31, 2022
1 parent 972a071 commit 63b8e8b
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -17,9 +17,9 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649 h1:RhaQyUdFELJCBm3lG9xG5pmYyaNNSb49y+YSi8P9hio=
github.com/nats-io/nats-server/v2 v2.7.1-0.20220121194245-cfdca3df7649/go.mod h1:cjxtMhZsZovK1XS2iiapCduR8HuqB/YpFamL0qntIcw=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
37 changes: 37 additions & 0 deletions js.go
Expand Up @@ -903,6 +903,13 @@ type ConsumerConfig struct {
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`

// Ephemeral inactivity threshold.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -2240,6 +2247,36 @@ func HeadersOnly() SubOpt {
})
}

// MaxRequestBatch sets the maximum pull consumer batch size that a Fetch()
// can request.
func MaxRequestBatch(max int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestBatch = max
return nil
})
}

// MaxRequestExpires sets the maximum pull consumer request expiration that a
// Fetch() can request (using the Fetch's timeout value).
func MaxRequestExpires(max time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxRequestExpires = max
return nil
})
}

// InactiveThreshold indicates how long the server should keep an ephemeral
// after detecting loss of interest.
func InactiveThreshold(threshold time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
if threshold < 0 {
return fmt.Errorf("invalid InactiveThreshold value (%v), needs to be greater or equal to 0", threshold)
}
opts.cfg.InactiveThreshold = threshold
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down
13 changes: 13 additions & 0 deletions jsm.go
Expand Up @@ -54,6 +54,9 @@ type JetStreamManager interface {
// AddConsumer adds a consumer to a stream.
AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

// UpdateConsumer updates an existing consumer.
UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string, opts ...JSOpt) error

Expand Down Expand Up @@ -276,6 +279,16 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return info.ConsumerInfo, nil
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrInvalidDurableName
}
return js.AddConsumer(stream, cfg, opts...)
}

// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
apiResponse
Expand Down
143 changes: 143 additions & 0 deletions test/js_test.go
Expand Up @@ -35,6 +35,20 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"
)

func getConnAndJS(t *testing.T, s *server.Server) (*nats.Conn, nats.JetStreamContext) {
t.Helper()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Got error during initialization %v", err)
}
return nc, js
}

func TestJetStreamNotEnabled(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()
Expand Down Expand Up @@ -778,6 +792,32 @@ func TestJetStreamSubscribe(t *testing.T) {

// Both ChanQueueSubscribers use the same consumer.
expectConsumers(t, 4)

sub, err = js.SubscribeSync("foo", nats.InactiveThreshold(-100*time.Millisecond))
if err == nil || !strings.Contains(err.Error(), "invalid InactiveThreshold") {
t.Fatalf("Expected error about invalid option, got %v", err)
}

// Create an ephemeral with a lower inactive threshold
sub, err = js.SubscribeSync("foo", nats.InactiveThreshold(50*time.Millisecond))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Error on consumer info: %v", err)
}
name := ci.Name
nc.Close()

time.Sleep(150 * time.Millisecond)

nc, js = getConnAndJS(t, s)
defer nc.Close()

if ci, err := js.ConsumerInfo("TEST", name); err == nil {
t.Fatalf("Expected no consumer to exist, got %+v", ci)
}
}

func TestJetStreamAckPending_Pull(t *testing.T) {
Expand Down Expand Up @@ -1306,6 +1346,83 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("update consumer", func(t *testing.T) {
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "update_push_consumer",
DeliverSubject: "bar",
AckPolicy: nats.AckExplicitPolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Currently, server supports these fields:
// description, ack_wait, max_deliver, sample_freq, max_ack_pending, max_waiting and headers_only
expected := ci.Config
expected.Description = "my description"
expected.AckWait = 2 * time.Second
expected.MaxDeliver = 1
expected.SampleFrequency = "30"
expected.MaxAckPending = 10
expected.HeadersOnly = true

// Check that stream name is required
_, err = js.UpdateConsumer("", &expected)
if err != nats.ErrStreamNameRequired {
t.Fatalf("Expected stream name required error, got %v", err)
}
// Check that durable name is required
expected.Durable = ""
_, err = js.UpdateConsumer("foo", &expected)
if err != nats.ErrInvalidDurableName {
t.Fatalf("Expected consumer name required error, got %v", err)
}
expected.Durable = "update_push_consumer"

// Check that configuration is required
_, err = js.UpdateConsumer("foo", nil)
if err != nats.ErrConsumerConfigRequired {
t.Fatalf("Expected consumer configuration required error, got %v", err)
}

// Now check that update works and expected fields have been updated
ci, err = js.UpdateConsumer("foo", &expected)
if err != nil {
t.Fatalf("Error on update: %v", err)
}
if !reflect.DeepEqual(ci.Config, expected) {
t.Fatalf("Expected config to be %+v, got %+v", expected, ci.Config)
}

// Now check with pull consumer
ci, err = js.AddConsumer("foo", &nats.ConsumerConfig{
Durable: "update_pull_consumer",
AckPolicy: nats.AckExplicitPolicy,
MaxWaiting: 1,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Currently, server supports these fields:
// description, ack_wait, max_deliver, sample_freq, max_ack_pending, max_waiting and headers_only
expected = ci.Config
expected.Description = "my description"
expected.AckWait = 2 * time.Second
expected.MaxDeliver = 1
expected.SampleFrequency = "30"
expected.MaxAckPending = 10
expected.MaxWaiting = 20
expected.HeadersOnly = true
expected.MaxRequestBatch = 10
expected.MaxRequestExpires = 2 * time.Second
ci, err = js.UpdateConsumer("foo", &expected)
if err != nil {
t.Fatalf("Error on update: %v", err)
}
if !reflect.DeepEqual(ci.Config, expected) {
t.Fatalf("Expected config to be %+v, got %+v", expected, ci.Config)
}
})

t.Run("purge stream", func(t *testing.T) {
if err := js.PurgeStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -5104,6 +5221,32 @@ func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) {
}
}

t.Run("max request batch", func(t *testing.T) {
defer js.PurgeStream(subject)

sub, err := js.PullSubscribe(subject, "max-request-batch", nats.MaxRequestBatch(2))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestBatch of 2") {
t.Fatalf("Expected error about max request batch size, got %v", err)
}
})

t.Run("max request expires", func(t *testing.T) {
defer js.PurgeStream(subject)

sub, err := js.PullSubscribe(subject, "max-request-expires", nats.MaxRequestExpires(50*time.Millisecond))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if _, err := sub.Fetch(10); err == nil || !strings.Contains(err.Error(), "MaxRequestExpires of 50ms") {
t.Fatalf("Expected error about max request expiration, got %v", err)
}
})

t.Run("batch size", func(t *testing.T) {
defer js.PurgeStream(subject)

Expand Down

0 comments on commit 63b8e8b

Please sign in to comment.