Skip to content

Commit

Permalink
Support for RTT - #643
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 21, 2018
1 parent 760e41d commit 7e28af2
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 28 deletions.
40 changes: 28 additions & 12 deletions server/client.go
Expand Up @@ -116,19 +116,27 @@ type client struct {
in readCache
pcd map[*client]struct{}
atmr *time.Timer
ptmr *time.Timer
pout int
ping pinfo
msgb [msgScratchSize]byte
last time.Time
parseState

rtt time.Duration
rttStart time.Time

route *route
debug bool
trace bool

flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
}

// Struct for PING initiation from the server.
type pinfo struct {
tmr *time.Timer
out int
}

// outbound holds pending data for a socket.
type outbound struct {
p []byte // Primary write buffer
Expand Down Expand Up @@ -818,7 +826,8 @@ func (c *client) sendPong() {

// Assume the lock is held upon entry.
func (c *client) sendPing() {
c.pout++
c.rttStart = time.Now()
c.ping.out++
c.traceOutOp("PING", nil)
c.sendProto([]byte("PING\r\n"), true)
}
Expand Down Expand Up @@ -898,7 +907,8 @@ func (c *client) processPing() {
func (c *client) processPong() {
c.traceInOp("PONG", nil)
c.mu.Lock()
c.pout = 0
c.ping.out = 0
c.rtt = time.Since(c.rttStart)
c.mu.Unlock()
}

Expand Down Expand Up @@ -1483,7 +1493,7 @@ func (c *client) pubPermissionViolation(subject []byte) {
func (c *client) processPingTimer() {
c.mu.Lock()
defer c.mu.Unlock()
c.ptmr = nil
c.ping.tmr = nil
// Check if connection is still opened
if c.nc == nil {
return
Expand All @@ -1492,15 +1502,21 @@ func (c *client) processPingTimer() {
c.Debugf("%s Ping Timer", c.typeString())

// Check for violation
if c.pout+1 > c.srv.getOpts().MaxPingsOut {
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
c.clearConnection()
return
}

// Send PING
c.sendPing()
// If we have had activity within the PingInterval no
// need to send a ping.
if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval {
c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second))
} else {
// Send PING
c.sendPing()
}

// Reset to fire again.
c.setPingTimer()
Expand All @@ -1512,16 +1528,16 @@ func (c *client) setPingTimer() {
return
}
d := c.srv.getOpts().PingInterval
c.ptmr = time.AfterFunc(d, c.processPingTimer)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}

// Lock should be held
func (c *client) clearPingTimer() {
if c.ptmr == nil {
if c.ping.tmr == nil {
return
}
c.ptmr.Stop()
c.ptmr = nil
c.ping.tmr.Stop()
c.ping.tmr = nil
}

// Lock should be held
Expand Down
21 changes: 21 additions & 0 deletions server/monitor.go
Expand Up @@ -73,6 +73,7 @@ type ConnInfo struct {
Port int `json:"port"`
Start time.Time `json:"start"`
LastActivity time.Time `json:"last_activity"`
RTT string `json:"rtt,omitempty"`
Uptime string `json:"uptime"`
Idle string `json:"idle"`
Pending int `json:"pending_bytes"`
Expand Down Expand Up @@ -218,6 +219,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
ci.LastActivity = client.last
ci.Uptime = myUptime(c.Now.Sub(client.start))
ci.Idle = myUptime(c.Now.Sub(client.last))
ci.RTT = client.getRTT()
ci.OutMsgs = client.outMsgs
ci.OutBytes = client.outBytes
ci.NumSubs = uint32(len(client.subs))
Expand Down Expand Up @@ -289,6 +291,25 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
return c, nil
}

// Assume lock is held
func (c *client) getRTT() string {
if c.rtt == 0 {
// If a real client, go ahead and send ping now to get a value
// for RTT. For tests and telnet, etc skip.
if c.flags.isSet(connectReceived) && c.opts.Lang != "" {
c.sendPing()
}
return ""
}
var rtt time.Duration
if c.rtt > time.Microsecond && c.rtt < time.Millisecond {
rtt = c.rtt.Truncate(time.Microsecond)
} else {
rtt = c.rtt.Truncate(time.Millisecond)
}
return fmt.Sprintf("%v", rtt)
}

func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error) {
str := r.URL.Query().Get(param)
if str == "" {
Expand Down
65 changes: 61 additions & 4 deletions server/monitor_test.go
Expand Up @@ -309,19 +309,22 @@ func TestConnz(t *testing.T) {
t.Fatalf("Expected OutBytes of 1, got %v\n", ci.OutBytes)
}
if ci.Start.IsZero() {
t.Fatalf("Expected Start to be valid\n")
t.Fatal("Expected Start to be valid\n")
}
if ci.Uptime == "" {
t.Fatalf("Expected Uptime to be valid\n")
t.Fatal("Expected Uptime to be valid\n")
}
if ci.LastActivity.IsZero() {
t.Fatalf("Expected LastActivity to be valid\n")
t.Fatal("Expected LastActivity to be valid\n")
}
if ci.LastActivity.UnixNano() < ci.Start.UnixNano() {
t.Fatalf("Expected LastActivity [%v] to be > Start [%v]\n", ci.LastActivity, ci.Start)
}
if ci.Idle == "" {
t.Fatalf("Expected Idle to be valid\n")
t.Fatal("Expected Idle to be valid\n")
}
if ci.RTT != "" {
t.Fatal("Expected RTT to NOT be set for new connection\n")
}
}

Expand Down Expand Up @@ -387,6 +390,60 @@ func ensureServerActivityRecorded(t *testing.T, nc *nats.Conn) {
}
}

func TestConnzRTT(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()

url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().Port)

testRTT := func(mode int) {
// Test with connections.
nc := createClientConnSubscribeAndPublish(t, s)
defer nc.Close()

c := pollConz(t, s, mode, url+"connz", nil)

if c.NumConns != 1 {
t.Fatalf("Expected 1 connection, got %d\n", c.NumConns)
}

// Send a server side PING to record RTT
s.mu.Lock()
ci := c.Conns[0]
sc := s.clients[ci.Cid]
if sc == nil {
t.Fatalf("Error looking up client %v\n", ci.Cid)
}
s.mu.Unlock()
sc.mu.Lock()
sc.sendPing()
sc.mu.Unlock()

// Wait for client to respond with PONG
time.Sleep(20 * time.Millisecond)

// Repoll for updated information.
c = pollConz(t, s, mode, url+"connz", nil)
ci = c.Conns[0]

rtt, err := time.ParseDuration(ci.RTT)
if err != nil {
t.Fatalf("Could not parse RTT properly, %v", err)
}
if rtt <= 0 {
t.Fatal("Expected RTT to be valid and non-zero\n")
}
if rtt > 5*time.Millisecond || rtt < 100*time.Nanosecond {
t.Fatalf("Invalid RTT of %s\n", ci.RTT)
}
}

for mode := 0; mode < 2; mode++ {
testRTT(mode)
waitForClientConnCount(t, s, 0)
}
}

func TestConnzLastActivity(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
Expand Down
18 changes: 9 additions & 9 deletions server/parser_test.go
Expand Up @@ -104,15 +104,15 @@ func TestParsePong(t *testing.T) {
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if c.pout != 0 {
t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout)
if c.ping.out != 0 {
t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out)
}
err = c.parse(pong)
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if c.pout != 0 {
t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout)
if c.ping.out != 0 {
t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out)
}
// Should tolerate spaces
pong = []byte("PONG \r")
Expand All @@ -126,20 +126,20 @@ func TestParsePong(t *testing.T) {
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if c.pout != 0 {
t.Fatalf("Unexpected pout value: %d vs 0\n", c.pout)
if c.ping.out != 0 {
t.Fatalf("Unexpected ping.out value: %d vs 0\n", c.ping.out)
}

// Should be adjusting c.pout (Pings Outstanding): reset to 0
c.state = OP_START
c.pout = 10
c.ping.out = 10
pong = []byte("PONG\r\n")
err = c.parse(pong)
if err != nil || c.state != OP_START {
t.Fatalf("Unexpected: %d : %v\n", c.state, err)
}
if c.pout != 0 {
t.Fatalf("Unexpected pout: %d vs 0\n", c.pout)
if c.ping.out != 0 {
t.Fatalf("Unexpected ping.out: %d vs 0\n", c.ping.out)
}
}

Expand Down
4 changes: 1 addition & 3 deletions test/monitor_test.go
Expand Up @@ -102,7 +102,7 @@ func TestNoMonitorPort(t *testing.T) {

// testEndpointDataRace tests a monitoring endpoint for data races by polling
// while client code acts to ensure statistics are updated. It is designed to
// run under the -race flag to catch violations. The caller must start the
// run under the -race flag to catch violations. The caller must start the
// NATS server.
func testEndpointDataRace(endpoint string, t *testing.T) {
var doneWg sync.WaitGroup
Expand Down Expand Up @@ -653,8 +653,6 @@ func TestHTTPHost(t *testing.T) {
// Create a connection to test ConnInfo
func createClientConnSubscribeAndPublish(t *testing.T) net.Conn {
cl := createClientConn(t, "localhost", CLIENT_PORT)

sendCommand(t, cl)
send, expect := setupConn(t, cl)
expectMsgs := expectMsgsCommand(t, expect)

Expand Down

0 comments on commit 7e28af2

Please sign in to comment.