Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds consumer metadata checking to next requests #5141

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2965,36 +2965,36 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
}

// Helper for the next message requests.
func nextReqFromMsg(msg []byte) (time.Time, int, int, bool, time.Duration, time.Time, error) {
func nextReqFromMsg(msg []byte) (time.Time, int, int, bool, time.Duration, time.Time, map[string]string, error) {
req := bytes.TrimSpace(msg)

switch {
case len(req) == 0:
return time.Time{}, 1, 0, false, 0, time.Time{}, nil
return time.Time{}, 1, 0, false, 0, time.Time{}, nil, nil

case req[0] == '{':
var cr JSApiConsumerGetNextRequest
if err := json.Unmarshal(req, &cr); err != nil {
return time.Time{}, -1, 0, false, 0, time.Time{}, err
return time.Time{}, -1, 0, false, 0, time.Time{}, nil, err
}
var hbt time.Time
if cr.Heartbeat > 0 {
if cr.Heartbeat*2 > cr.Expires {
return time.Time{}, 1, 0, false, 0, time.Time{}, errors.New("heartbeat value too large")
return time.Time{}, 1, 0, false, 0, time.Time{}, nil, errors.New("heartbeat value too large")
}
hbt = time.Now().Add(cr.Heartbeat)
}
if cr.Expires == time.Duration(0) {
return time.Time{}, cr.Batch, cr.MaxBytes, cr.NoWait, cr.Heartbeat, hbt, nil
return time.Time{}, cr.Batch, cr.MaxBytes, cr.NoWait, cr.Heartbeat, hbt, nil, nil
}
return time.Now().Add(cr.Expires), cr.Batch, cr.MaxBytes, cr.NoWait, cr.Heartbeat, hbt, nil
return time.Now().Add(cr.Expires), cr.Batch, cr.MaxBytes, cr.NoWait, cr.Heartbeat, hbt, cr.Metadata, nil
default:
if n, err := strconv.Atoi(string(req)); err == nil {
return time.Time{}, n, 0, false, 0, time.Time{}, nil
return time.Time{}, n, 0, false, 0, time.Time{}, nil, nil
}
}

return time.Time{}, 1, 0, false, 0, time.Time{}, nil
return time.Time{}, 1, 0, false, 0, time.Time{}, nil, nil
}

// Represents a request that is on the internal waiting queue
Expand Down Expand Up @@ -3310,7 +3310,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
}

// Check payload here to see if they sent in batch size or a formal request.
expires, batchSize, maxBytes, noWait, hb, hbt, err := nextReqFromMsg(msg)
expires, batchSize, maxBytes, noWait, hb, hbt, consumerMetaData, err := nextReqFromMsg(msg)
if err != nil {
sendErr(400, fmt.Sprintf("Bad Request - %v", err))
return
Expand All @@ -3332,6 +3332,26 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
return
}

if len(consumerMetaData) > 0 && len(o.cfg.Metadata) > 0 {
var matching = true
for k, v := range consumerMetaData {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a one way check.
If consumer metadata on the server has key-pairs that are not present on the metadata passed in request, the check will still return true, a false positive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a false positive, that's the behavior I want:
I pass you a collection of metadata key/values and I want to make sure those are present in the consumer's metadata and they match the value. It's not a 'deep equal' but a 'check that those keys and values are there and the same'.

configValue, ok := o.cfg.Metadata[k]
if ok {
if configValue != v {
matching = false
break
}
} else {
matching = false
break
}
}
if !matching {
sendErr(409, "Request's medata does not match the consumer's")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should to be the same error code.

I would really like us to have separate code's at least for errors that have a very distinct way to react to them in the client.

Also this should be a const, not a literal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just did the same as what the existing other 409 errors do (e.g. line 3308) 🤷‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typos and incorrect 's here

Copy link
Member

@Jarema Jarema Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be 409.
Client libraries should not compare strings to determine what is happening and what to do.

EDIT: I know we did it all over the place already, but it would be really nice to improve the situation, at least for the new errors...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use http status codes here, and 4xx are client errors, with 409 being conflict which seems correct.

return
}
}

// If we have the max number of requests already pending try to expire.
if o.waiting.isFull() {
// Try to expire some of the requests.
Expand Down
11 changes: 6 additions & 5 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,11 +725,12 @@ const JSApiConsumerListResponseType = "io.nats.jetstream.api.v1.consumer_list_re

// JSApiConsumerGetNextRequest is for getting next messages for pull based consumers.
type JSApiConsumerGetNextRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}

// JSApiStreamTemplateCreateResponse for creating templates.
Expand Down
81 changes: 80 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ func TestJetStreamConsumerMaxDeliveries(t *testing.T) {

func TestJetStreamNextReqFromMsg(t *testing.T) {
bef := time.Now()
expires, _, _, _, _, _, err := nextReqFromMsg([]byte(`{"expires":5000000000}`)) // nanoseconds
expires, _, _, _, _, _, _, err := nextReqFromMsg([]byte(`{"expires":5000000000}`)) // nanoseconds
require_NoError(t, err)
now := time.Now()
if expires.Before(bef.Add(5*time.Second)) || expires.After(now.Add(5*time.Second)) {
Expand Down Expand Up @@ -23078,3 +23078,82 @@ func TestJetStreamDirectGetMultiPaging(t *testing.T) {
processPartial(b + 1) // 100 + EOB
}
}

func TestJetStreamPullConsumerNextMetadata(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
require_NoError(t, err)

msgSize := 128
msg := make([]byte, msgSize)
crand.Read(msg)

for i := 0; i < 1; i++ {
_, err := js.Publish("foo", msg)
require_NoError(t, err)
}

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dur",
AckPolicy: nats.AckExplicitPolicy,
Metadata: map[string]string{"foo": "bar",
"foo2": "bar"},
})
require_NoError(t, err)

sub := natsSubSync(t, nc, nats.NewInbox())

req := JSApiConsumerGetNextRequest{Batch: 0, MaxBytes: 1024, Expires: 250 * time.Millisecond, Metadata: map[string]string{"foo": "bar", "foo2": "bar2"}}
reqb, _ := json.Marshal(req)
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)

// Expect an error status as the metadata doesn't match (wrong value for key "foo2").
pullMsg, err := sub.NextMsg(time.Second * 1)
require_NoError(t, err)

if v := pullMsg.Header.Get("Status"); v != "409" {
t.Fatalf("Expected 409, got: %s", v)
}

req = JSApiConsumerGetNextRequest{Batch: 0, MaxBytes: 1024, Expires: 250 * time.Millisecond, Metadata: map[string]string{"foo": "bar", "foo2": "bar", "foo3": "bar"}}
reqb, _ = json.Marshal(req)
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)

// Expect an error status as the metadata doesn't match (foo3 missing).
pullMsg, err = sub.NextMsg(time.Second * 1)
require_NoError(t, err)

if v := pullMsg.Header.Get("Status"); v != "409" {
t.Fatalf("Expected 409, got: %s", v)
}

req = JSApiConsumerGetNextRequest{Batch: 0, MaxBytes: 1024, Expires: 250 * time.Millisecond, Metadata: map[string]string{"foo": "bar", "foo2": "bar"}}
reqb, _ = json.Marshal(req)
err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
require_NoError(t, err)

// This time should be no error
pullMsg, err = sub.NextMsg(time.Second * 1)
require_NoError(t, err)

if len(pullMsg.Header) != 0 {
t.Fatalf("Expected no header, got: %v", pullMsg.Header)
}

pullMsg, err = sub.NextMsg(time.Second * 1)
require_NoError(t, err)

if v := pullMsg.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}
}