From c91c1043a065c35601436e77609223e245d549bb Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 13 Aug 2021 10:52:40 -0600 Subject: [PATCH] Fixed issue where JS would change subscription subject This would cause issues during a reconnect Signed-off-by: Ivan Kozlovic --- js.go | 10 ++----- js_test.go | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ nats.go | 15 ++++++++-- 3 files changed, 100 insertions(+), 11 deletions(-) diff --git a/js.go b/js.go index 6f843bef0..4309740f5 100644 --- a/js.go +++ b/js.go @@ -818,6 +818,7 @@ type jsSub struct { // For pull subscribers, this is the next message subject to send requests to. nms string + psubj string // the subject that was passed by user to the subscribe calls consumer string stream string deliver string @@ -1114,13 +1115,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, dseq: 1, pull: isPullMode, nms: nms, + psubj: subj, } sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi) - // Since JetStream sends on different subject, make sure this reflects the user's intentions. - sub.mu.Lock() - sub.Subject = subj - sub.mu.Unlock() if err != nil { return nil, err } @@ -1211,10 +1209,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if err != nil { return nil, err } - // Since JetStream sends on different subject, make sure this reflects the user's intentions. - sub.mu.Lock() - sub.Subject = subj - sub.mu.Unlock() } } else { if cinfo.Error.Code == 404 { diff --git a/js_test.go b/js_test.go index 7ffdd283f..bad65cc62 100644 --- a/js_test.go +++ b/js_test.go @@ -409,3 +409,89 @@ func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) { } } } + +func TestJetStreamSubscribeReconnect(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + rch := make(chan struct{}, 1) + nc, err := Connect(s.ClientURL(), + ReconnectWait(50*time.Millisecond), + ReconnectHandler(func(_ *Conn) { + select { + case rch <- struct{}{}: + default: + } + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream(MaxWait(250 * time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create the stream using our client API. + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.SubscribeSync("foo", Durable("bar")) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + sendAndReceive := func(msgContent string) { + t.Helper() + var ok bool + var err error + for i := 0; i < 5; i++ { + if _, err = js.Publish("foo", []byte(msgContent)); err != nil { + time.Sleep(250 * time.Millisecond) + continue + } + ok = true + break + } + if !ok { + t.Fatalf("Error on publish: %v", err) + } + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatal("Did not get message") + } + if string(msg.Data) != msgContent { + t.Fatalf("Unexpected content: %q", msg.Data) + } + if err := msg.AckSync(); err != nil { + t.Fatalf("Error on ack: %v", err) + } + } + + sendAndReceive("msg1") + + // Cause a disconnect... + nc.mu.Lock() + nc.conn.Close() + nc.mu.Unlock() + + // Wait for reconnect + select { + case <-rch: + case <-time.After(time.Second): + t.Fatal("Did not reconnect") + } + + // Make sure we can send and receive the msg + sendAndReceive("msg2") +} diff --git a/nats.go b/nats.go index e7b456150..093a11de3 100644 --- a/nats.go +++ b/nats.go @@ -1270,7 +1270,15 @@ func defaultErrHandler(nc *Conn, sub *Subscription, err error) { } var errStr string if sub != nil { - errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject) + var subject string + sub.mu.Lock() + if sub.jsi != nil { + subject = sub.jsi.psubj + } else { + subject = sub.Subject + } + sub.mu.Unlock() + errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, subject) } else { errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid) } @@ -4451,12 +4459,13 @@ func (nc *Conn) resendSubscriptions() { continue } } + subj, queue, sid := s.Subject, s.Queue, s.sid s.mu.Unlock() - nc.bw.writeDirect(fmt.Sprintf(subProto, s.Subject, s.Queue, s.sid)) + nc.bw.writeDirect(fmt.Sprintf(subProto, subj, queue, sid)) if adjustedMax > 0 { maxStr := strconv.Itoa(int(adjustedMax)) - nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, maxStr)) + nc.bw.writeDirect(fmt.Sprintf(unsubProto, sid, maxStr)) } } }