Skip to content

Commit

Permalink
Merge pull request #724 from nats-io/js_fix_shared_channel
Browse files Browse the repository at this point in the history
Fixed SubscribeSync that could possibly share channel across 2 subs
  • Loading branch information
kozlovic committed Apr 30, 2021
2 parents 793562c + 92272d3 commit 1667213
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
6 changes: 6 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

// Use the deliver subject from latest consumer config to attach.
if ccfg.DeliverSubject != _EMPTY_ {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if ch != nil {
ch = make(chan *Msg, cap(ch))
}
sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
&jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
if err != nil {
Expand All @@ -1178,11 +1182,13 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
consumer = info.Name
deliver = info.Config.DeliverSubject
}
sub.mu.Lock()
sub.jsi.stream = stream
sub.jsi.consumer = consumer
sub.jsi.durable = isDurable
sub.jsi.attached = attached
sub.jsi.deliver = deliver
sub.mu.Unlock()

return sub, nil
}
Expand Down
14 changes: 13 additions & 1 deletion test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3465,6 +3465,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer mc.Close()
jsm, err := mc.JetStream()
if err != nil {
t.Errorf("Unexpected error: %v", err)
Expand Down Expand Up @@ -3844,7 +3845,9 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) {
}

// Change rate limit.
recvd := make(chan *nats.Msg)
// Make the receive channel able to possibly hold ALL messages, but
// we expect it to hold less due to rate limiting.
recvd := make(chan *nats.Msg, totalMsgs)
duration := 2 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
Expand Down Expand Up @@ -3954,6 +3957,7 @@ func setupJSClusterWithSize(t *testing.T, clusterName string, size int) []*jsSer
t.Error(err)
}
waitForJSReady(t, nc)
nc.Close()

return nodes
}
Expand Down Expand Up @@ -4006,6 +4010,7 @@ func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *
if err != nil {
t.Error(err)
}
defer nc.Close()

timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
Expand Down Expand Up @@ -4084,6 +4089,7 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4112,6 +4118,7 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4141,6 +4148,7 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4176,6 +4184,7 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4631,6 +4640,7 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs .
if err != nil {
t.Fatal(err)
}
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
Expand Down Expand Up @@ -4711,6 +4721,7 @@ func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, s
if err != nil {
t.Fatal(err)
}
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
Expand Down Expand Up @@ -4792,6 +4803,7 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
if err != nil {
t.Fatal(err)
}
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
Expand Down
12 changes: 11 additions & 1 deletion ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func TestWSControlFrames(t *testing.T) {
nc, err := Connect(url,
ReconnectWait(50*time.Millisecond),
DisconnectErrHandler(func(_ *Conn, err error) { dch <- err }),
ReconnectHandler(func(_ *Conn) { rch <- true }),
)
if err != nil {
t.Fatalf("Error on connect: %v", err)
Expand Down Expand Up @@ -599,10 +600,19 @@ func TestWSControlFrames(t *testing.T) {
s = RunServerWithOptions(sopts)
defer s.Shutdown()

// Wait to reconnect
// Wait for both connections to reconnect
if err := Wait(rch); err != nil {
t.Fatalf("Should have reconnected: %v", err)
}
if err := Wait(rch); err != nil {
t.Fatalf("Should have reconnected: %v", err)
}
// Even if the first connection reconnects, there is no guarantee
// that the resend of SUB has yet been processed by the server.
// Doing a flush here will give us the guarantee.
if err := ncSub.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}

// Publish and close connection.
if err := nc.Publish("foo", []byte("msg")); err != nil {
Expand Down

0 comments on commit 1667213

Please sign in to comment.