Skip to content

Commit

Permalink
[FIXED] Conn and Subscription Close()/Unubscribe() could not be retried
Browse files Browse the repository at this point in the history
If a connection is closed, or a subscription is closed or unsubscribed,
and an error occurs at the protocol level, for instance getting a
timeout or the NATS connection is currently disconnected, etc..
the Close()/Unsubscribe() calls would return an error but the user
would not be able to invoke them again, in the sense that these
calls would bail out early because those objects were already marked
as closed.

This was problematic because if an application closes a connection for
instance but gets a timeout, then there is no way for the application
to really try to tell the server that the connection should be closed.
(same for subscription).

This PR let the user call Close()/Unsubscribe() until the call is
completely successful. This is not restricted to only timeout error,
and it is left to the user to decide which error should be re-tryable.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Oct 28, 2021
1 parent 9a6e4f2 commit ced5928
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 22 deletions.
31 changes: 18 additions & 13 deletions stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ type conn struct {
pubNUID *nuid.NUID // NUID generator for published messages.
connLostCB ConnectionLostHandler
closed bool
fullyClosed bool
ping pingInfo
}

Expand Down Expand Up @@ -672,21 +673,17 @@ func (sc *conn) Close() error {
sc.Lock()
defer sc.Unlock()

if sc.closed {
// We are already closed.
// If we are fully closed, simply return.
if sc.fullyClosed {
return nil
}
// Signals we are closed.
sc.closed = true

// Capture for NATS calls below.
if sc.ncOwned {
defer sc.nc.Close()
// If this is the very first Close() call, do some internal cleanup,
// otherwise, simply send the close protocol message.
if !sc.closed {
sc.closed = true
sc.cleanupOnClose(ErrConnectionClosed)
}

// Now close ourselves.
sc.cleanupOnClose(ErrConnectionClosed)

req := &pb.CloseRequest{ClientID: sc.clientID}
b, _ := req.Marshal()
reply, err := sc.nc.Request(sc.closeRequests, b, sc.opts.ConnectTimeout)
Expand All @@ -704,6 +701,11 @@ func (sc *conn) Close() error {
if cr.Error != "" {
return errors.New(cr.Error)
}
// Success, set this flag and close the underlying NATS connection if owned.
sc.fullyClosed = true
if sc.ncOwned {
sc.nc.Close()
}
return nil
}

Expand Down Expand Up @@ -900,14 +902,17 @@ func (sc *conn) processMsg(raw *nats.Msg) {
msg.Sub = sub

sub.RLock()
if sub.closed {
sub.RUnlock()
return
}
cb := sub.cb
ackSubject := sub.ackInbox
isManualAck := sub.opts.ManualAcks
subsc := sub.sc // Can be nil if sub has been unsubscribed.
sub.RUnlock()

// Perform the callback
if cb != nil && subsc != nil {
if cb != nil {
cb(msg)
}

Expand Down
142 changes: 142 additions & 0 deletions stan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,87 @@ func TestClose(t *testing.T) {
}
}

func TestConnCloseError(t *testing.T) {
s := RunServer(clusterName)
defer s.Shutdown()

sc := NewDefaultConnection(t)
defer sc.Close()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

// Override the clientID so that we get an error on close
scc := sc.(*conn)
scc.Lock()
scc.clientID = "foobar"
closeSubj := scc.closeRequests
scc.Unlock()
if err := sc.Close(); err == nil || !strings.Contains(err.Error(), "unknown") {
t.Fatalf("Expected error about unknown clientID, got %v", err)
}

checkInternalConnClosed := func(expectedClosed bool) {
t.Helper()
scc.RLock()
defer scc.RUnlock()
closed := scc.nc.IsClosed()
if expectedClosed && !closed {
t.Fatalf("Expected internal NATS connection to be closed, but it wasn't")
} else if !expectedClosed && closed {
t.Fatalf("Expected internal NATS connection to be not be closed, but it was")
}
}
// Internal NATS connection should not have been closed
checkInternalConnClosed(false)

// Now setup a subscription to check if library is sending the close protocol.
crsub, err := nc.SubscribeSync(closeSubj)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
// Flush since this is a different connection than the one used by sc.
nc.Flush()

// We should be able to call Close() again
if err := sc.Close(); err == nil {
t.Fatal("Expected error, did not get one")
}
// Connection still not closed
checkInternalConnClosed(false)
// Make sure we send the protocol
if _, err := crsub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not get close protocol: %v", err)
}

// Fix clientID
scc.Lock()
scc.clientID = clientName
scc.Unlock()
// Now close should work.
if err := sc.Close(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Now internal connection should have been closed
checkInternalConnClosed(true)

// Check protocol was sent
if _, err := crsub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not get close protocol: %v", err)
}
// Now, another call to Close() should just return nil
if err := sc.Close(); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// And this time, no protocol should be sent
if _, err := crsub.NextMsg(100 * time.Millisecond); err == nil {
t.Fatal("Close protocol should not have been sent")
}
}

func TestDoubleClose(t *testing.T) {
s := RunServer(clusterName)
defer s.Shutdown()
Expand Down Expand Up @@ -2737,6 +2818,67 @@ func TestSubTimeout(t *testing.T) {
}
}

func TestSubCloseError(t *testing.T) {
s := RunServer(clusterName)
defer s.Shutdown()

sc := NewDefaultConnection(t)
defer sc.Close()

sub, err := sc.Subscribe("foo", func(_ *Msg) {})
if err != nil {
t.Fatalf("Error on sub: %v", err)
}

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Error on connect: ")
}
defer nc.Close()

scc := sc.(*conn)
scc.Lock()
closeSubj := scc.subCloseRequests
// alter the subCloseRequests so that the sub close fails
scc.subCloseRequests = "dummy"
scc.Unlock()

// Now setup a subscription to check if library is sending the close protocol.
crsub, err := nc.SubscribeSync(closeSubj)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
// Flush since this is a different connection than the one used by sc.
nc.Flush()

if err := sub.Close(); err == nil {
t.Fatal("Expected error, got none")
}

// Fix close subject
scc.Lock()
scc.subCloseRequests = closeSubj
scc.Unlock()

// Try again, it should work
if err := sub.Close(); err != nil {
t.Fatalf("Error on close: %v", err)
}
// Check protocol was sent
if _, err := crsub.NextMsg(time.Second); err != nil {
t.Fatalf("Did not get close protocol: %v", err)
}

// Now another call to Close() should return BadSubscription
if err := sub.Close(); err != ErrBadSubscription {
t.Fatalf("Expected %v, got %v", ErrBadSubscription, err)
}
// And this time, no protocol should be sent
if _, err := crsub.NextMsg(100 * time.Millisecond); err == nil {
t.Fatal("Close protocol should not have been sent")
}
}

func TestNatsOptions(t *testing.T) {
snopts := natsd.DefaultTestOptions
snopts.Username = "foo"
Expand Down
37 changes: 28 additions & 9 deletions sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type subscription struct {
inboxSub *nats.Subscription
opts SubscriptionOptions
cb MsgHandler
// closed indicate that sub.Close() was invoked, but fullyClosed
// is only set if the close/unsub protocol was successful. This
// allow the user to be able to call sub.Close() several times
// in case an error is returned.
closed bool
fullyClosed bool
}

// SubscriptionOption is a function on the options for a subscription.
Expand Down Expand Up @@ -414,24 +420,32 @@ func (sub *subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
// given boolean.
func (sub *subscription) closeOrUnsubscribe(doClose bool) error {
sub.Lock()
sc := sub.sc
if sc == nil {
// Already closed.
// If we are fully closed, return error indicating that the
// subscription is invalid. Note that conn.Close() in this case
// returns nil, but keeping behavior same so we don't have breaking change.
if sub.fullyClosed {
sub.Unlock()
return ErrBadSubscription
}
sub.sc = nil
sub.inboxSub.Unsubscribe()
sub.inboxSub = nil
wasClosed := sub.closed
// If this is the very first Close() call, do some internal cleanup,
// otherwise, simply send the close protocol message.
if !wasClosed {
sub.closed = true
sub.inboxSub.Unsubscribe()
sub.inboxSub = nil
}
sc := sub.sc
sub.Unlock()

sc.Lock()
if sc.closed {
sc.Unlock()
return ErrConnectionClosed
}

delete(sc.subMap, sub.inbox)
if !wasClosed {
delete(sc.subMap, sub.inbox)
}
reqSubject := sc.unsubRequests
if doClose {
reqSubject = sc.subCloseRequests
Expand Down Expand Up @@ -468,6 +482,10 @@ func (sub *subscription) closeOrUnsubscribe(doClose bool) error {
return errors.New(r.Error)
}

// Success, set this flag so we no longer accept a Close()
sub.Lock()
sub.fullyClosed = true
sub.Unlock()
return nil
}

Expand All @@ -493,13 +511,14 @@ func (msg *Msg) Ack() error {
ackSubject := sub.ackInbox
isManualAck := sub.opts.ManualAcks
sc := sub.sc
closed := sub.closed
sub.RUnlock()

// Check for error conditions.
if !isManualAck {
return ErrManualAck
}
if sc == nil {
if closed {
return ErrBadSubscription
}

Expand Down

0 comments on commit ced5928

Please sign in to comment.