Skip to content

Commit

Permalink
Don't hold server lock when placing outbound items on sendq
Browse files Browse the repository at this point in the history
Needed to change some things around but think this is close.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Dec 2, 2018
1 parent 744795e commit 4b1e535
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 34 deletions.
4 changes: 3 additions & 1 deletion server/accounts.go
Expand Up @@ -155,7 +155,9 @@ func (a *Account) TotalSubs() int {
func (a *Account) addClient(c *client) int {
a.mu.Lock()
n := len(a.clients)
a.clients[c] = c
if a.clients != nil {
a.clients[c] = c
}
a.mu.Unlock()
if c != nil && c.srv != nil && a != c.srv.gacc {
c.srv.accConnsUpdate(a)
Expand Down
54 changes: 36 additions & 18 deletions server/events.go
Expand Up @@ -45,6 +45,8 @@ type internal struct {
subs map[string]msgHandler
sendq chan *pubMsg
wg sync.WaitGroup
orphMax time.Duration
chkOrph time.Duration
}

// ConnectEventMsg is sent when a new connection is made that is part of an account.
Expand Down Expand Up @@ -177,8 +179,8 @@ func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
// Will send a shutdown message.
func (s *Server) sendShutdownEvent() {
s.mu.Lock()
defer s.mu.Unlock()
if s.sys == nil || s.sys.sendq == nil {
s.mu.Unlock()
return
}
subj := fmt.Sprintf(shutdownEventSubj, s.info.ID)
Expand All @@ -188,16 +190,22 @@ func (s *Server) sendShutdownEvent() {
s.sys.sendq = nil
// Unhook all msgHandlers. Normal client cleanup will deal with subs, etc.
s.sys.subs = nil
s.mu.Unlock()
// Send to the internal queue and mark as last.
sendq <- &pubMsg{r, subj, _EMPTY_, nil, nil, true}
}

// This will queue up a message to be sent.
// Assumes lock is held on entry.
func (s *Server) sendInternalMsg(r *SublistResult, sub, rply string, si *ServerInfo, msg interface{}) {
if s.sys == nil || s.sys.sendq == nil {
return
}
s.sys.sendq <- &pubMsg{r, sub, rply, si, msg, false}
sendq := s.sys.sendq
// Don't hold lock while placing on the channel.
s.mu.Unlock()
sendq <- &pubMsg{r, sub, rply, si, msg, false}
s.mu.Lock()
}

// Locked version of checking if events system running. Also checks server.
Expand All @@ -211,11 +219,6 @@ func (s *Server) eventsEnabled() bool {
return s.sys != nil && s.sys.client != nil && s.sys.account != nil
}

// orphanServerDuration is how long we have to not hear from a remote server
// top consider it orphaned. We will remove any accounting associated with it.
var orphanServerDuration = 5 * connHBInterval
var checkRemoteServerInterval = 3 * connHBInterval

// Check for orphan servers who may have gone away without notification.
func (s *Server) checkRemoteServers() {
s.mu.Lock()
Expand All @@ -225,14 +228,16 @@ func (s *Server) checkRemoteServers() {
}
now := time.Now()
for sid, su := range s.sys.servers {
if now.Sub(su.ltime) > orphanServerDuration {
if now.Sub(su.ltime) > s.sys.orphMax {
s.Debugf("Detected orphan remote server: %q", sid)
// Simulate it going away.
s.processRemoteServerShutdown(sid)
delete(s.sys.servers, sid)
}
}
s.sys.sweeper.Reset(checkRemoteServerInterval)
if s.sys.sweeper != nil {
s.sys.sweeper.Reset(s.sys.chkOrph)
}
}

// Start a ticker that will fire periodically and check for orphaned servers.
Expand All @@ -242,7 +247,7 @@ func (s *Server) startRemoteServerSweepTimer() {
if !s.eventsEnabled() {
return
}
s.sys.sweeper = time.AfterFunc(checkRemoteServerInterval, s.checkRemoteServers)
s.sys.sweeper = time.AfterFunc(s.sys.chkOrph, s.checkRemoteServers)
}

// This will setup our system wide tracking subs.
Expand Down Expand Up @@ -297,7 +302,7 @@ func (s *Server) remoteServerShutdown(sub *subscription, subject, reply string,
return
}
toks := strings.Split(subject, tsep)
if len(toks) != shutdownEventTokens {
if len(toks) < shutdownEventTokens {
s.Debugf("Received remote server shutdown on bad subject %q", subject)
}
sid := toks[serverSubjectIndex]
Expand Down Expand Up @@ -329,16 +334,17 @@ func (s *Server) updateRemoteServer(ms *ServerInfo) {
}

// shutdownEventing will clean up all eventing state.
// Lock is held upon entry.
func (s *Server) shutdownEventing() {
if !s.eventsRunning() {
return
}

s.mu.Lock()
if s.sys.sweeper != nil {
s.sys.sweeper.Stop()
s.sys.sweeper = nil
}
s.mu.Unlock()

// We will queue up a shutdown event and wait for the
// internal send loop to exit.
Expand Down Expand Up @@ -370,7 +376,10 @@ func (s *Server) shutdownEventing() {
}

func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) {
if !s.eventsRunning() {
s.mu.Lock()
defer s.mu.Unlock()

if !s.eventsEnabled() {
return
}

Expand All @@ -379,7 +388,7 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
return
}
acc := s.LookupAccount(m.Account)
acc := s.lookupAccount(m.Account)
if acc == nil {
return
}
Expand Down Expand Up @@ -429,6 +438,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg

// Setup tracking for this account. This allows us to track globally
// account activity.
// Lock should be held on entry.
func (s *Server) enableAccountTracking(a *Account) {
if a == nil || !s.eventsEnabled() || a == s.sys.account {
return
Expand All @@ -447,10 +457,11 @@ func (s *Server) enableAccountTracking(a *Account) {
}

// FIXME(dlc) - make configurable.
const connHBInterval = 30 * time.Second
const AccountConnHBInterval = 30 * time.Second

// sendAccConnsUpdate is called to send out our information on the
// account's local connections.
// Lock should be held on entry.
func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
if !s.eventsEnabled() || a == nil || a == s.sys.account || a == s.gacc {
return
Expand All @@ -476,9 +487,9 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
}
// Check to see if we have an HB running and update.
if a.ctmr == nil {
a.etmr = time.AfterFunc(connHBInterval, func() { s.accConnsUpdate(a) })
a.etmr = time.AfterFunc(AccountConnHBInterval, func() { s.accConnsUpdate(a) })
} else {
a.etmr.Reset(connHBInterval)
a.etmr.Reset(AccountConnHBInterval)
}
a.mu.Unlock()

Expand Down Expand Up @@ -529,7 +540,10 @@ func (s *Server) accountConnectEvent(c *client) {
},
}
c.mu.Unlock()

s.mu.Lock()
s.sendInternalMsg(r, subj, "", &m.Server, &m)
s.mu.Unlock()
}

// accountDisconnectEvent will send an account client disconnect event if there is interest.
Expand Down Expand Up @@ -574,18 +588,22 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
Reason: reason,
}
c.mu.Unlock()

s.mu.Lock()
s.sendInternalMsg(r, subj, "", &m.Server, &m)
s.mu.Unlock()
}

// Internal message callback. If the msg is needed past the callback it is
// required to be copied.
type msgHandler func(sub *subscription, subject, reply string, msg []byte)

func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byte) {
s.mu.Lock()
if !s.eventsEnabled() || s.sys.subs == nil {
s.mu.Unlock()
return
}
s.mu.Lock()
cb := s.sys.subs[string(sub.sid)]
s.mu.Unlock()
if cb != nil {
Expand Down
29 changes: 14 additions & 15 deletions server/events_test.go
Expand Up @@ -325,7 +325,9 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) {
// Now make sure we do not hear ourselves. We optimize this for internally
// generated messages.
r := SublistResult{psubs: []*subscription{sub}}
s.mu.Lock()
s.sendInternalMsg(&r, "foo", "", nil, msg.Data)
s.mu.Unlock()

select {
case <-received:
Expand Down Expand Up @@ -473,6 +475,7 @@ func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) {
// Test that the remote accounting works when a server goes away.
func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
sa, optsA, sb, optsB := runTrustedCluster(t)
defer sa.Shutdown()

// Let's create a user account.
okp, _ := nkeys.FromSeed(oSeed)
Expand All @@ -489,25 +492,19 @@ func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)

for i := 0; i < 10; i++ {
_, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
c, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
if err != nil {
t.Fatalf("Expected to connect, got %v", err)
}
_, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
defer c.Close()
c, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
if err != nil {
t.Fatalf("Expected to connect, got %v", err)
}
defer c.Close()
}

// We are at capacity so both of these should fail.
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
t.Fatalf("Expected connection to fail due to max limit")
}
if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
t.Fatalf("Expected connection to fail due to max limit")
}

// Now shutdown Server B. Do so such that now communications goo out.
// Now shutdown Server B. Do so such that now communications go out.
sb.mu.Lock()
sb.sys = nil
sb.mu.Unlock()
Expand All @@ -518,16 +515,18 @@ func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
}

// Let's speed up the checking process.
checkRemoteServerInterval = 10 * time.Millisecond
orphanServerDuration = 30 * time.Millisecond
sa.mu.Lock()
sa.sys.sweeper.Reset(checkRemoteServerInterval)
sa.sys.chkOrph = 10 * time.Millisecond
sa.sys.orphMax = 30 * time.Millisecond
sa.sys.sweeper.Reset(sa.sys.chkOrph)
sa.mu.Unlock()

// We should eventually be able to connect.
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err != nil {
if c, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err != nil {
return err
} else {
c.Close()
}
return nil
})
Expand Down
2 changes: 2 additions & 0 deletions server/server.go
Expand Up @@ -533,6 +533,8 @@ func (s *Server) setSystemAccount(acc *Account) error {
servers: make(map[string]*serverUpdate),
subs: make(map[string]msgHandler),
sendq: make(chan *pubMsg, 128),
orphMax: 5 * AccountConnHBInterval,
chkOrph: 3 * AccountConnHBInterval,
}
s.sys.client.initClient()
s.sys.client.echo = false
Expand Down

0 comments on commit 4b1e535

Please sign in to comment.