Skip to content

Commit

Permalink
Merge pull request #1157 from nats-io/fix_client_health_and_pings
Browse files Browse the repository at this point in the history
[FIXED] Reduce contention with client heartbeats and prevent panic
  • Loading branch information
kozlovic committed Feb 17, 2021
2 parents 63341c8 + 59c7f99 commit b69ffa9
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions server/server.go
Expand Up @@ -678,6 +678,7 @@ type StanServer struct {
nca *nats.Conn // used to receive subscriptions acks
ncr *nats.Conn // used for raft messages
ncsr *nats.Conn // used for raft snapshot replication
ncp *nats.Conn // used for sending client HBs and PINGs responses

wg sync.WaitGroup // Wait on go routines during shutdown

Expand Down Expand Up @@ -1604,6 +1605,9 @@ func (s *StanServer) createNatsConnections() error {
s.ncsr, err = s.createNatsClientConn("raft_snap")
}
}
if err == nil {
s.ncp, err = s.createNatsClientConn("hb")
}
return err
}

Expand Down Expand Up @@ -3176,7 +3180,7 @@ func (s *StanServer) checkClientHealth(clientID string) {

// Sends the HB request. This call blocks for ClientHBTimeout,
// do not hold the lock for that long!
_, err := s.nc.Request(hbInbox, nil, s.opts.ClientHBTimeout)
_, err := s.ncp.Request(hbInbox, nil, s.opts.ClientHBTimeout)
// Grab the lock now.
client.Lock()
// Client could have been unregistered, in which case
Expand Down Expand Up @@ -3390,7 +3394,9 @@ func (s *StanServer) processClientPings(m *nats.Msg) {
client.RUnlock()
if hasFailedHBs {
client.Lock()
client.hbt.Reset(time.Millisecond)
if client.hbt != nil {
client.hbt.Reset(time.Millisecond)
}
client.Unlock()
}
if s.pingResponseOKBytes == nil {
Expand All @@ -3406,7 +3412,7 @@ func (s *StanServer) processClientPings(m *nats.Msg) {
}
reply = s.pingResponseInvalidClientBytes
}
s.ncs.Publish(m.Reply, reply)
s.ncp.Publish(m.Reply, reply)
}

// CtrlMsg are no longer used to solve connection and subscription close/unsub
Expand Down Expand Up @@ -5690,6 +5696,7 @@ func (s *StanServer) Shutdown() {
nc := s.nc
ftnc := s.ftnc
nca := s.nca
ncp := s.ncp

// Stop processing subscriptions start requests
s.subStartQuit <- struct{}{}
Expand Down Expand Up @@ -5760,6 +5767,9 @@ func (s *StanServer) Shutdown() {
if nca != nil {
nca.Close()
}
if ncp != nil {
ncp.Close()
}
if ns != nil {
ns.Shutdown()
}
Expand Down

0 comments on commit b69ffa9

Please sign in to comment.