Skip to content

Commit

Permalink
Merge pull request #693 from nats-io/js-chan-subscribe
Browse files Browse the repository at this point in the history
js: Fix ChanSubscribe acting as SubscribeSync
  • Loading branch information
wallyqs committed Mar 30, 2021
2 parents 882e98e + bd0fb9d commit c0e12e4
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 14 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.15

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 h1:ybeT5VFA73CVQb4rCL+48+up91xWheriSBbJ3M2Pzps=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d h1:Fi5DT3pdyqP280FPGdkQD+bDjfpR5orUhZ2hhVEU/JA=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
Expand Down
16 changes: 8 additions & 8 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,40 +814,40 @@ func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscripti
if cb == nil {
return nil, ErrBadSubscription
}
return js.subscribe(subj, _EMPTY_, cb, nil, opts)
return js.subscribe(subj, _EMPTY_, cb, nil, false, opts)
}

// SubscribeSync will create a sync subscription to the appropriate stream and consumer.
func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, opts)
return js.subscribe(subj, _EMPTY_, nil, mch, true, opts)
}

// QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics.
func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
if cb == nil {
return nil, ErrBadSubscription
}
return js.subscribe(subj, queue, cb, nil, opts)
return js.subscribe(subj, queue, cb, nil, false, opts)
}

// QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics.
func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, queue, nil, mch, opts)
return js.subscribe(subj, queue, nil, mch, true, opts)
}

// Subscribe will create a subscription to the appropriate stream and consumer.
func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, ch, opts)
return js.subscribe(subj, _EMPTY_, nil, ch, false, opts)
}

// PullSubscribe creates a pull subscriber.
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, nil, append(opts, Durable(durable)))
return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable)))
}

func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []SubOpt) (*Subscription, error) {
func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) {
cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
o := subOpts{cfg: &cfg}
if len(opts) > 0 {
Expand Down Expand Up @@ -932,7 +932,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
if isPullMode {
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}}
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js})
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1876,7 +1876,7 @@ func TestJetStreamAutoMaxAckPending(t *testing.T) {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
nc, err := nats.Connect(s.ClientURL(), nats.SyncQueueLen(500))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -1902,8 +1902,7 @@ func TestJetStreamAutoMaxAckPending(t *testing.T) {
nc.Flush()

// Create a consumer.
msgs := make(chan *nats.Msg, 500)
sub, err := js.ChanSubscribe("foo", msgs)
sub, err := js.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit c0e12e4

Please sign in to comment.