diff --git a/server/monitor.go b/server/monitor.go index 7c0156e0..44c0d545 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -172,21 +172,21 @@ func (s *StanServer) startMonitoring(nOpts *natsd.Options) error { func (s *StanServer) HandleRootz(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, ` - + - NATS Streaming + NATS Streaming
server
store
clients
channels

- help + help `, ServerPath, StorePath, ClientsPath, ChannelsPath) } @@ -198,15 +198,6 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Error getting information about channels state: %v", err), http.StatusInternalServerError) return } - var role string - var nodeID string - s.mu.RLock() - state := s.state - if s.raft != nil { - role = s.raft.State().String() - nodeID = s.info.NodeID - } - s.mu.RUnlock() numSubs := s.numSubs() now := time.Now() @@ -227,6 +218,15 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) { maxFDs = int(limits.OpenFiles) } + var role string + var nodeID string + + s.mu.RLock() + state := s.state + if s.raft != nil { + role = s.raft.State().String() + nodeID = s.info.NodeID + } serverz := &Serverz{ ClusterID: s.info.ClusterID, ServerID: s.serverID, @@ -250,6 +250,7 @@ func (s *StanServer) HandleServerz(w http.ResponseWriter, r *http.Request) { OpenFDs: fds, MaxFDs: maxFDs, } + s.mu.RUnlock() s.sendResponse(w, r, serverz) } @@ -293,6 +294,7 @@ func (s *StanServer) HandleStorez(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("Error getting information about channels state: %v", err), http.StatusInternalServerError) return } + s.mu.RLock() storez := &Storez{ ClusterID: s.info.ClusterID, ServerID: s.serverID, @@ -302,6 +304,7 @@ func (s *StanServer) HandleStorez(w http.ResponseWriter, r *http.Request) { TotalMsgs: count, TotalBytes: bytes, } + s.mu.RUnlock() s.sendResponse(w, r, storez) } @@ -353,6 +356,7 @@ func (s *StanServer) HandleClientsz(w http.ResponseWriter, r *http.Request) { } } carr = carr[0:carrSize] + s.mu.RLock() clientsz := &Clientsz{ ClusterID: s.info.ClusterID, ServerID: s.serverID, @@ -363,6 +367,7 @@ func (s *StanServer) HandleClientsz(w http.ResponseWriter, r *http.Request) { Count: len(carr), Clients: carr, } + s.mu.RUnlock() s.sendResponse(w, r, clientsz) } } @@ -499,6 +504,7 @@ func (s *StanServer) HandleChannelsz(w http.ResponseWriter, r *http.Request) { channels := s.channels.getAll() totalChannels := len(channels) minoff, maxoff := getMinMaxOffset(offset, limit, totalChannels) + s.mu.RLock() channelsz := &Channelsz{ ClusterID: s.info.ClusterID, ServerID: s.serverID, @@ -507,6 +513,7 @@ func (s *StanServer) HandleChannelsz(w http.ResponseWriter, r *http.Request) { Limit: limit, Total: totalChannels, } + s.mu.RUnlock() if subsOption == 1 { carr := make([]*Channelz, 0, totalChannels) for cn := range channels { diff --git a/server/monitor_test.go b/server/monitor_test.go index aa5f695d..be9fda5f 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -24,6 +24,7 @@ import ( "reflect" "runtime" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1448,3 +1449,67 @@ func TestMonitorInOutMsgs(t *testing.T) { t.Fatalf("Expected 15 outbound messages, got %v - %v", sz.InMsgs, sz.InBytes) } } + +func TestMonitorNoPanicOnServerRestart(t *testing.T) { + resetPreviousHTTPConnections() + cleanupDatastore(t) + defer cleanupDatastore(t) + opts := getTestDefaultOptsForPersistentStore() + + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + opts.NATSServerURL = "nats://127.0.0.1:4222" + s := runMonitorServer(t, opts) + defer s.Shutdown() + + sc := NewDefaultConnection(t) + defer sc.Close() + + for i := 0; i < 100; i++ { + if _, err := sc.Subscribe(fmt.Sprintf("foo.%d", i+1), + func(_ *stan.Msg) {}, stan.DurableName("dur")); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + } + + endpoints := []string{ + "channelsz?subs=1", + "serverz", + "storez", + "clientsz", + "isFTActive", + } + for _, e := range endpoints { + s.Shutdown() + + wg := sync.WaitGroup{} + wg.Add(1) + done := make(chan struct{}) + go func() { + defer wg.Done() + url := fmt.Sprintf("http://%s:%d/streaming/", monitorHost, monitorPort) + for { + resp, err := http.DefaultClient.Get(url + e) + if err != nil { + continue + } + ioutil.ReadAll(resp.Body) + resp.Body.Close() + select { + case <-done: + return + default: + } + } + }() + + s = runMonitorServer(t, opts) + defer s.Shutdown() + + time.Sleep(100 * time.Millisecond) + close(done) + wg.Wait() + } + sc.Close() +} diff --git a/server/server.go b/server/server.go index be883ca7..fc945cdb 100644 --- a/server/server.go +++ b/server/server.go @@ -2525,7 +2525,9 @@ func (s *StanServer) processRecoveredChannels(channels map[string]*stores.Recove allSubs := make([]*subState, 0, 16) for channelName, recoveredChannel := range channels { + s.channels.Lock() channel, err := s.channels.create(s, channelName, recoveredChannel.Channel) + s.channels.Unlock() if err != nil { return nil, err }