Skip to content

Commit

Permalink
[FIXED] Possible panic on startup when monitoring endpoint inspected
Browse files Browse the repository at this point in the history
If some monitoring endpoints were inspected while the server was still starting,
it could result in a panic caused by a map being read while it was being written.

Resolves #1235

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jan 30, 2022
1 parent a17a5b6 commit c6f1e64
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
31 changes: 19 additions & 12 deletions server/monitor.go
Expand Up @@ -172,21 +172,21 @@ func (s *StanServer) startMonitoring(nOpts *natsd.Options) error {
// HandleRootz writes the monitoring landing page to w: a small HTML document
// with the NATS favicon and logo, one relative link per monitoring endpoint
// (server, store, clients, channels) and a link to the online help.
//
// The endpoint links are built from ServerPath, StorePath, ClientsPath and
// ChannelsPath, which are declared elsewhere in the package. The request r is
// not inspected.
func (s *StanServer) HandleRootz(w http.ResponseWriter, r *http.Request) {
	// Only the https:// variants of the favicon, logo and help link are kept;
	// emitting the old http:// lines next to them would duplicate each element
	// on the rendered page.
	fmt.Fprintf(w, `<html lang="en">
<head>
<link rel="shortcut icon" href="https://nats.io/img/favicon.ico">
<style type="text/css">
body { font-family: "Century Gothic", CenturyGothic, AppleGothic, sans-serif; font-size: 22; }
a { margin-left: 32px; }
</style>
</head>
<body>
<img src="https://nats.io/img/logo.png" alt="NATS Streaming">
<br/>
<a href=.%s>server</a><br/>
<a href=.%s>store</a><br/>
<a href=.%s>clients</a><br/>
<a href=.%s>channels</a><br/>
<br/>
<a href=https://docs.nats.io/legacy/stan/intro/monitoring/>help</a>
</body>
</html>`, ServerPath, StorePath, ClientsPath, ChannelsPath)
}
Expand All @@ -198,15 +198,6 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Error getting information about channels state: %v", err), http.StatusInternalServerError)
return
}
var role string
var nodeID string
s.mu.RLock()
state := s.state
if s.raft != nil {
role = s.raft.State().String()
nodeID = s.info.NodeID
}
s.mu.RUnlock()

numSubs := s.numSubs()
now := time.Now()
Expand All @@ -227,6 +218,15 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) {
maxFDs = int(limits.OpenFiles)
}

var role string
var nodeID string

s.mu.RLock()
state := s.state
if s.raft != nil {
role = s.raft.State().String()
nodeID = s.info.NodeID
}
serverz := &Serverz{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -250,6 +250,7 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) {
OpenFDs: fds,
MaxFDs: maxFDs,
}
s.mu.RUnlock()
s.sendResponse(w, r, serverz)
}

Expand Down Expand Up @@ -293,6 +294,7 @@ func (s *StanServer) HandleStorez(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Error getting information about channels state: %v", err), http.StatusInternalServerError)
return
}
s.mu.RLock()
storez := &Storez{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -302,6 +304,7 @@ func (s *StanServer) HandleStorez(w http.ResponseWriter, r *http.Request) {
TotalMsgs: count,
TotalBytes: bytes,
}
s.mu.RUnlock()
s.sendResponse(w, r, storez)
}

Expand Down Expand Up @@ -353,6 +356,7 @@ func (s *StanServer) HandleClientsz(w http.ResponseWriter, r *http.Request) {
}
}
carr = carr[0:carrSize]
s.mu.RLock()
clientsz := &Clientsz{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -363,6 +367,7 @@ func (s *StanServer) HandleClientsz(w http.ResponseWriter, r *http.Request) {
Count: len(carr),
Clients: carr,
}
s.mu.RUnlock()
s.sendResponse(w, r, clientsz)
}
}
Expand Down Expand Up @@ -499,6 +504,7 @@ func (s *StanServer) HandleChannelsz(w http.ResponseWriter, r *http.Request) {
channels := s.channels.getAll()
totalChannels := len(channels)
minoff, maxoff := getMinMaxOffset(offset, limit, totalChannels)
s.mu.RLock()
channelsz := &Channelsz{
ClusterID: s.info.ClusterID,
ServerID: s.serverID,
Expand All @@ -507,6 +513,7 @@ func (s *StanServer) HandleChannelsz(w http.ResponseWriter, r *http.Request) {
Limit: limit,
Total: totalChannels,
}
s.mu.RUnlock()
if subsOption == 1 {
carr := make([]*Channelz, 0, totalChannels)
for cn := range channels {
Expand Down
65 changes: 65 additions & 0 deletions server/monitor_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1448,3 +1449,67 @@ func TestMonitorInOutMsgs(t *testing.T) {
t.Fatalf("Expected 15 outbound messages, got %v - %v", sz.InMsgs, sz.InBytes)
}
}

// TestMonitorNoPanicOnServerRestart is a regression test for a panic that
// could occur when monitoring endpoints were queried while the server was
// still starting.
//
// It creates 100 durable subscriptions against a persistent store, then, for
// each monitoring endpoint, shuts the server down and restarts it while a
// goroutine requests that endpoint in a tight loop. The test passes if no
// restart panics.
func TestMonitorNoPanicOnServerRestart(t *testing.T) {
	resetPreviousHTTPConnections()
	cleanupDatastore(t)
	defer cleanupDatastore(t)
	opts := getTestDefaultOptsForPersistentStore()

	ns := natsdTest.RunDefaultServer()
	defer ns.Shutdown()

	opts.NATSServerURL = "nats://127.0.0.1:4222"
	s := runMonitorServer(t, opts)
	// Deferred through a closure so that the server instance that is current
	// when the test exits gets shut down. Earlier instances are shut down
	// explicitly in the loop below, so no per-iteration defer is needed.
	defer func() { s.Shutdown() }()

	sc := NewDefaultConnection(t)
	defer sc.Close()

	for i := 0; i < 100; i++ {
		if _, err := sc.Subscribe(fmt.Sprintf("foo.%d", i+1),
			func(_ *stan.Msg) {}, stan.DurableName("dur")); err != nil {
			t.Fatalf("Error on subscribe: %v", err)
		}
	}

	endpoints := []string{
		"channelsz?subs=1",
		"serverz",
		"storez",
		"clientsz",
		"isFTActive",
	}
	baseURL := fmt.Sprintf("http://%s:%d/streaming/", monitorHost, monitorPort)
	for _, e := range endpoints {
		s.Shutdown()

		var wg sync.WaitGroup
		wg.Add(1)
		done := make(chan struct{})
		go func(url string) {
			defer wg.Done()
			for {
				// Check for termination on every iteration, including the
				// ones where the request fails, so the goroutine always
				// exits once done is closed and wg.Wait() cannot hang.
				select {
				case <-done:
					return
				default:
				}
				resp, err := http.DefaultClient.Get(url)
				if err != nil {
					// Expected while the server is down or still starting:
					// retry immediately to hit the endpoint as early as
					// possible during startup.
					continue
				}
				// Best-effort drain; the response content is irrelevant here.
				ioutil.ReadAll(resp.Body)
				resp.Body.Close()
			}
		}(baseURL + e)

		s = runMonitorServer(t, opts)

		// Give the poller some time to hit the freshly started server.
		time.Sleep(100 * time.Millisecond)
		close(done)
		wg.Wait()
	}
	// Close the client before the deferred server shutdowns run.
	sc.Close()
}
2 changes: 2 additions & 0 deletions server/server.go
Expand Up @@ -2525,7 +2525,9 @@ func (s *StanServer) processRecoveredChannels(channels map[string]*stores.Recove
allSubs := make([]*subState, 0, 16)

for channelName, recoveredChannel := range channels {
s.channels.Lock()
channel, err := s.channels.create(s, channelName, recoveredChannel.Channel)
s.channels.Unlock()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c6f1e64

Please sign in to comment.