Skip to content

Commit

Permalink
Merge pull request #347 from nats-io/fix_sub_req_timeout_handling
Browse files Browse the repository at this point in the history
[FIXED] Subscribe timeout should send a close request
  • Loading branch information
kozlovic committed Feb 25, 2021
2 parents cebf22c + 6e44911 commit f8343e0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
47 changes: 46 additions & 1 deletion stan_test.go
@@ -1,4 +1,4 @@
// Copyright 2016-2018 The NATS Authors
// Copyright 2016-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -2665,3 +2665,48 @@ func TestPublishAsyncTimeout(t *testing.T) {
t.Fatalf("Ack handler was invoked only %v out of %v", c, total)
}
}

func TestSubTimeout(t *testing.T) {
ns := natsd.RunDefaultServer()
defer ns.Shutdown()

opts := server.GetDefaultOptions()
opts.NATSServerURL = nats.DefaultURL
opts.ID = clusterName
s := runServerWithOpts(opts)
defer s.Shutdown()

sc, err := Connect(clusterName, clientName, ConnectWait(250*time.Millisecond))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sc.Close()

// Setup a subscription on the close subscription subject so we can
// check that the library sends a close request on subscribe timeout.
nc := sc.NatsConn()
scc := sc.(*conn)
scc.Lock()
subj := scc.subCloseRequests
scc.Unlock()
sub, err := nc.SubscribeSync(subj)
if err != nil {
t.Fatalf("Error on unsubscribe: %v", err)
}

// Now shutdown STAN server just before trying to create a subscription.
s.Shutdown()
if _, err := sc.Subscribe("foo", func(_ *Msg) {}); err != ErrSubReqTimeout {
t.Fatalf("Expected %v, got %v", ErrSubReqTimeout, err)
}
// Now check that we got the subscription close request
msg, err := sub.NextMsg(250 * time.Millisecond)
if err != nil {
t.Fatalf("Error getting subscription close request: %v", err)
}
req := &pb.UnsubscribeRequest{}
req.Unmarshal(msg.Data)
if req.ClientID != clientName || req.Subject != "foo" || req.Inbox == "" {
t.Fatalf("Unexpected sub close request: %+v", req)
}
}
18 changes: 17 additions & 1 deletion sub.go
@@ -1,4 +1,4 @@
// Copyright 2016-2018 The NATS Authors
// Copyright 2016-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -287,6 +287,22 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs
if err != nil {
sub.inboxSub.Unsubscribe()
if err == nats.ErrTimeout {
// On timeout, we don't know if the server got the request or
// not. So we will do best effort and send a "subscription close"
// request. However, since we don't have the AckInbox that is
// normally used to close a subscription, we will use the sub's
// inbox. Newer servers will fallback to lookup by inbox if they
// don't find the sub from the "AckInbox" lookup.
scr := &pb.UnsubscribeRequest{
ClientID: sc.clientID,
Subject: subject,
Inbox: sub.inbox,
}
b, _ := scr.Marshal()
// Send to the subscription close request, not the unsubscribe subject.
sc.nc.Publish(sc.subCloseRequests, b)

// Report this error to the user.
err = ErrSubReqTimeout
}
return nil, err
Expand Down

0 comments on commit f8343e0

Please sign in to comment.