Skip to content

Commit

Permalink
Merge pull request #793 from nats-io/fix_subject_rewrite
Browse files Browse the repository at this point in the history
Fixed issue where JS would change subscription subject
  • Loading branch information
kozlovic committed Aug 13, 2021
2 parents d1955c8 + c91c104 commit 9c00d13
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 11 deletions.
10 changes: 2 additions & 8 deletions js.go
Expand Up @@ -818,6 +818,7 @@ type jsSub struct {
// For pull subscribers, this is the next message subject to send requests to.
nms string

psubj string // the subject that was passed by user to the subscribe calls
consumer string
stream string
deliver string
Expand Down Expand Up @@ -1114,13 +1115,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
dseq: 1,
pull: isPullMode,
nms: nms,
psubj: subj,
}

sub, err = nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1211,10 +1209,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if err != nil {
return nil, err
}
// Since JetStream sends on different subject, make sure this reflects the user's intentions.
sub.mu.Lock()
sub.Subject = subj
sub.mu.Unlock()
}
} else {
if cinfo.Error.Code == 404 {
Expand Down
86 changes: 86 additions & 0 deletions js_test.go
Expand Up @@ -409,3 +409,89 @@ func TestJetStreamConcurrentDurablePushConsumers(t *testing.T) {
}
}
}

func TestJetStreamSubscribeReconnect(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

rch := make(chan struct{}, 1)
nc, err := Connect(s.ClientURL(),
ReconnectWait(50*time.Millisecond),
ReconnectHandler(func(_ *Conn) {
select {
case rch <- struct{}{}:
default:
}
}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream(MaxWait(250 * time.Millisecond))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create the stream using our client API.
_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := js.SubscribeSync("foo", Durable("bar"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

sendAndReceive := func(msgContent string) {
t.Helper()
var ok bool
var err error
for i := 0; i < 5; i++ {
if _, err = js.Publish("foo", []byte(msgContent)); err != nil {
time.Sleep(250 * time.Millisecond)
continue
}
ok = true
break
}
if !ok {
t.Fatalf("Error on publish: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatal("Did not get message")
}
if string(msg.Data) != msgContent {
t.Fatalf("Unexpected content: %q", msg.Data)
}
if err := msg.AckSync(); err != nil {
t.Fatalf("Error on ack: %v", err)
}
}

sendAndReceive("msg1")

// Cause a disconnect...
nc.mu.Lock()
nc.conn.Close()
nc.mu.Unlock()

// Wait for reconnect
select {
case <-rch:
case <-time.After(time.Second):
t.Fatal("Did not reconnect")
}

// Make sure we can send and receive the msg
sendAndReceive("msg2")
}
15 changes: 12 additions & 3 deletions nats.go
Expand Up @@ -1270,7 +1270,15 @@ func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
}
var errStr string
if sub != nil {
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject)
var subject string
sub.mu.Lock()
if sub.jsi != nil {
subject = sub.jsi.psubj
} else {
subject = sub.Subject
}
sub.mu.Unlock()
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, subject)
} else {
errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
}
Expand Down Expand Up @@ -4451,12 +4459,13 @@ func (nc *Conn) resendSubscriptions() {
continue
}
}
subj, queue, sid := s.Subject, s.Queue, s.sid
s.mu.Unlock()

nc.bw.writeDirect(fmt.Sprintf(subProto, s.Subject, s.Queue, s.sid))
nc.bw.writeDirect(fmt.Sprintf(subProto, subj, queue, sid))
if adjustedMax > 0 {
maxStr := strconv.Itoa(int(adjustedMax))
nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, maxStr))
nc.bw.writeDirect(fmt.Sprintf(unsubProto, sid, maxStr))
}
}
}
Expand Down

0 comments on commit 9c00d13

Please sign in to comment.