Skip to content

Commit

Permalink
Support CID in client INFO, allow filtering /connz by CID
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 21, 2018
1 parent a05b9e1 commit 17fecd4
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 94 deletions.
19 changes: 18 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -831,6 +832,22 @@ func (c *client) sendPing() {
c.sendProto([]byte("PING\r\n"), true)
}

// Generates the INFO to be sent to the client with the client ID included.
// info arg will be copied since passed by value.
// Assume lock is held.
func (c *client) generateClientInfoJSON(info Info) []byte {
info.CID = c.cid
// Generate the info json
b, err := json.Marshal(info)
if err != nil {
c.Errorf("Error marshaling INFO JSON: %+v\n", err)
return nil
}
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
json := bytes.Join(pcs, []byte(" "))
return json
}

// Assume the lock is held upon entry.
func (c *client) sendInfo(info []byte) {
c.sendProto(info, true)
Expand Down Expand Up @@ -896,7 +913,7 @@ func (c *client) processPing() {
// If there was a cluster update since this client was created,
// send an updated INFO protocol now.
if srv.lastCURLsUpdate >= c.start.UnixNano() {
c.sendInfo(srv.infoJSON)
c.sendInfo(c.generateClientInfoJSON(srv.info))
}
c.mu.Unlock()
srv.mu.Unlock()
Expand Down
123 changes: 88 additions & 35 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type ConnzOptions struct {

// Limit is the maximum number of connections that should be returned by Connz().
Limit int `json:"limit"`

// Filter for this explicit client connection.
CID uint64 `json:"cid"`
}

// ConnInfo has detailed information on a per connection basis.
Expand Down Expand Up @@ -104,6 +107,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
subs bool
offset int
limit = DefaultConnListSize
cid = uint64(0)
)

if opts != nil {
Expand All @@ -126,6 +130,10 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
if limit <= 0 {
limit = DefaultConnListSize
}
if opts.CID > 0 {
cid = opts.CID
limit = 1
}
}

c := &Connz{
Expand All @@ -146,36 +154,45 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
totalClients := len(s.clients)
c.Total = totalClients

i := 0
pairs := make(Pairs, totalClients)
for _, client := range s.clients {
client.mu.Lock()
switch sortOpt {
case ByCid:
pairs[i] = Pair{Key: client, Val: int64(client.cid)}
case BySubs:
pairs[i] = Pair{Key: client, Val: int64(len(client.subs))}
case ByPending:
pairs[i] = Pair{Key: client, Val: int64(client.out.pb)}
case ByOutMsgs:
pairs[i] = Pair{Key: client, Val: client.outMsgs}
case ByInMsgs:
pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inMsgs)}
case ByOutBytes:
pairs[i] = Pair{Key: client, Val: client.outBytes}
case ByInBytes:
pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inBytes)}
case ByLast:
pairs[i] = Pair{Key: client, Val: client.last.UnixNano()}
case ByIdle:
pairs[i] = Pair{Key: client, Val: c.Now.Sub(client.last).Nanoseconds()}
var pairs Pairs
if cid > 0 {
//pairs = make(Pairs, 0, 1)
client := s.clients[cid]
if client != nil {
pairs = append(pairs, Pair{Key: client, Val: int64(cid)})
}
} else {
i := 0
pairs = make(Pairs, totalClients)
for _, client := range s.clients {
client.mu.Lock()
switch sortOpt {
case ByCid:
pairs[i] = Pair{Key: client, Val: int64(client.cid)}
case BySubs:
pairs[i] = Pair{Key: client, Val: int64(len(client.subs))}
case ByPending:
pairs[i] = Pair{Key: client, Val: int64(client.out.pb)}
case ByOutMsgs:
pairs[i] = Pair{Key: client, Val: client.outMsgs}
case ByInMsgs:
pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inMsgs)}
case ByOutBytes:
pairs[i] = Pair{Key: client, Val: client.outBytes}
case ByInBytes:
pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inBytes)}
case ByLast:
pairs[i] = Pair{Key: client, Val: client.last.UnixNano()}
case ByIdle:
pairs[i] = Pair{Key: client, Val: c.Now.Sub(client.last).Nanoseconds()}
}
client.mu.Unlock()
i++
}
client.mu.Unlock()
i++
}
s.mu.Unlock()

if totalClients > 0 {
if totalClients > 0 && len(pairs) > 1 {
if sortOpt == ByCid {
// Return in ascending order
sort.Sort(pairs)
Expand All @@ -195,14 +212,16 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
if maxoff > totalClients {
maxoff = totalClients
}
pairs = pairs[minoff:maxoff]
if pairs != nil {
pairs = pairs[minoff:maxoff]
}

// Now we have the real number of ConnInfo objects, we can set c.NumConns
// and allocate the array
c.NumConns = len(pairs)
c.Conns = make([]ConnInfo, c.NumConns)

i = 0
i := 0
for _, pair := range pairs {

client := pair.Key
Expand Down Expand Up @@ -310,6 +329,34 @@ func (c *client) getRTT() string {
return fmt.Sprintf("%v", rtt)
}

func decodeBool(w http.ResponseWriter, r *http.Request, param string) (bool, error) {
str := r.URL.Query().Get(param)
if str == "" {
return false, nil
}
val, err := strconv.ParseBool(str)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Error decoding boolean for '%s': %v", param, err)))
return false, err
}
return val, nil
}

func decodeUint64(w http.ResponseWriter, r *http.Request, param string) (uint64, error) {
str := r.URL.Query().Get(param)
if str == "" {
return 0, nil
}
val, err := strconv.ParseUint(str, 10, 64)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Error decoding uint64 for '%s': %v", param, err)))
return 0, err
}
return val, nil
}

func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error) {
str := r.URL.Query().Get(param)
if str == "" {
Expand All @@ -318,7 +365,7 @@ func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error
val, err := strconv.Atoi(str)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Error decoding %s: %v", param, err)))
w.Write([]byte(fmt.Sprintf("Error decoding int for '%s': %v", param, err)))
return 0, err
}
return val, nil
Expand All @@ -327,11 +374,11 @@ func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error
// HandleConnz process HTTP requests for connection information.
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
sortOpt := SortOpt(r.URL.Query().Get("sort"))
auth, err := decodeInt(w, r, "auth")
auth, err := decodeBool(w, r, "auth")
if err != nil {
return
}
subs, err := decodeInt(w, r, "subs")
subs, err := decodeBool(w, r, "subs")
if err != nil {
return
}
Expand All @@ -344,12 +391,18 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
return
}

cid, err := decodeUint64(w, r, "cid")
if err != nil {
return
}

connzOpts := &ConnzOptions{
Sort: sortOpt,
Username: auth == 1,
Subscriptions: subs == 1,
Username: auth,
Subscriptions: subs,
Offset: offset,
Limit: limit,
CID: cid,
}

s.mu.Lock()
Expand Down Expand Up @@ -469,12 +522,12 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {

// HandleRoutez process HTTP requests for route information.
func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
subs, err := decodeInt(w, r, "subs")
subs, err := decodeBool(w, r, "subs")
if err != nil {
return
}
var opts *RoutezOptions
if subs == 1 {
if subs {
opts = &RoutezOptions{Subscriptions: true}
}

Expand Down
50 changes: 49 additions & 1 deletion server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,54 @@ func TestConnzWithSubs(t *testing.T) {
}
}

func TestConnzWithCID(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()

// The one we will request
cid := 5
total := 10

// Create 10
for i := 1; i <= total; i++ {
nc := createClientConnSubscribeAndPublish(t, s)
defer nc.Close()
if i == cid {
nc.Subscribe("hello.foo", func(m *nats.Msg) {})
nc.Subscribe("hello.bar", func(m *nats.Msg) {})
ensureServerActivityRecorded(t, nc)
}
}

url := fmt.Sprintf("http://localhost:%d/connz?cid=%d", s.MonitorAddr().Port, cid)
for mode := 0; mode < 2; mode++ {
c := pollConz(t, s, mode, url, &ConnzOptions{CID: uint64(cid)})
// Test inside details of each connection
if len(c.Conns) != 1 {
t.Fatalf("Expected only one connection, but got %d\n", len(c.Conns))
}
if c.NumConns != 1 {
t.Fatalf("Expected NumConns to be 1, but got %d\n", c.NumConns)
}
ci := c.Conns[0]
if ci.Cid != uint64(cid) {
t.Fatalf("Expected to receive connection %v, but received %v\n", cid, ci.Cid)
}
if ci.NumSubs != 2 {
t.Fatalf("Expected to receive connection with %d subs, but received %d\n", 2, ci.NumSubs)
}
// Now test a miss
badUrl := fmt.Sprintf("http://localhost:%d/connz?cid=%d", s.MonitorAddr().Port, 100)
c = pollConz(t, s, mode, badUrl, &ConnzOptions{CID: uint64(100)})
if len(c.Conns) != 0 {
t.Fatalf("Expected no connections, got %d\n", len(c.Conns))
}
if c.NumConns != 0 {
t.Fatalf("Expected NumConns of 0, got %d\n", c.NumConns)
}
}
}

// Helper to map to connection name
func createConnMap(t *testing.T, cz *Connz) map[string]ConnInfo {
cm := make(map[string]ConnInfo)
Expand Down Expand Up @@ -433,7 +481,7 @@ func TestConnzRTT(t *testing.T) {
if rtt <= 0 {
t.Fatal("Expected RTT to be valid and non-zero\n")
}
if rtt > 5*time.Millisecond || rtt < 100*time.Nanosecond {
if rtt > 20*time.Millisecond || rtt < 100*time.Nanosecond {
t.Fatalf("Invalid RTT of %s\n", ci.RTT)
}
}
Expand Down
3 changes: 0 additions & 3 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (t *tlsOption) Apply(server *Server) {
server.info.TLSVerify = (t.newValue.ClientAuth == tls.RequireAndVerifyClientCert)
message = "enabled"
}
server.generateServerInfoJSON()
server.mu.Unlock()
server.Noticef("Reloaded: tls = %s", message)
}
Expand Down Expand Up @@ -375,7 +374,6 @@ type maxPayloadOption struct {
func (m *maxPayloadOption) Apply(server *Server) {
server.mu.Lock()
server.info.MaxPayload = m.newValue
server.generateServerInfoJSON()
for _, client := range server.clients {
atomic.StoreInt64(&client.mpay, int64(m.newValue))
}
Expand Down Expand Up @@ -622,7 +620,6 @@ func (s *Server) applyOptions(opts []option) {
func (s *Server) reloadAuthorization() {
s.mu.Lock()
s.configureAuthorization()
s.generateServerInfoJSON()
clients := make(map[uint64]*client, len(s.clients))
for i, client := range s.clients {
clients[i] = client
Expand Down
9 changes: 0 additions & 9 deletions server/reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,20 +1650,11 @@ func TestConfigReloadClientAdvertise(t *testing.T) {
verify := func(expectedHost string, expectedPort int) {
s.mu.Lock()
info := s.info
infoJSON := Info{clientConnectURLs: make(map[string]struct{})}
err := json.Unmarshal(s.infoJSON[5:len(s.infoJSON)-2], &infoJSON) // Skip INFO
s.mu.Unlock()
if err != nil {
stackFatalf(t, "Error on Unmarshal: %v", err)
}
if info.Host != expectedHost || info.Port != expectedPort {
stackFatalf(t, "Expected host/port to be %s:%d, got %s:%d",
expectedHost, expectedPort, info.Host, info.Port)
}
// Check that server infoJSON was updated too
if !reflect.DeepEqual(info, infoJSON) {
stackFatalf(t, "Expected infoJSON to be %+v, got %+v", info, infoJSON)
}
}

// Update config with ClientAdvertise (port specified)
Expand Down
2 changes: 1 addition & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (s *Server) sendAsyncInfoToClients() {
if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.sendInfo(s.infoJSON)
c.sendInfo(c.generateClientInfoJSON(s.info))
}
c.mu.Unlock()
}
Expand Down
Loading

0 comments on commit 17fecd4

Please sign in to comment.