Skip to content

Commit

Permalink
Fixed SubscribeSync that could possibly share channel across 2 subs
Browse files Browse the repository at this point in the history
Also added missing subscription's lock when setting some fields
at the end of initialization.

Fixed some flappers and missing defers.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 29, 2021
1 parent 5e87f99 commit 92272d3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
6 changes: 6 additions & 0 deletions js.go
Expand Up @@ -1054,6 +1054,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

// Use the deliver subject from latest consumer config to attach.
if ccfg.DeliverSubject != _EMPTY_ {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if ch != nil {
ch = make(chan *Msg, cap(ch))
}
sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
&jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
if err != nil {
Expand All @@ -1069,11 +1073,13 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
consumer = info.Name
deliver = info.Config.DeliverSubject
}
sub.mu.Lock()
sub.jsi.stream = stream
sub.jsi.consumer = consumer
sub.jsi.durable = isDurable
sub.jsi.attached = attached
sub.jsi.deliver = deliver
sub.mu.Unlock()

return sub, nil
}
Expand Down
14 changes: 13 additions & 1 deletion test/js_test.go
Expand Up @@ -3465,6 +3465,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer mc.Close()
jsm, err := mc.JetStream()
if err != nil {
t.Errorf("Unexpected error: %v", err)
Expand Down Expand Up @@ -3844,7 +3845,9 @@ func TestJetStreamSubscribe_RateLimit(t *testing.T) {
}

// Change rate limit.
recvd := make(chan *nats.Msg)
// Make the receive channel able to possibly hold ALL messages, but
// we expect it to hold less due to rate limiting.
recvd := make(chan *nats.Msg, totalMsgs)
duration := 2 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()
Expand Down Expand Up @@ -3954,6 +3957,7 @@ func setupJSClusterWithSize(t *testing.T, clusterName string, size int) []*jsSer
t.Error(err)
}
waitForJSReady(t, nc)
nc.Close()

return nodes
}
Expand Down Expand Up @@ -4006,6 +4010,7 @@ func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *
if err != nil {
t.Error(err)
}
defer nc.Close()

timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
Expand Down Expand Up @@ -4084,6 +4089,7 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4112,6 +4118,7 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4141,6 +4148,7 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4176,6 +4184,7 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
if err != nil {
t.Error(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
Expand Down Expand Up @@ -4631,6 +4640,7 @@ func testJetStream_ClusterMultipleSubscribe(t *testing.T, subject string, srvs .
if err != nil {
t.Fatal(err)
}
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
Expand Down Expand Up @@ -4711,6 +4721,7 @@ func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, s
if err != nil {
t.Fatal(err)
}
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
Expand Down Expand Up @@ -4792,6 +4803,7 @@ func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, sr
if err != nil {
t.Fatal(err)
}
defer nc.Close()

var wg sync.WaitGroup
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
Expand Down
12 changes: 11 additions & 1 deletion ws_test.go
Expand Up @@ -570,6 +570,7 @@ func TestWSControlFrames(t *testing.T) {
nc, err := Connect(url,
ReconnectWait(50*time.Millisecond),
DisconnectErrHandler(func(_ *Conn, err error) { dch <- err }),
ReconnectHandler(func(_ *Conn) { rch <- true }),
)
if err != nil {
t.Fatalf("Error on connect: %v", err)
Expand Down Expand Up @@ -599,10 +600,19 @@ func TestWSControlFrames(t *testing.T) {
s = RunServerWithOptions(sopts)
defer s.Shutdown()

// Wait to reconnect
// Wait for both connections to reconnect
if err := Wait(rch); err != nil {
t.Fatalf("Should have reconnected: %v", err)
}
if err := Wait(rch); err != nil {
t.Fatalf("Should have reconnected: %v", err)
}
// Even if the first connection reconnects, there is no guarantee
// that the resend of SUB has yet been processed by the server.
// Doing a flush here will give us the guarantee.
if err := ncSub.Flush(); err != nil {
t.Fatalf("Error on flush: %v", err)
}

// Publish and close connection.
if err := nc.Publish("foo", []byte("msg")); err != nil {
Expand Down

0 comments on commit 92272d3

Please sign in to comment.