diff --git a/server/server.go b/server/server.go index 74c40abc..70b5e048 100644 --- a/server/server.go +++ b/server/server.go @@ -3245,19 +3245,19 @@ func (s *StanServer) checkClientHealth(clientID string) { client.fhb++ // If we have reached the max number of failures if client.fhb > s.opts.ClientHBFailCount { - s.log.Errorf("[Client:%s] Timed out on heartbeats", clientID) - // close the client (connection). This locks the - // client object internally so unlock here. client.Unlock() - // If clustered, thread operations through Raft. - if s.isClustered { - if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}, false); err != nil { - s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v", - clientID, err) + s.barrier(func() { + s.log.Errorf("[Client:%s] Timed out on heartbeats", clientID) + // If clustered, thread operations through Raft. + if s.isClustered { + if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}, false); err != nil { + s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v", + clientID, err) + } + } else { + s.closeClient(clientID) } - } else { - s.closeClient(clientID) - } + }) return } } else { @@ -3298,14 +3298,16 @@ func (s *StanServer) closeClient(clientID string) error { return ErrUnknownClient } - // Remove all non-durable subscribers. - s.removeAllNonDurableSubscribers(client) - - // Remove from our clientStore. + // Remove from our clientStore before removing subs. + // This prevents a race when the same client ID is just + // reconnecting and registering a durable. if _, err := s.clients.unregister(clientID); err != nil { s.log.Errorf("Error unregistering client %q: %v", clientID, err) } + // Remove all non-durable subscribers. + s.removeAllNonDurableSubscribers(client) + if s.debug { client.RLock() hbInbox := client.info.HbInbox