Skip to content

Commit

Permalink
Merge 21ea50a into b81c9e7
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Aug 22, 2022
2 parents b81c9e7 + 21ea50a commit a2187aa
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 107 deletions.
81 changes: 61 additions & 20 deletions jsm.go
Expand Up @@ -154,17 +154,60 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver"`
}

// APIError is included in all API responses if there was an error.
type APIError struct {
// JetStreamAPIError is included in all API responses if there was an error.
type JetStreamAPIError struct {
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// Error prints the JetStream API error code and description
func (e *JetStreamAPIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// Is matches against an JetStreamAPIError.
func (e *JetStreamAPIError) Is(err error) bool {
// Extract internal APIError to match against.
var aerr *JetStreamAPIError
ok := errors.As(err, &aerr)
if !ok {
return ok
}
return e.ErrorCode == aerr.ErrorCode
}

// JetStreamError is an error result that happens when using JetStream.
type JetStreamError interface {
APIError() *JetStreamAPIError
error
}

type jsError struct {
apiErr *JetStreamAPIError
message string
}

func (err *jsError) APIError() *JetStreamAPIError {
return err.apiErr
}

func (err *jsError) Error() string {
if err.apiErr != nil && err.apiErr.Description != "" {
return fmt.Sprintf("nats: %v", err.apiErr.Description)
}
return fmt.Sprintf("nats: %v", err.message)
}

func (err *jsError) Unwrap() error {
// Allow matching to embedded APIError in case there is one.
return err.apiErr
}

// apiResponse is a standard response from the JetStream JSON API
type apiResponse struct {
Type string `json:"type"`
Error *APIError `json:"error,omitempty"`
Type string `json:"type"`
Error *JetStreamAPIError `json:"error,omitempty"`
}

// apiPaged includes variables used to create paged responses from the JSON API
Expand Down Expand Up @@ -235,11 +278,6 @@ const (
JSErrCodeMessageNotFound ErrorCode = 10037
)

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
Expand All @@ -265,13 +303,16 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount {
return nil, ErrJetStreamNotEnabledForAccount
}
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled {
return nil, ErrJetStreamNotEnabled
var err error
// Internally checks based on error code instead of description match.
if errors.Is(info.Error, ErrJetStreamNotEnabledForAccount) {
err = ErrJetStreamNotEnabledForAccount
} else {
err = &jsError{
apiErr: info.Error,
}
}
return nil, info.Error
return nil, err
}

return &info.AccountInfo, nil
Expand Down Expand Up @@ -762,11 +803,11 @@ type StreamInfo struct {

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *JetStreamAPIError `json:"error"`
}

// StreamState is information about the given stream.
Expand Down
174 changes: 89 additions & 85 deletions nats.go
Expand Up @@ -90,91 +90,95 @@ const (

// Errors
var (
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
ErrPullModeNotAllowed = errors.New("nats: pull based not supported")
ErrJetStreamNotEnabledForAccount = errors.New("nats: jetstream not enabled for this account")
ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled")
ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid")
ErrNoStreamResponse = errors.New("nats: no response from stream")
ErrNotJSMessage = errors.New("nats: not a jetstream message")
ErrInvalidStreamName = errors.New("nats: invalid stream name")
ErrInvalidConsumerName = errors.New("nats: invalid consumer name")
ErrNoMatchingStream = errors.New("nats: no stream matches subject")
ErrSubjectMismatch = errors.New("nats: subject does not match consumer")
ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set")
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamConfigRequired = errors.New("nats: stream configuration is required")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("nats: stream not found")
ErrConsumerNotFound = errors.New("nats: consumer not found")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer")
ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer")
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrConsumerNameAlreadyInUse = errors.New("nats: consumer name already in use")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrBadRequest = errors.New("nats: bad request")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases
// Use ErrInvalidConsumerName instead
ErrInvalidDurableName = errors.New("nats: invalid durable name")
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
ErrPullModeNotAllowed = errors.New("nats: pull based not supported")
ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid")
ErrNoStreamResponse = errors.New("nats: no response from stream")
ErrNotJSMessage = errors.New("nats: not a jetstream message")
ErrInvalidStreamName = errors.New("nats: invalid stream name")
ErrInvalidConsumerName = errors.New("nats: invalid consumer name")
ErrNoMatchingStream = errors.New("nats: no stream matches subject")
ErrSubjectMismatch = errors.New("nats: subject does not match consumer")
ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set")
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamConfigRequired = errors.New("nats: stream configuration is required")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("nats: stream not found")
ErrConsumerNotFound = errors.New("nats: consumer not found")
ErrConsumerNameAlreadyInUse = errors.New("nats: consumer name already in use")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer")
ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrBadRequest = errors.New("nats: bad request")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
)

var (
// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account.
ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &JetStreamAPIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled"}}

// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &JetStreamAPIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled"}}

// ErrConsumerNotActive is an error returned when consumer is not active.
ErrConsumerNotActive JetStreamError = &jsError{message: "consumer not active"}
)

func init() {
Expand Down

0 comments on commit a2187aa

Please sign in to comment.