diff --git a/js.go b/js.go index 539b55e69..ffe8b5e1e 100644 --- a/js.go +++ b/js.go @@ -125,24 +125,51 @@ type JetStream interface { PublishAsyncComplete() <-chan struct{} // Subscribe creates an async Subscription for JetStream. + // The stream and consumer names can be provided with the nats.Bind() option. + // For creating an ephemeral (where the consumer name is picked by the server), + // you can provide the stream name with nats.BindStream(). + // If no stream name is specified, the library will attempt to figure out which + // stream the subscription is for. See important notes below for more details. + // + // IMPORTANT NOTES: + // * If none of the options Bind() nor Durable() are specified, the library will + // send a request to the server to create an ephemeral JetStream consumer, + // which will be deleted after an Unsubscribe() or Drain(), or automatically + // by the server after a short period of time after the NATS subscription is + // gone. + // * If Durable() option is specified, the library will attempt to lookup a JetStream + // consumer with this name, and if found, will bind to it and not attempt to + // delete it. However, if not found, the library will send a request to create + // such durable JetStream consumer. The library will delete the JetStream consumer + // after an Unsubscribe() or Drain(). + // * If Bind() option is provided, the library will attempt to lookup the + // consumer with the given name, and if successful, bind to it. If the lookup fails, + // then the Subscribe() call will return an error. Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) // SubscribeSync creates a Subscription that can be used to process messages synchronously. + // See important note in Subscribe() SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) // ChanSubscribe creates channel based Subscription. + // See important note in Subscribe() ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // ChanQueueSubscribe creates channel based Subscription with a queue group. + // See important note in QueueSubscribe() ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // QueueSubscribe creates a Subscription with a queue group. + // If no optional durable name nor binding options are specified, the queue name will be used as a durable name. + // See important note in Subscribe() QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. + // See important note in QueueSubscribe() QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) // PullSubscribe creates a Subscription that can fetch messages. + // See important note in Subscribe() PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) } @@ -900,8 +927,7 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { return opt(opts) } -// Subscribe will create a subscription to the appropriate stream and consumer. -// +// Subscribe creates an async Subscription for JetStream. // The stream and consumer names can be provided with the nats.Bind() option. // For creating an ephemeral (where the consumer name is picked by the server), // you can provide the stream name with nats.BindStream(). @@ -909,19 +935,19 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { // stream the subscription is for. See important notes below for more details. // // IMPORTANT NOTES: -// * If Bind() and Durable() options are not specified, the library will +// * If none of the options Bind() nor Durable() are specified, the library will // send a request to the server to create an ephemeral JetStream consumer, // which will be deleted after an Unsubscribe() or Drain(), or automatically // by the server after a short period of time after the NATS subscription is // gone. -// * If Durable() only is specified, the library will attempt to lookup a JetStream -// consumer with this name and if found, will bind to it and not attempt to +// * If Durable() option is specified, the library will attempt to lookup a JetStream +// consumer with this name, and if found, will bind to it and not attempt to // delete it. However, if not found, the library will send a request to create -// such durable JetStream consumer, but will still attempt to delete it after -// an Unsubscribe() or Drain(). +// such durable JetStream consumer. The library will delete the JetStream consumer +// after an Unsubscribe() or Drain(). // * If Bind() option is provided, the library will attempt to lookup the -// consumer with the given name, and if the lookup fails, then the Subscribe() -// call will return an error. +// consumer with the given name, and if successful, bind to it. If the lookup fails, +// then the Subscribe() call will return an error. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { return nil, ErrBadSubscription @@ -929,15 +955,15 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti return js.subscribe(subj, _EMPTY_, cb, nil, false, false, opts) } -// SubscribeSync will create a sync subscription to the appropriate stream and consumer. +// SubscribeSync creates a Subscription that can be used to process messages synchronously. // See important note in Subscribe() func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, true, false, opts) } -// QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. -// If not optional durable name or binding option is specified, the queue name will be used as a durable name. +// QueueSubscribe creates a Subscription with a queue group. +// If no optional durable name nor binding options are specified, the queue name will be used as a durable name. // See important note in Subscribe() func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { if cb == nil { @@ -946,28 +972,27 @@ func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) return js.subscribe(subj, queue, cb, nil, false, false, opts) } -// QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics. -// If not optional durable name or binding option is specified, the queue name will be used as a durable name. -// See important note in Subscribe() +// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. +// See important note in QueueSubscribe() func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, queue, nil, mch, true, false, opts) } -// ChanSubscribe will create a subscription to the appropriate stream and consumer using a channel. +// ChanSubscribe creates channel based Subscription. // See important note in Subscribe() func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, _EMPTY_, nil, ch, false, false, opts) } -// ChanQueueSubscribe will create a subscription to the appropriate stream and consumer using a channel. -// If not optional durable name or binding option is specified, the queue name will be used as a durable name. -// See important note in Subscribe() +// ChanQueueSubscribe creates channel based Subscription with a queue group. +// See important note in QueueSubscribe() func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) { return js.subscribe(subj, queue, nil, ch, false, false, opts) } -// PullSubscribe creates a pull subscriber. +// PullSubscribe creates a Subscription that can fetch messages. +// See important note in Subscribe() func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))