Skip to content

Commit

Permalink
Merge pull request #976 from nats-io/fix_974
Browse files Browse the repository at this point in the history
[ADDED] In/Out Msgs/Bytes in /serverz monitoring endpoint
  • Loading branch information
kozlovic committed Nov 25, 2019
2 parents 4fc65ea + 08cc501 commit b95714e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
9 changes: 9 additions & 0 deletions server/monitor.go
Expand Up @@ -21,6 +21,7 @@ import (
"runtime"
"sort"
"strconv"
"sync/atomic"
"time"

natsd "github.com/nats-io/nats-server/v2/server"
Expand Down Expand Up @@ -55,6 +56,10 @@ type Serverz struct {
Channels int `json:"channels"`
TotalMsgs int `json:"total_msgs"`
TotalBytes uint64 `json:"total_bytes"`
InMsgs int64 `json:"in_msgs"`
InBytes int64 `json:"in_bytes"`
OutMsgs int64 `json:"out_msgs"`
OutBytes int64 `json:"out_bytes"`
OpenFDs int `json:"open_fds,omitempty"`
MaxFDs int `json:"max_fds,omitempty"`
}
Expand Down Expand Up @@ -230,6 +235,10 @@ func (s *StanServer) handleServerz(w http.ResponseWriter, r *http.Request) {
Subscriptions: numSubs,
TotalMsgs: count,
TotalBytes: bytes,
InMsgs: atomic.LoadInt64(&s.stats.inMsgs),
InBytes: atomic.LoadInt64(&s.stats.inBytes),
OutMsgs: atomic.LoadInt64(&s.stats.outMsgs),
OutBytes: atomic.LoadInt64(&s.stats.outBytes),
OpenFDs: fds,
MaxFDs: maxFDs,
}
Expand Down
64 changes: 64 additions & 0 deletions server/monitor_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
"runtime"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1326,3 +1327,66 @@ func TestMonitorNumSubs(t *testing.T) {
dur.Unsubscribe()
checkNumSubs(t, 0)
}

func TestMonitorInOutMsgs(t *testing.T) {
resetPreviousHTTPConnections()
s := runMonitorServer(t, GetDefaultOptions())
defer s.Shutdown()

sc := NewDefaultConnection(t)
defer sc.Close()

mch := make(chan *stan.Msg, 5)
count := int32(0)
if _, err := sc.Subscribe("foo", func(m *stan.Msg) {
atomic.AddInt32(&count, 1)
if !m.Redelivered {
select {
case mch <- m:
default:
}
}
},
stan.SetManualAckMode(),
stan.AckWait(ackWaitInMs(500))); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {
atomic.AddInt32(&count, 1)
}); err != nil {
t.Fatalf("Error on subscribe: %v", err)
}

for i := 0; i < 5; i++ {
sc.Publish("foo", []byte("msg"))
}

resp, body := getBody(t, ServerPath, expectedJSON)
resp.Body.Close()
sz := Serverz{}
if err := json.Unmarshal(body, &sz); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v", err)
}
if sz.InMsgs != 5 || sz.InBytes == 0 {
t.Fatalf("Expected 5 inbound messages, got %v - %v", sz.InMsgs, sz.InBytes)
}
if sz.OutMsgs != 10 || sz.OutBytes == 0 {
t.Fatalf("Expected 10 outbound messages, got %v - %v", sz.InMsgs, sz.InBytes)
}

time.Sleep(700 * time.Millisecond)

resp, body = getBody(t, ServerPath, expectedJSON)
resp.Body.Close()
sz = Serverz{}
if err := json.Unmarshal(body, &sz); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v", err)
}
if sz.InMsgs != 5 || sz.InBytes == 0 {
t.Fatalf("Expected 5 inbound messages, got %v - %v", sz.InMsgs, sz.InBytes)
}
if sz.OutMsgs != 15 || sz.OutBytes == 0 {
t.Fatalf("Expected 15 outbound messages, got %v - %v", sz.InMsgs, sz.InBytes)
}
}
10 changes: 10 additions & 0 deletions server/server.go
Expand Up @@ -633,6 +633,12 @@ type StanServer struct {
// atomic.* functions crash on 32bit machines if operand is not aligned
// at 64bit. See https://github.com/golang/go/issues/599
ioChannelStatsMaxBatchSize int64 // stats of the max number of messages than went into a single batch
stats struct {
inMsgs int64
inBytes int64
outMsgs int64
outBytes int64
}

mu sync.RWMutex
shutdown bool
Expand Down Expand Up @@ -3152,6 +3158,8 @@ func (s *StanServer) processClientPublish(m *nats.Msg) {
}
// else we will report an error below...
}
atomic.AddInt64(&s.stats.inMsgs, 1)
atomic.AddInt64(&s.stats.inBytes, int64(len(m.Data)))

// Make sure we have a guid and valid channel name.
if pm.Guid == "" || !util.IsChannelNameValid(pm.Subject, false) {
Expand Down Expand Up @@ -3866,6 +3874,8 @@ func (s *StanServer) sendMsgToSub(sub *subState, m *pb.MsgProto, force bool) (bo
sub.ClientID, sub.ID, m.Subject, m.Sequence, err)
return false, false
}
atomic.AddInt64(&s.stats.outMsgs, 1)
atomic.AddInt64(&s.stats.outBytes, int64(len(b)))

// Setup the ackTimer as needed now. I don't want to use defer in this
// function, and want to make sure that if we exit before the end, the
Expand Down

0 comments on commit b95714e

Please sign in to comment.