diff --git a/js.go b/js.go index 1a23b49ad..21faa29d1 100644 --- a/js.go +++ b/js.go @@ -899,8 +899,14 @@ func (sub *Subscription) Poll() error { } func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { + ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait) + defer cancel() + return js.getConsumerInfoContext(ctx, stream, consumer) +} + +func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) { ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer) - resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.opts.wait) + resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled diff --git a/jsm.go b/jsm.go index dee6fc4af..7fbaed714 100644 --- a/jsm.go +++ b/jsm.go @@ -26,49 +26,49 @@ import ( // JetStreamManager is the public interface for managing JetStream streams & consumers. type JetStreamManager interface { // AddStream creates a stream. - AddStream(cfg *StreamConfig) (*StreamInfo, error) + AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) // UpdateStream updates a stream. - UpdateStream(cfg *StreamConfig) (*StreamInfo, error) + UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) // DeleteStream deletes a stream. - DeleteStream(name string) error + DeleteStream(name string, opts ...JSOpt) error // StreamInfo retrieves information from a stream. - StreamInfo(stream string) (*StreamInfo, error) + StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) // PurgeStream purges a stream messages. - PurgeStream(name string) error + PurgeStream(name string, opts ...JSOpt) error // StreamsInfo can be used to retrieve a list of StreamInfo objects. StreamsInfo(opts ...JSOpt) <-chan *StreamInfo // StreamNames is used to retrieve a list of Stream names. - StreamNames(ctx context.Context) <-chan string + StreamNames(opts ...JSOpt) <-chan string // GetMsg retrieves a raw stream message stored in JetStream by sequence number. - GetMsg(name string, seq uint64) (*RawStreamMsg, error) + GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) // DeleteMsg erases a message from a stream. - DeleteMsg(name string, seq uint64) error + DeleteMsg(name string, seq uint64, opts ...JSOpt) error // AddConsumer adds a consumer to a stream. - AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) // DeleteConsumer deletes a consumer. - DeleteConsumer(stream, consumer string) error + DeleteConsumer(stream, consumer string, opts ...JSOpt) error // ConsumerInfo retrieves information of a consumer from a stream. - ConsumerInfo(stream, name string) (*ConsumerInfo, error) + ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) // ConsumersInfo is used to retrieve a list of ConsumerInfo objects. ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo // ConsumerNames is used to retrieve a list of Consumer names. - ConsumerNames(ctx context.Context, stream string) <-chan string + ConsumerNames(stream string, opts ...JSOpt) <-chan string // AccountInfo retrieves info about the JetStream usage from an account. - AccountInfo() (*AccountInfo, error) + AccountInfo(opts ...JSOpt) (*AccountInfo, error) } // StreamConfig will determine the properties for a stream. @@ -171,8 +171,16 @@ type accountInfoResponse struct { } // AccountInfo retrieves info about the JetStream usage from the current account. -func (js *js) AccountInfo() (*AccountInfo, error) { - resp, err := js.nc.Request(js.apiSubj(apiAccountInfo), nil, js.opts.wait) +func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + + resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) if err != nil { return nil, err } @@ -204,7 +212,15 @@ type consumerResponse struct { } // AddConsumer will add a JetStream consumer. -func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) { +func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if stream == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -223,7 +239,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) } - resp, err := js.nc.Request(js.apiSubj(ccSubj), req, js.opts.wait) + resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled @@ -248,13 +264,21 @@ type consumerDeleteResponse struct { } // DeleteConsumer deletes a Consumer. -func (js *js) DeleteConsumer(stream, consumer string) error { +func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + if stream == _EMPTY_ { return ErrStreamNameRequired } dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) - r, err := js.nc.Request(dcSubj, nil, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil) if err != nil { return err } @@ -269,8 +293,15 @@ func (js *js) DeleteConsumer(stream, consumer string) error { } // ConsumerInfo returns information about a Consumer. -func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { - return js.getConsumerInfo(stream, consumer) +func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + return js.getConsumerInfoContext(o.ctx, stream, consumer) } // consumerLister fetches pages of ConsumerInfo objects. This object is not @@ -367,11 +398,9 @@ func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo ch := make(chan *ConsumerInfo) l := &consumerLister{js: &js{nc: jsc.nc, opts: o}, stream: stream} go func() { - defer func() { - if cancel != nil { - cancel() - } - }() + if cancel != nil { + defer cancel() + } defer close(ch) for l.Next() { for _, info := range l.Page() { @@ -457,26 +486,24 @@ func (c *consumerNamesLister) Err() error { } // ConsumerNames is used to retrieve a list of Consumer names. -func (js *js) ConsumerNames(ctx context.Context, stream string) <-chan string { - var cancel context.CancelFunc - if ctx == nil { - ctx, cancel = context.WithTimeout(context.Background(), js.opts.wait) +func (jsc *js) ConsumerNames(stream string, opts ...JSOpt) <-chan string { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return nil } ch := make(chan string) - l := &consumerNamesLister{stream: stream, js: js} + l := &consumerNamesLister{stream: stream, js: &js{nc: jsc.nc, opts: o}} go func() { - defer func() { - if cancel != nil { - cancel() - } - }() + if cancel != nil { + defer cancel() + } defer close(ch) for l.Next() { for _, info := range l.Page() { select { case ch <- info: - case <-ctx.Done(): + case <-o.ctx.Done(): return } } @@ -492,7 +519,15 @@ type streamCreateResponse struct { *StreamInfo } -func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) { +func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if cfg == nil || cfg.Name == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -503,7 +538,7 @@ func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) { } csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name)) - r, err := js.nc.Request(csSubj, req, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, csSubj, req) if err != nil { return nil, err } @@ -519,9 +554,17 @@ func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) { type streamInfoResponse = streamCreateResponse -func (js *js) StreamInfo(stream string) (*StreamInfo, error) { +func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)) - r, err := js.nc.Request(csSubj, nil, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil) if err != nil { return nil, err } @@ -582,7 +625,15 @@ type PeerInfo struct { } // UpdateStream updates a Stream. -func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) { +func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if cfg == nil || cfg.Name == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -593,7 +644,7 @@ func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) { } usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name)) - r, err := js.nc.Request(usSubj, req, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, usSubj, req) if err != nil { return nil, err } @@ -614,13 +665,21 @@ type streamDeleteResponse struct { } // DeleteStream deletes a Stream. -func (js *js) DeleteStream(name string) error { +func (js *js) DeleteStream(name string, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + if name == _EMPTY_ { return ErrStreamNameRequired } dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name)) - r, err := js.nc.Request(dsSubj, nil, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil) if err != nil { return err } @@ -664,7 +723,15 @@ type apiMsgGetResponse struct { } // GetMsg retrieves a raw stream message stored in JetStream by sequence number. -func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) { +func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return nil, err + } + if cancel != nil { + defer cancel() + } + if name == _EMPTY_ { return nil, ErrStreamNameRequired } @@ -675,7 +742,7 @@ func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) { } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name)) - r, err := js.nc.Request(dsSubj, req, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) if err != nil { return nil, err } @@ -718,7 +785,15 @@ type msgDeleteResponse struct { } // DeleteMsg deletes a message from a stream. -func (js *js) DeleteMsg(name string, seq uint64) error { +func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + if name == _EMPTY_ { return ErrStreamNameRequired } @@ -729,7 +804,7 @@ func (js *js) DeleteMsg(name string, seq uint64) error { } dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name)) - r, err := js.nc.Request(dsSubj, req, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req) if err != nil { return err } @@ -750,9 +825,17 @@ type streamPurgeResponse struct { } // PurgeStream purges messages on a Stream. -func (js *js) PurgeStream(name string) error { +func (js *js) PurgeStream(name string, opts ...JSOpt) error { + o, cancel, err := getJSContextOpts(js.opts, opts...) + if err != nil { + return err + } + if cancel != nil { + defer cancel() + } + psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name)) - r, err := js.nc.Request(psSubj, nil, js.opts.wait) + r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil) if err != nil { return err } @@ -858,11 +941,9 @@ func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo { ch := make(chan *StreamInfo) l := &streamLister{js: &js{nc: jsc.nc, opts: o}} go func() { - defer func() { - if cancel != nil { - cancel() - } - }() + if cancel != nil { + defer cancel() + } defer close(ch) for l.Next() { for _, info := range l.Page() { @@ -935,26 +1016,24 @@ func (l *streamNamesLister) Err() error { } // StreamNames is used to retrieve a list of Stream names. -func (js *js) StreamNames(ctx context.Context) <-chan string { - var cancel context.CancelFunc - if ctx == nil { - ctx, cancel = context.WithTimeout(context.Background(), js.opts.wait) +func (jsc *js) StreamNames(opts ...JSOpt) <-chan string { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return nil } ch := make(chan string) - l := &streamNamesLister{js: js} + l := &streamNamesLister{js: &js{nc: jsc.nc, opts: o}} go func() { - defer func() { - if cancel != nil { - cancel() - } - }() + if cancel != nil { + defer cancel() + } defer close(ch) for l.Next() { for _, info := range l.Page() { select { case ch <- info: - case <-ctx.Done(): + case <-o.ctx.Done(): return } } diff --git a/test/js_test.go b/test/js_test.go index c7d2b8644..2eda34870 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1130,7 +1130,7 @@ func TestJetStreamManagement(t *testing.T) { var names []string ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - for name := range js.ConsumerNames(ctx, "foo") { + for name := range js.ConsumerNames("foo", nats.Context(ctx)) { names = append(names, name) } if got, want := len(names), 1; got != want { @@ -1162,7 +1162,7 @@ func TestJetStreamManagement(t *testing.T) { var names []string ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - for name := range js.StreamNames(ctx) { + for name := range js.StreamNames(nats.Context(ctx)) { names = append(names, name) } if got, want := len(names), 1; got != want {