Skip to content

Commit 93f7ce4

Browse files
Jaremajnmoynepiotrpio
authored
[ADDED] Consumer groups with Pinned and Overflow (#1826)
Signed-off-by: Tomasz Pietrek <tomasz@nats.io> Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com> Signed-off-by: Piotr Piotrowski <piotr@synadia.com> Co-authored-by: Jean-Noël Moyne <jnmoyne@gmail.com> Co-authored-by: Piotr Piotrowski <piotr@synadia.com>
1 parent 11ef899 commit 93f7ce4

File tree

10 files changed

+1356
-27
lines changed

10 files changed

+1356
-27
lines changed

jetstream/api.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022-2023 The NATS Authors
1+
// Copyright 2022-2025 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at
@@ -104,6 +104,9 @@ const (
104104

105105
// apiMsgDeleteT is the endpoint to remove a message.
106106
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
107+
108+
// apiConsumerUnpinT is the endpoint to unpin a consumer.
109+
apiConsumerUnpinT = "CONSUMER.UNPIN.%s.%s"
107110
)
108111

109112
func (js *jetStream) apiRequestJSON(ctx context.Context, subject string, resp any, data ...[]byte) (*jetStreamMsg, error) {

jetstream/consumer.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022-2024 The NATS Authors
1+
// Copyright 2022-2025 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at
@@ -374,3 +374,37 @@ func validateConsumerName(dur string) error {
374374
}
375375
return nil
376376
}
377+
378+
func unpinConsumer(ctx context.Context, js *jetStream, stream, consumer, group string) error {
379+
ctx, cancel := js.wrapContextWithoutDeadline(ctx)
380+
if cancel != nil {
381+
defer cancel()
382+
}
383+
if err := validateConsumerName(consumer); err != nil {
384+
return err
385+
}
386+
unpinSubject := fmt.Sprintf(apiConsumerUnpinT, stream, consumer)
387+
388+
var req = consumerUnpinRequest{
389+
Group: group,
390+
}
391+
392+
reqJSON, err := json.Marshal(req)
393+
if err != nil {
394+
return err
395+
}
396+
397+
var resp apiResponse
398+
399+
if _, err := js.apiRequestJSON(ctx, unpinSubject, &resp, reqJSON); err != nil {
400+
return err
401+
}
402+
if resp.Error != nil {
403+
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
404+
return ErrConsumerNotFound
405+
}
406+
return resp.Error
407+
}
408+
409+
return nil
410+
}

jetstream/consumer_config.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ type (
7676
// TimeStamp indicates when the info was gathered by the server.
7777
TimeStamp time.Time `json:"ts"`
7878

79+
// PriorityGroups contains the information about the currently defined priority groups
80+
PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"`
81+
7982
// Paused indicates whether the consumer is paused.
8083
Paused bool `json:"paused,omitempty"`
8184

@@ -84,6 +87,17 @@ type (
8487
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
8588
}
8689

90+
PriorityGroupState struct {
91+
// Group this status is for.
92+
Group string `json:"group"`
93+
94+
// PinnedClientID is the generated ID of the pinned client.
95+
PinnedClientID string `json:"pinned_client_id,omitempty"`
96+
97+
// PinnedTS is the timestamp when the client was pinned.
98+
PinnedTS time.Time `json:"pinned_ts,omitempty"`
99+
}
100+
87101
// ConsumerConfig is the configuration of a JetStream consumer.
88102
ConsumerConfig struct {
89103
// Name is an optional name for the consumer. If not set, one is
@@ -227,6 +241,18 @@ type (
227241

228242
// PauseUntil is for suspending the consumer until the deadline.
229243
PauseUntil *time.Time `json:"pause_until,omitempty"`
244+
245+
// PriorityPolicy represents he priority policy the consumer is set to.
246+
// Requires nats-server v2.11.0 or later.
247+
PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"`
248+
249+
// PinnedTTL represents the time after which the client will be unpinned
250+
// if no new pull requests are sent.Used with PriorityPolicyPinned.
251+
// Requires nats-server v2.11.0 or later.
252+
PinnedTTL time.Duration `json:"priority_timeout,omitempty"`
253+
254+
// PriorityGroups is a list of priority groups this consumer supports.
255+
PriorityGroups []string `json:"priority_groups,omitempty"`
230256
}
231257

232258
// OrderedConsumerConfig is the configuration of an ordered JetStream
@@ -298,8 +324,51 @@ type (
298324
Stream uint64 `json:"stream_seq"`
299325
Last *time.Time `json:"last_active,omitempty"`
300326
}
327+
328+
// PriorityPolicy determines the priority policy the consumer is set to.
329+
PriorityPolicy int
301330
)
302331

332+
const (
333+
// PriorityPolicyNone is the default priority policy.
334+
PriorityPolicyNone PriorityPolicy = iota
335+
336+
// PriorityPolicyPinned is the priority policy that pins a consumer to a
337+
// specific client.
338+
PriorityPolicyPinned
339+
340+
// PriorityPolicyOverflow is the priority policy that allows for
341+
// restricting when a consumer will receive messages based on the number of
342+
// pending messages or acks.
343+
PriorityPolicyOverflow
344+
)
345+
346+
func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
347+
switch string(data) {
348+
case jsonString(""):
349+
*p = PriorityPolicyNone
350+
case jsonString("pinned_client"):
351+
*p = PriorityPolicyPinned
352+
case jsonString("overflow"):
353+
*p = PriorityPolicyOverflow
354+
default:
355+
return fmt.Errorf("nats: can not unmarshal %q", data)
356+
}
357+
return nil
358+
}
359+
360+
func (p PriorityPolicy) MarshalJSON() ([]byte, error) {
361+
switch p {
362+
case PriorityPolicyNone:
363+
return json.Marshal("")
364+
case PriorityPolicyPinned:
365+
return json.Marshal("pinned_client")
366+
case PriorityPolicyOverflow:
367+
return json.Marshal("overflow")
368+
}
369+
return nil, fmt.Errorf("nats: unknown priority policy %v", p)
370+
}
371+
303372
const (
304373
// DeliverAllPolicy starts delivering messages from the very beginning of a
305374
// stream. This is the default.

jetstream/errors.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ var (
195195
// consumer.
196196
ErrNoMessages JetStreamError = &jsError{message: "no messages"}
197197

198+
// ErrPinIDMismatch is returned when Pin ID sent in the request does not match
199+
// the currently pinned consumer subscriber ID on the server.
200+
ErrPinIDMismatch JetStreamError = &jsError{message: "pin ID mismatch"}
201+
198202
// ErrMaxBytesExceeded is returned when a message would exceed MaxBytes set
199203
// on a pull request.
200204
ErrMaxBytesExceeded JetStreamError = &jsError{message: "message size exceeds max bytes"}

jetstream/jetstream_options.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,73 @@ func (t PullThresholdBytes) configureMessages(opts *consumeOpts) error {
297297
return nil
298298
}
299299

300+
// PullMinPending sets the minimum number of messages that should be pending for
301+
// a consumer with PriorityPolicyOverflow to be considered for delivery.
302+
// If provided, PullPriorityGroup must be set as well and the consumer has to have
303+
// PriorityPolicy set to PriorityPolicyOverflow.
304+
//
305+
// PullMinPending implements both PullConsumeOpt and PullMessagesOpt, allowing
306+
// it to configure Consumer.Consume and Consumer.Messages.
307+
type PullMinPending int
308+
309+
func (min PullMinPending) configureConsume(opts *consumeOpts) error {
310+
if min < 1 {
311+
return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption)
312+
}
313+
opts.MinPending = int64(min)
314+
return nil
315+
}
316+
317+
func (min PullMinPending) configureMessages(opts *consumeOpts) error {
318+
if min < 1 {
319+
return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption)
320+
}
321+
opts.MinPending = int64(min)
322+
return nil
323+
}
324+
325+
// PullMinAckPending sets the minimum number of pending acks that should be
326+
// present for a consumer with PriorityPolicyOverflow to be considered for
327+
// delivery. If provided, PullPriorityGroup must be set as well and the consumer
328+
// has to have PriorityPolicy set to PriorityPolicyOverflow.
329+
//
330+
// PullMinAckPending implements both PullConsumeOpt and PullMessagesOpt, allowing
331+
// it to configure Consumer.Consume and Consumer.Messages.
332+
type PullMinAckPending int
333+
334+
func (min PullMinAckPending) configureConsume(opts *consumeOpts) error {
335+
if min < 1 {
336+
return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption)
337+
}
338+
opts.MinAckPending = int64(min)
339+
return nil
340+
}
341+
342+
func (min PullMinAckPending) configureMessages(opts *consumeOpts) error {
343+
if min < 1 {
344+
return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption)
345+
}
346+
opts.MinAckPending = int64(min)
347+
return nil
348+
}
349+
350+
// PullPriorityGroup sets the priority group for a consumer.
351+
// It has to match one of the priority groups set on the consumer.
352+
//
353+
// PullPriorityGroup implements both PullConsumeOpt and PullMessagesOpt, allowing
354+
// it to configure Consumer.Consume and Consumer.Messages.
355+
type PullPriorityGroup string
356+
357+
func (g PullPriorityGroup) configureConsume(opts *consumeOpts) error {
358+
opts.Group = string(g)
359+
return nil
360+
}
361+
362+
func (g PullPriorityGroup) configureMessages(opts *consumeOpts) error {
363+
opts.Group = string(g)
364+
return nil
365+
}
366+
300367
// PullHeartbeat sets the idle heartbeat duration for a pull subscription
301368
// If a client does not receive a heartbeat message from a stream for more
302369
// than the idle heartbeat setting, the subscription will be removed
@@ -368,6 +435,43 @@ func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt {
368435
})
369436
}
370437

438+
// FetchMinPending sets the minimum number of messages that should be pending for
439+
// a consumer with PriorityPolicyOverflow to be considered for delivery.
440+
// If provided, FetchPriorityGroup must be set as well and the consumer has to have
441+
// PriorityPolicy set to PriorityPolicyOverflow.
442+
func FetchMinPending(min int64) FetchOpt {
443+
return func(req *pullRequest) error {
444+
if min < 1 {
445+
return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption)
446+
}
447+
req.MinPending = min
448+
return nil
449+
}
450+
}
451+
452+
// FetchMinAckPending sets the minimum number of pending acks that should be
453+
// present for a consumer with PriorityPolicyOverflow to be considered for
454+
// delivery. If provided, FetchPriorityGroup must be set as well and the consumer
455+
// has to have PriorityPolicy set to PriorityPolicyOverflow.
456+
func FetchMinAckPending(min int64) FetchOpt {
457+
return func(req *pullRequest) error {
458+
if min < 1 {
459+
return fmt.Errorf("%w: min ack pending should be more than 0", ErrInvalidOption)
460+
}
461+
req.MinAckPending = min
462+
return nil
463+
}
464+
}
465+
466+
// FetchPriorityGroup sets the priority group for a consumer.
467+
// It has to match one of the priority groups set on the consumer.
468+
func FetchPriorityGroup(group string) FetchOpt {
469+
return func(req *pullRequest) error {
470+
req.Group = group
471+
return nil
472+
}
473+
}
474+
371475
// FetchMaxWait sets custom timeout for fetching predefined batch of messages.
372476
//
373477
// If not provided, a default of 30 seconds will be used.

jetstream/message.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022-2024 The NATS Authors
1+
// Copyright 2022-2025 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at
@@ -147,6 +147,7 @@ const (
147147
reqTimeout = "408"
148148
maxBytesExceeded = "409"
149149
noResponders = "503"
150+
pinIdMismatch = "423"
150151
)
151152

152153
// Headers used when publishing messages.
@@ -418,6 +419,8 @@ func checkMsg(msg *nats.Msg) (bool, error) {
418419
return false, nats.ErrTimeout
419420
case controlMsg:
420421
return false, nil
422+
case pinIdMismatch:
423+
return false, ErrPinIDMismatch
421424
case maxBytesExceeded:
422425
if strings.Contains(strings.ToLower(descr), "message size exceeds maxbytes") {
423426
return false, ErrMaxBytesExceeded

0 commit comments

Comments
 (0)