Skip to content

Commit

Permalink
Adjust consumer creation to nats-server v2.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Sep 14, 2022
1 parent 25b6392 commit 8842548
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 140 deletions.
131 changes: 53 additions & 78 deletions js.go
Expand Up @@ -122,9 +122,19 @@ const (
apiAccountInfo = "INFO"

// apiConsumerCreateT is used to create consumers.
apiConsumerCreateT = "CONSUMER.CREATE.%s"
// it accepts stream name and consumer name.
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"

// apiConsumerCreateT is used to create consumers.
// it accepts stream name, consumer name and filter subject
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"

// apiLegacyConsumerCreateT is used to create consumers.
// this is a legacy endpoint to support creating ephemerals before nats-server v2.9.0.
apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"

// apiDurableCreateT is used to create durable consumers.
// this is a legacy endpoint to support creating durable consumers before nats-server v2.9.0.
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"

// apiConsumerInfoT is used to create consumers.
Expand Down Expand Up @@ -1031,6 +1041,7 @@ func (d nakDelay) configureAck(opts *ackOpts) error {
// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
Expand Down Expand Up @@ -1621,95 +1632,59 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// If we are creating or updating let's process that request.
if shouldCreate {
j, err := json.Marshal(ccreq)
if err != nil {
cleanUpSub()
return nil, err
}

var ccSubj string
if isDurable {
ccSubj = js.apiSubj(fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable))
} else {
ccSubj = js.apiSubj(fmt.Sprintf(apiConsumerCreateT, stream))
}

if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(ccSubj, j)
}
}
resp, err := nc.Request(ccSubj, j, js.opts.wait)
info, err := js.upsertConsumer(stream, cfg.Durable, ccreq.Config)
if err != nil {
cleanUpSub()
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
var apiErr *APIError
if ok := errors.As(err, &apiErr); !ok {
cleanUpSub()
return nil, err
}
return nil, err
}
if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header)
if consumer == _EMPTY_ ||
(apiErr.ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr.ErrorCode != JSErrCodeConsumerNameExists) {
cleanUpSub()
if errors.Is(apiErr, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, err
}
}

var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
cleanUpSub()
return nil, err
}
info = cinfo.ConsumerInfo

if cinfo.Error != nil {
// We will not be using this sub here if we were push based.
if !isPullMode {
cleanUpSub()
}
if consumer != _EMPTY_ &&
(cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if errors.Is(cinfo.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
return nil, cinfo.Error
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
Expand Down Expand Up @@ -1960,7 +1935,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
cfg.DeliverPolicy = DeliverByStartSequencePolicy
cfg.OptStartSeq = sseq

ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream)
ccSubj := fmt.Sprintf(apiLegacyConsumerCreateT, jsi.stream)
j, err := json.Marshal(jsi.ccreq)
js := jsi.js
sub.mu.Unlock()
Expand Down
14 changes: 10 additions & 4 deletions jserrors.go
Expand Up @@ -30,7 +30,7 @@ var (
// ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
Expand Down Expand Up @@ -59,7 +59,7 @@ var (
// ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

// ErrConsumerNameRequired is returned when the provided consumer durable name is empty,
// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
Expand Down Expand Up @@ -98,8 +98,14 @@ var (
// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases
// Use ErrInvalidConsumerName instead
// ErrRequireServerVersion is returned when using a feature which requires a higher server version.
ErrRequireServerVersion JetStreamError = &jsError{message: "invalid server version"}

// ErrConsumerNameMismtch is returned user provides both consumer name and durable name and they are not equal.
ErrConsumerNameMismtch JetStreamError = &jsError{message: "consumer name and durable name do not match"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
)

Expand Down
55 changes: 43 additions & 12 deletions jsm.go
Expand Up @@ -259,34 +259,57 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg != nil && cfg.Durable != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, cfg.Durable)
if cfg == nil {
cfg = &ConsumerConfig{}
}
if cfg.Name != _EMPTY_ && !js.nc.serverMinVersion(2, 9, 0) {
return nil, fmt.Errorf("%w: %s", ErrRequireServerVersion, "consumer name requires at least server version 2.9.0")
}
if cfg.Name != _EMPTY_ && cfg.Durable != _EMPTY_ && cfg.Name != cfg.Durable {
return nil, ErrConsumerNameMismtch
}
consumerName := cfg.Name
if consumerName == _EMPTY_ {
consumerName = cfg.Durable
}
if consumerName != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, consumerName)
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}

if consInfo != nil {
sameConfig := checkConfig(&consInfo.Config, cfg)
if sameConfig != nil {
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream)
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, consumerName, stream)
}
}
}

return js.upsertConsumer(stream, cfg, opts...)
return js.upsertConsumer(stream, consumerName, cfg, opts...)
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
if cfg.Name != _EMPTY_ && !js.nc.serverMinVersion(2, 9, 0) {
return nil, fmt.Errorf("%w: %s", ErrRequireServerVersion, "consumer name requires at least server version 2.9.0")
}
if cfg.Name != _EMPTY_ && cfg.Durable != _EMPTY_ && cfg.Name != cfg.Durable {
return nil, ErrConsumerNameMismtch
}
consumerName := cfg.Name
if consumerName == _EMPTY_ {
consumerName = cfg.Durable
}
if consumerName == _EMPTY_ {
return nil, ErrConsumerNameRequired
}
return js.upsertConsumer(stream, cfg, opts...)
return js.upsertConsumer(stream, consumerName, cfg, opts...)
}

func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
Expand All @@ -304,13 +327,21 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt)
}

var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if err := checkConsumerName(cfg.Durable); err != nil {
return nil, err
if consumerName == _EMPTY_ {
// if consumer name is empty, use the legacy ephemeral endpoint
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if js.nc.serverMinVersion(2, 9, 0) {
// if above server version 2.9.0, use the endpoints with consumer name
if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
// if consumer name is not empty and the server version is lower than 2.9.0, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
}

resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
Expand Down

0 comments on commit 8842548

Please sign in to comment.