Skip to content

Commit

Permalink
Merge d9a98e1 into b35e2b6
Browse files Browse the repository at this point in the history
  • Loading branch information
Waldemar Quevedo committed Jul 17, 2015
2 parents b35e2b6 + d9a98e1 commit 885264c
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 1 deletion.
63 changes: 62 additions & 1 deletion server/monitor.go
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"net/http"
"runtime"
"sort"
"strconv"
"time"

Expand Down Expand Up @@ -46,6 +47,24 @@ type ConnInfo struct {
Subs []string `json:"subscriptions_list,omitempty"`
}

// Helper types to sort by ConnInfo values
type Pair struct {
Key int
Val int
}

type Pairs []Pair

func (d Pairs) Len() int {
return len(d)
}
func (d Pairs) Swap(i, j int) {
d[i], d[j] = d[j], d[i]
}
func (d Pairs) Less(i, j int) bool {
return d[i].Val < d[j].Val
}

const DefaultConnListSize = 1024

// HandleConnz process HTTP requests for connection information.
Expand All @@ -56,6 +75,8 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
subs, _ := strconv.Atoi(r.URL.Query().Get("subs"))
c.Offset, _ = strconv.Atoi(r.URL.Query().Get("offset"))
c.Limit, _ = strconv.Atoi(r.URL.Query().Get("limit"))
sortOpt := r.URL.Query().Get("sort")

if c.Limit == 0 {
c.Limit = DefaultConnListSize
}
Expand All @@ -64,8 +85,48 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
c.NumConns = len(s.clients)

// Filter key value pairs used for sorting in another structure
pairs := Pairs{}
switch sortOpt {
case "cid":
for i, c := range s.clients {
pairs = append(pairs, Pair{Key: int(i), Val: int(c.cid)})
}
case "subs":
for i, c := range s.clients {
pairs = append(pairs, Pair{Key: int(i), Val: int(c.subs.Count())})
}
case "msgs_to":
for i, c := range s.clients {
pairs = append(pairs, Pair{Key: int(i), Val: int(c.outMsgs)})
}
case "msgs_from":
for i, c := range s.clients {
pairs = append(pairs, Pair{Key: int(i), Val: int(c.inMsgs)})
}
case "bytes_to":
for i, c := range s.clients {
pairs = append(pairs, Pair{Key: int(i), Val: int(c.outBytes)})
}
case "bytes_from":
for i, c := range s.clients {
pairs = append(pairs, Pair{Key: int(i), Val: int(c.inBytes)})
}
default:
// Just get the unsorted keys
for i, _ := range s.clients {
pairs = append(pairs, Pair{Key: int(i)})
}
}

// Return in descending order
if len(pairs) > 0 {
sort.Sort(sort.Reverse(pairs))
}

i := 0
for _, client := range s.clients {
for _, pair := range pairs {
client := s.clients[uint64(pair.Key)]
if i >= c.Offset+c.Limit {
break
}
Expand Down
198 changes: 198 additions & 0 deletions server/monitor_test.go
Expand Up @@ -325,6 +325,204 @@ func TestConnzWithOffsetAndLimit(t *testing.T) {
}
}

func TestConnzSortedByCid(t *testing.T) {
s := runMonitorServer(DEFAULT_HTTP_PORT)
defer s.Shutdown()

clients := make([]*nats.Conn, 4)
for i, _ := range clients {
clients[i] = createClientConnSubscribeAndPublish(t)
defer clients[i].Close()
}

url := fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
resp, err := http.Get(url + "connz?sort=cid")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}

c := Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}

if c.Conns[0].Cid < c.Conns[1].Cid ||
c.Conns[1].Cid < c.Conns[2].Cid ||
c.Conns[2].Cid < c.Conns[3].Cid {
t.Fatalf("Expected conns sorted in descending order by cid, got %v < %v\n", c.Conns[0].Cid, c.Conns[3].Cid)
}
}

func TestConnzSortedByBytesAndMsgs(t *testing.T) {
s := runMonitorServer(DEFAULT_HTTP_PORT)
defer s.Shutdown()

// Create a connection and make it send more messages than others
firstClient := createClientConnSubscribeAndPublish(t)
for i := 0; i < 100; i++ {
firstClient.Publish("foo", []byte("Hello World"))
}
defer firstClient.Close()

clients := make([]*nats.Conn, 3)
for i, _ := range clients {
clients[i] = createClientConnSubscribeAndPublish(t)
defer clients[i].Close()
}

url := fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
resp, err := http.Get(url + "connz?sort=bytes_to")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}

c := Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}

if c.Conns[0].OutBytes < c.Conns[1].OutBytes ||
c.Conns[0].OutBytes < c.Conns[2].OutBytes ||
c.Conns[0].OutBytes < c.Conns[3].OutBytes {
t.Fatalf("Expected conns sorted in descending order by bytes from, got %v < one of [%v, %v, %v]\n",
c.Conns[0].OutBytes, c.Conns[1].OutBytes, c.Conns[2].OutBytes, c.Conns[3].OutBytes)
}

url = fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
resp, err = http.Get(url + "connz?sort=msgs_to")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}

c = Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}

if c.Conns[0].OutMsgs < c.Conns[1].OutMsgs ||
c.Conns[0].OutMsgs < c.Conns[2].OutMsgs ||
c.Conns[0].OutMsgs < c.Conns[3].OutMsgs {
t.Fatalf("Expected conns sorted in descending order by msgs from, got %v < one of [%v, %v, %v]\n",
c.Conns[0].OutMsgs, c.Conns[1].OutMsgs, c.Conns[2].OutMsgs, c.Conns[3].OutMsgs)
}

url = fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
resp, err = http.Get(url + "connz?sort=bytes_from")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}

c = Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}

if c.Conns[0].InBytes < c.Conns[1].InBytes ||
c.Conns[0].InBytes < c.Conns[2].InBytes ||
c.Conns[0].InBytes < c.Conns[3].InBytes {
t.Fatalf("Expected conns sorted in descending order by bytes from, got %v < one of [%v, %v, %v]\n",
c.Conns[0].InBytes, c.Conns[1].InBytes, c.Conns[2].InBytes, c.Conns[3].InBytes)
}

url = fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
resp, err = http.Get(url + "connz?sort=msgs_from")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}

c = Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}

if c.Conns[0].InMsgs < c.Conns[1].InMsgs ||
c.Conns[0].InMsgs < c.Conns[2].InMsgs ||
c.Conns[0].InMsgs < c.Conns[3].InMsgs {
t.Fatalf("Expected conns sorted in descending order by msgs from, got %v < one of [%v, %v, %v]\n",
c.Conns[0].InMsgs, c.Conns[1].InMsgs, c.Conns[2].InMsgs, c.Conns[3].InMsgs)
}
}

func TestConnzSortedBySubs(t *testing.T) {
s := runMonitorServer(DEFAULT_HTTP_PORT)
defer s.Shutdown()

firstClient := createClientConnSubscribeAndPublish(t)
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
clients := make([]*nats.Conn, 3)
for i, _ := range clients {
clients[i] = createClientConnSubscribeAndPublish(t)
defer clients[i].Close()
}
defer firstClient.Close()

url := fmt.Sprintf("http://localhost:%d/", DEFAULT_HTTP_PORT)
resp, err := http.Get(url + "connz?sort=subs")
if err != nil {
t.Fatalf("Expected no error: Got %v\n", err)
}
if resp.StatusCode != 200 {
t.Fatalf("Expected a 200 response, got %d\n", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Got an error reading the body: %v\n", err)
}

c := Connz{}
if err := json.Unmarshal(body, &c); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
}

if c.Conns[0].NumSubs < c.Conns[1].NumSubs ||
c.Conns[0].NumSubs < c.Conns[2].NumSubs ||
c.Conns[0].NumSubs < c.Conns[3].NumSubs {
t.Fatalf("Expected conns sorted in descending order by number of subs, got %v < one of [%v, %v, %v]\n",
c.Conns[0].NumSubs, c.Conns[1].NumSubs, c.Conns[2].NumSubs, c.Conns[3].NumSubs)
}
}

func TestConnzWithRoutes(t *testing.T) {
s := runMonitorServer(DEFAULT_HTTP_PORT)
defer s.Shutdown()
Expand Down

0 comments on commit 885264c

Please sign in to comment.