diff --git a/server/client.go b/server/client.go index 93ded2aa4d0..4c8e7407806 100644 --- a/server/client.go +++ b/server/client.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "io" "math/rand" "net" "sync" @@ -99,6 +100,33 @@ func (cf *clientFlag) setIfNotSet(c clientFlag) bool { return false } +// Reason client was closed. This will be passed into +// calls to clearConnection, but will only be stored +// in ConnInfo for monitoring. +type ClosedState int + +const ( + ClientClosed = ClosedState(iota + 1) + AuthenticationTimeout + AuthenticationViolation + TLSHandshakeError + SlowConsumerPendingBytes + SlowConsumerWriteDeadline + WriteError + ReadError + ParseError + StaleConnection + ProtocolViolation + BadClientProtocolVersion + WrongPort + MaxConnectionsExceeded + MaxPayloadExceeded + MaxControlLineExceeded + DuplicateRoute + RouteRemoved + ServerShutdown +) + type client struct { // Here first because of use of atomics, and memory alignment. stats @@ -357,7 +385,11 @@ func (c *client) readLoop() { for { n, err := nc.Read(b) if err != nil { - c.closeConnection() + if err == io.EOF { + c.closeConnection(ClientClosed) + } else { + c.closeConnection(ReadError) + } return } @@ -375,7 +407,7 @@ func (c *client) readLoop() { // handled inline if err != ErrMaxPayload && err != ErrAuthorization { c.Errorf("%s", err.Error()) - c.closeConnection() + c.closeConnection(ProtocolViolation) } return } @@ -530,11 +562,12 @@ func (c *client) flushOutbound() bool { if n == 0 { c.out.pb -= attempted } - c.clearConnection() if ne, ok := err.(net.Error); ok && ne.Timeout() { atomic.AddInt64(&srv.slowConsumers, 1) + c.clearConnection(SlowConsumerWriteDeadline) c.Noticef("Slow Consumer Detected: WriteDeadline of %v Exceeded", c.out.wdl) } else { + c.clearConnection(WriteError) c.Debugf("Error flushing: %v", err) } return true @@ -627,7 +660,7 @@ func (c *client) processErr(errStr string) { case ROUTER: c.Errorf("Route Error %s", errStr) } - c.closeConnection() + c.closeConnection(ParseError) } func (c *client) processConnect(arg []byte) error { @@ -689,13 +722,13 @@ func (c *client) processConnect(arg []byte) error { // Check client protocol request if it exists. if typ == CLIENT && (proto < ClientProtoZero || proto > ClientProtoInfo) { c.sendErr(ErrBadClientProtocol.Error()) - c.closeConnection() + c.closeConnection(BadClientProtocolVersion) return ErrBadClientProtocol } else if typ == ROUTER && lang != "" { // Way to detect clients that incorrectly connect to the route listen // port. Client provide Lang in the CONNECT protocol while ROUTEs don't. 
c.sendErr(ErrClientConnectedToRoutePort.Error()) - c.closeConnection() + c.closeConnection(WrongPort) return ErrClientConnectedToRoutePort } @@ -715,7 +748,7 @@ func (c *client) processConnect(arg []byte) error { func (c *client) authTimeout() { c.sendErr(ErrAuthTimeout.Error()) c.Debugf("Authorization Timeout") - c.closeConnection() + c.closeConnection(AuthenticationTimeout) } func (c *client) authViolation() { @@ -727,19 +760,19 @@ func (c *client) authViolation() { c.Errorf(ErrAuthorization.Error()) } c.sendErr("Authorization Violation") - c.closeConnection() + c.closeConnection(AuthenticationViolation) } func (c *client) maxConnExceeded() { c.Errorf(ErrTooManyConnections.Error()) c.sendErr(ErrTooManyConnections.Error()) - c.closeConnection() + c.closeConnection(MaxConnectionsExceeded) } func (c *client) maxPayloadViolation(sz int, max int64) { c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, max) c.sendErr("Maximum Payload Violation") - c.closeConnection() + c.closeConnection(MaxPayloadExceeded) } // queueOutbound queues data for client/route connections. @@ -752,7 +785,7 @@ func (c *client) queueOutbound(data []byte) { // Check for slow consumer via pending bytes limit. // ok to return here, client is going away. if c.out.pb > c.out.mp { - c.clearConnection() + c.clearConnection(SlowConsumerPendingBytes) atomic.AddInt64(&c.srv.slowConsumers, 1) c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp) return @@ -1517,7 +1550,7 @@ func (c *client) processPingTimer() { if c.ping.out+1 > c.srv.getOpts().MaxPingsOut { c.Debugf("Stale Client Connection - Closing") c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true) - c.clearConnection() + c.clearConnection(StaleConnection) return } @@ -1575,11 +1608,12 @@ func (c *client) isAuthTimerSet() bool { } // Lock should be held -func (c *client) clearConnection() { +func (c *client) clearConnection(reason ClosedState) { if c.flags.isSet(clearConnection) { return } c.flags.set(clearConnection) + nc := c.nc if nc == nil || c.srv == nil { return @@ -1599,6 +1633,11 @@ func (c *client) clearConnection() { nc.Close() // Do this always to also kick out any IO writes. nc.SetWriteDeadline(time.Time{}) + + // Save off the connection if it's a client. + if c.typ == CLIENT && c.srv != nil { + go c.srv.saveClosedClient(c, nc, reason) + } } func (c *client) typeString() string { @@ -1611,7 +1650,7 @@ func (c *client) typeString() string { return "Unknown Type" } -func (c *client) closeConnection() { +func (c *client) closeConnection(reason ClosedState) { c.mu.Lock() if c.nc == nil { c.mu.Unlock() @@ -1622,7 +1661,7 @@ c.clearAuthTimer() c.clearPingTimer() - c.clearConnection() + c.clearConnection(reason) c.nc = nil // Snapshot for use.
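Note on the readLoop change above: it is the one place where the server infers the close reason rather than knowing it outright; an io.EOF from the socket is treated as an orderly close by the client, while any other error is a genuine read failure. A minimal, self-contained sketch of that classification (illustrative only, not part of the patch; classify and main are hypothetical names, and the returned strings mirror ClosedState.String() defined later in monitor.go):

package main

import (
	"fmt"
	"io"
	"strings"
)

// classify mirrors the readLoop logic: io.EOF means the peer closed
// the connection normally; anything else counts as a read error.
func classify(err error) string {
	if err == io.EOF {
		return "Client" // ClientClosed
	}
	return "Read Error" // ReadError
}

func main() {
	r := strings.NewReader("PING\r\n")
	buf := make([]byte, 8)
	for {
		if _, err := r.Read(buf); err != nil {
			fmt.Println(classify(err)) // prints "Client"
			return
		}
	}
}

The same pattern repeats at every call site in this diff: the reason is chosen at the point where the error is detected, and only its String() form survives into ConnInfo.Reason for monitoring.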
diff --git a/server/client_test.go b/server/client_test.go index f867d36df01..72cb8192c60 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -549,7 +549,7 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) { if s.sl.Count() != 2 { t.Fatalf("Should have 2 subscriptions, got %d\n", s.sl.Count()) } - c.closeConnection() + c.closeConnection(ClientClosed) if s.sl.Count() != 0 { t.Fatalf("Should have no subscriptions after close, got %d\n", s.sl.Count()) } @@ -557,7 +557,7 @@ func TestClientRemoveSubsOnDisconnect(t *testing.T) { func TestClientDoesNotAddSubscriptionsWhenConnectionClosed(t *testing.T) { s, c, _ := setupClient() - c.closeConnection() + c.closeConnection(ClientClosed) subs := []byte("SUB foo 1\r\nSUB bar 2\r\n") ch := make(chan bool) @@ -767,7 +767,7 @@ func TestTLSCloseClientConnection(t *testing.T) { } }() // Close the client - cli.closeConnection() + cli.closeConnection(ClientClosed) ch <- true } diff --git a/server/closed_conns_test.go b/server/closed_conns_test.go new file mode 100644 index 00000000000..24a451d1f24 --- /dev/null +++ b/server/closed_conns_test.go @@ -0,0 +1,391 @@ +// Copyright 2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "net" + "strings" + "testing" + "time" + + nats "github.com/nats-io/go-nats" +) + +func closedConnsEqual(s *Server, num int, wait time.Duration) bool { + end := time.Now().Add(wait) + for time.Now().Before(end) { + if s.numClosedConns() == num { + break + } + time.Sleep(5 * time.Millisecond) + } + n := s.numClosedConns() + return n == num +} + +func totalClosedConnsEqual(s *Server, num uint64, wait time.Duration) bool { + end := time.Now().Add(wait) + for time.Now().Before(end) { + if s.totalClosedConns() == num { + break + } + time.Sleep(5 * time.Millisecond) + } + n := s.totalClosedConns() + return n == num +} + +func TestClosedConnsAccounting(t *testing.T) { + opts := DefaultOptions() + opts.MaxClosedClients = 10 + + s := RunServer(opts) + defer s.Shutdown() + + wait := 20 * time.Millisecond + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + nc.Close() + + if !closedConnsEqual(s, 1, wait) { + t.Fatalf("Closed conns expected to be 1, got %d\n", s.numClosedConns()) + } + + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc) + } + if conns[0].Cid != 1 { + t.Fatalf("Expected CID to be 1, got %d\n", conns[0].Cid) + } + + // Now create 21 more + for i := 0; i < 21; i++ { + nc, err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + nc.Close() + } + + if !closedConnsEqual(s, opts.MaxClosedClients, wait) { + t.Fatalf("Closed conns expected to be %d, got %d\n", + opts.MaxClosedClients, + s.numClosedConns()) + } + + if !totalClosedConnsEqual(s, 22, wait) { + t.Fatalf("Closed conns expected to be 22, got %d\n", + 
s.totalClosedConns()) } + + conns = s.closedClients() + if lc := len(conns); lc != opts.MaxClosedClients { + t.Fatalf("len(conns) expected to be %d, got %d\n", + opts.MaxClosedClients, lc) + } + + // Set it to the start after overflow. + cid := uint64(22 - opts.MaxClosedClients) + for _, ci := range conns { + cid++ + if ci.Cid != cid { + t.Fatalf("Expected cid of %d, got %d\n", cid, ci.Cid) + } + } +} + +func TestClosedConnsSubsAccounting(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + + nc, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + + // Now create some subscriptions + numSubs := 10 + for i := 0; i < numSubs; i++ { + subj := fmt.Sprintf("foo.%d", i) + nc.Subscribe(subj, func(m *nats.Msg) {}) + } + nc.Flush() + nc.Close() + + if !closedConnsEqual(s, 1, 20*time.Millisecond) { + t.Fatalf("Closed conns expected to be 1, got %d\n", + s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be 1, got %d\n", lc) + } + ci := conns[0] + + if len(ci.subs) != numSubs { + t.Fatalf("Expected number of Subs to be %d, got %d\n", numSubs, len(ci.subs)) + } +} + +func checkReason(t *testing.T, reason string, expected ClosedState) { + if !strings.Contains(reason, expected.String()) { + t.Fatalf("Expected closed connection with `%s` state, got `%s`\n", + expected, reason) + } +} + +func TestClosedAuthorizationTimeout(t *testing.T) { + serverOptions := DefaultOptions() + serverOptions.Authorization = "my_token" + serverOptions.AuthTimeout = 0.4 + s := RunServer(serverOptions) + defer s.Shutdown() + + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverOptions.Host, serverOptions.Port)) + if err != nil { + t.Fatalf("Error dialing server: %v\n", err) + } + defer conn.Close() + + if !closedConnsEqual(s, 1, 2*time.Second) { + t.Fatalf("Closed conns expected to be 1, got %d\n", s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc) + } + checkReason(t, conns[0].Reason, AuthenticationTimeout) +} + +func TestClosedAuthorizationViolation(t *testing.T) { + serverOptions := DefaultOptions() + serverOptions.Authorization = "my_token" + s := RunServer(serverOptions) + defer s.Shutdown() + + opts := s.getOpts() + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + + nc, err := nats.Connect(url) + if err == nil { + nc.Close() + t.Fatal("Expected failure for connection") + } + + if !closedConnsEqual(s, 1, 2*time.Second) { + t.Fatalf("Closed conns expected to be 1, got %d\n", s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc) + } + checkReason(t, conns[0].Reason, AuthenticationViolation) +} + +func TestClosedUPAuthorizationViolation(t *testing.T) { + serverOptions := DefaultOptions() + serverOptions.Username = "my_user" + serverOptions.Password = "my_secret" + s := RunServer(serverOptions) + defer s.Shutdown() + + opts := s.getOpts() + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + + nc, err := nats.Connect(url) + if err == nil { + nc.Close() + t.Fatal("Expected failure for connection") + } + + url2 := fmt.Sprintf("nats://my_user:wrong_pass@%s:%d", opts.Host, opts.Port) + nc, err = nats.Connect(url2) + if err == nil { + nc.Close() + t.Fatal("Expected failure for connection") + } + + if !closedConnsEqual(s, 2,
2*time.Second) { + t.Fatalf("Closed conns expected to be 2, got %d\n", s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 2 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 2, lc) + } + checkReason(t, conns[0].Reason, AuthenticationViolation) + checkReason(t, conns[1].Reason, AuthenticationViolation) +} + +func TestClosedMaxPayload(t *testing.T) { + serverOptions := DefaultOptions() + serverOptions.MaxPayload = 100 + + s := RunServer(serverOptions) + defer s.Shutdown() + + opts := s.getOpts() + endpoint := fmt.Sprintf("%s:%d", opts.Host, opts.Port) + + conn, err := net.DialTimeout("tcp", endpoint, time.Second) + if err != nil { + t.Fatalf("Could not make a raw connection to the server: %v", err) + } + defer conn.Close() + + // This should trigger it. + pub := "PUB foo.bar 1024\r\n" + conn.Write([]byte(pub)) + + if !closedConnsEqual(s, 1, 2*time.Second) { + t.Fatalf("Closed conns expected to be 1, got %d\n", s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc) + } + checkReason(t, conns[0].Reason, MaxPayloadExceeded) +} + +func TestClosedSlowConsumerWriteDeadline(t *testing.T) { + opts := DefaultOptions() + opts.WriteDeadline = 10 * time.Millisecond // Make very small to trip. + opts.MaxPending = 500 * 1024 * 1024 // Set high so it will not trip here. + s := RunServer(opts) + defer s.Shutdown() + + c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer c.Close() + if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil { + t.Fatalf("Error sending protocols to server: %v", err) + } + // Reduce socket buffer to increase reliability of data backing up in the server destined + // for our subscribed client. + c.(*net.TCPConn).SetReadBuffer(128) + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + sender, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer sender.Close() + + payload := make([]byte, 1024*1024) + for i := 0; i < 100; i++ { + if err := sender.Publish("foo", payload); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + // Flush sender connection to ensure that all data has been sent. + if err := sender.Flush(); err != nil { + t.Fatalf("Error on flush: %v", err) + } + + // At this point server should have closed connection c. + if !closedConnsEqual(s, 1, 2*time.Second) { + t.Fatalf("Closed conns expected to be 1, got %d\n", s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc) + } + checkReason(t, conns[0].Reason, SlowConsumerWriteDeadline) +} + +func TestClosedSlowConsumerPendingBytes(t *testing.T) { + opts := DefaultOptions() + opts.WriteDeadline = 30 * time.Second // Wait for long time so write deadline does not trigger slow consumer. + opts.MaxPending = 1 * 1024 * 1024 // Set to low value (1MB) to allow SC to trip.
+ s := RunServer(opts) + defer s.Shutdown() + + c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer c.Close() + if _, err := c.Write([]byte("CONNECT {}\r\nPING\r\nSUB foo 1\r\n")); err != nil { + t.Fatalf("Error sending protocols to server: %v", err) + } + // Reduce socket buffer to increase reliability of data backing up in the server destined + // for our subscribed client. + c.(*net.TCPConn).SetReadBuffer(128) + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + sender, err := nats.Connect(url) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer sender.Close() + + payload := make([]byte, 1024*1024) + for i := 0; i < 100; i++ { + if err := sender.Publish("foo", payload); err != nil { + t.Fatalf("Error on publish: %v", err) + } + } + + // Flush sender connection to ensure that all data has been sent. + if err := sender.Flush(); err != nil { + t.Fatalf("Error on flush: %v", err) + } + + // At this point server should have closed connection c. + if !closedConnsEqual(s, 1, 2*time.Second) { + t.Fatalf("Closed conns expected to be 1, got %d\n", s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc) + } + checkReason(t, conns[0].Reason, SlowConsumerPendingBytes) +} + +func TestClosedTLSHandshake(t *testing.T) { + opts, err := ProcessConfigFile("./configs/tls.conf") + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + opts.TLSVerify = true + opts.NoLog = true + opts.NoSigs = true + s := RunServer(opts) + defer s.Shutdown() + + nc, err := nats.Connect(fmt.Sprintf("tls://%s:%d", opts.Host, opts.Port)) + if err == nil { + nc.Close() + t.Fatal("Expected failure for connection") + } + + if !closedConnsEqual(s, 1, 2*time.Second) { + t.Fatalf("Closed conns expected to be 1, got %d\n", s.numClosedConns()) + } + conns := s.closedClients() + if lc := len(conns); lc != 1 { + t.Fatalf("len(conns) expected to be %d, got %d\n", 1, lc) + } + checkReason(t, conns[0].Reason, TLSHandshakeError) +} diff --git a/server/const.go b/server/const.go index 5112230f728..bb4a30127e9 100644 --- a/server/const.go +++ b/server/const.go @@ -112,4 +112,7 @@ const ( // DEFAULT_REMOTE_QSUBS_SWEEPER DEFAULT_REMOTE_QSUBS_SWEEPER = 30 * time.Second + + // DEFAULT_MAX_CLOSED_CLIENTS + DEFAULT_MAX_CLOSED_CLIENTS = 10000 ) diff --git a/server/monitor.go b/server/monitor.go index 62782c2a474..4fb971ee21d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -22,6 +22,7 @@ import ( "runtime" "sort" "strconv" + "strings" "sync/atomic" "time" @@ -37,13 +38,13 @@ func init() { // Connz represents detailed information on current client connections. type Connz struct { - ID string `json:"server_id"` - Now time.Time `json:"now"` - NumConns int `json:"num_connections"` - Total int `json:"total"` - Offset int `json:"offset"` - Limit int `json:"limit"` - Conns []ConnInfo `json:"connections"` + ID string `json:"server_id"` + Now time.Time `json:"now"` + NumConns int `json:"num_connections"` + Total int `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` + Conns []*ConnInfo `json:"connections"` } // ConnzOptions are the options passed to Connz() @@ -67,31 +68,45 @@ type ConnzOptions struct { // Filter for this explicit client connection. CID uint64 `json:"cid"` + + // Filter by connection state. + State ConnState `json:"state"` } +// For filtering states of connections. 
We will only have two, open and closed. +type ConnState int + +const ( + ConnOpen = ConnState(iota) + ConnClosed + ConnAll +) + // ConnInfo has detailed information on a per connection basis. type ConnInfo struct { - Cid uint64 `json:"cid"` - IP string `json:"ip"` - Port int `json:"port"` - Start time.Time `json:"start"` - LastActivity time.Time `json:"last_activity"` - RTT string `json:"rtt,omitempty"` - Uptime string `json:"uptime"` - Idle string `json:"idle"` - Pending int `json:"pending_bytes"` - InMsgs int64 `json:"in_msgs"` - OutMsgs int64 `json:"out_msgs"` - InBytes int64 `json:"in_bytes"` - OutBytes int64 `json:"out_bytes"` - NumSubs uint32 `json:"subscriptions"` - Name string `json:"name,omitempty"` - Lang string `json:"lang,omitempty"` - Version string `json:"version,omitempty"` - TLSVersion string `json:"tls_version,omitempty"` - TLSCipher string `json:"tls_cipher_suite,omitempty"` - AuthorizedUser string `json:"authorized_user,omitempty"` - Subs []string `json:"subscriptions_list,omitempty"` + Cid uint64 `json:"cid"` + IP string `json:"ip"` + Port int `json:"port"` + Start time.Time `json:"start"` + LastActivity time.Time `json:"last_activity"` + Stop *time.Time `json:"stop,omitempty"` + Reason string `json:"reason,omitempty"` + RTT string `json:"rtt,omitempty"` + Uptime string `json:"uptime"` + Idle string `json:"idle"` + Pending int `json:"pending_bytes"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + NumSubs uint32 `json:"subscriptions"` + Name string `json:"name,omitempty"` + Lang string `json:"lang,omitempty"` + Version string `json:"version,omitempty"` + TLSVersion string `json:"tls_version,omitempty"` + TLSCipher string `json:"tls_cipher_suite,omitempty"` + AuthorizedUser string `json:"authorized_user,omitempty"` + Subs []string `json:"subscriptions_list,omitempty"` } // DefaultConnListSize is the default size of the connection list. @@ -108,6 +123,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { offset int limit = DefaultConnListSize cid = uint64(0) + state = ConnOpen ) if opts != nil { @@ -130,6 +146,10 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { if limit <= 0 { limit = DefaultConnListSize } + // state + state = opts.State + + // If searching by CID if opts.CID > 0 { cid = opts.CID limit = 1 @@ -142,173 +162,218 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { Now: time.Now(), } - // Walk the list + // Open clients + var openClients []*client + // Hold for closed clients if requested. + var closedClients []*closedClient + + // Walk the open client list with server lock held. s.mu.Lock() - tlsRequired := s.info.TLSRequired // copy the server id for monitoring c.ID = s.info.ID - // number total of clients. The resulting ConnInfo array + // Number of total clients. The resulting ConnInfo array // may be smaller if pagination is used. - totalClients := len(s.clients) - c.Total = totalClients + switch state { + case ConnOpen: + c.Total = len(s.clients) + case ConnClosed: + c.Total = s.closed.len() + closedClients = s.closed.closedClients() + case ConnAll: + c.Total = len(s.clients) + s.closed.len() + closedClients = s.closed.closedClients() + } - var pairs Pairs + totalClients := c.Total + if cid > 0 { // Meaning we only want 1. + totalClients = 1 + } + if state == ConnOpen || state == ConnAll { + openClients = make([]*client, 0, totalClients) + } + + // Data structures for results. 
+ var conns []ConnInfo // Limits allocs for actual ConnInfos. + var pconns ConnInfos + + switch state { + case ConnOpen: + conns = make([]ConnInfo, totalClients) + pconns = make(ConnInfos, totalClients) + case ConnClosed: + pconns = make(ConnInfos, totalClients) + case ConnAll: + conns = make([]ConnInfo, cap(openClients)) + pconns = make(ConnInfos, totalClients) + } + + // Search by individual CID. if cid > 0 { - client := s.clients[cid] - if client != nil { - pairs = append(pairs, Pair{Key: client, Val: int64(cid)}) + if state == ConnClosed || state == ConnAll { + copyClosed := closedClients + closedClients = nil + for _, cc := range copyClosed { + if cc.Cid == cid { + closedClients = []*closedClient{cc} + break + } + } + } else if state == ConnOpen || state == ConnAll { + client := s.clients[cid] + if client != nil { + openClients = append(openClients, client) + } } } else { - i := 0 - pairs = make(Pairs, totalClients) - for _, client := range s.clients { - client.mu.Lock() - switch sortOpt { - case ByCid: - pairs[i] = Pair{Key: client, Val: int64(client.cid)} - case BySubs: - pairs[i] = Pair{Key: client, Val: int64(len(client.subs))} - case ByPending: - pairs[i] = Pair{Key: client, Val: int64(client.out.pb)} - case ByOutMsgs: - pairs[i] = Pair{Key: client, Val: client.outMsgs} - case ByInMsgs: - pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inMsgs)} - case ByOutBytes: - pairs[i] = Pair{Key: client, Val: client.outBytes} - case ByInBytes: - pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inBytes)} - case ByLast: - pairs[i] = Pair{Key: client, Val: client.last.UnixNano()} - case ByIdle: - pairs[i] = Pair{Key: client, Val: c.Now.Sub(client.last).Nanoseconds()} + // Gather all open clients. + if state == ConnOpen || state == ConnAll { + for _, client := range s.clients { + openClients = append(openClients, client) } - client.mu.Unlock() - i++ } } s.mu.Unlock() - if totalClients > 0 && len(pairs) > 1 { - if sortOpt == ByCid { - // Return in ascending order - sort.Sort(pairs) - } else { - // Return in descending order - sort.Sort(sort.Reverse(pairs)) - } - } - - minoff := c.Offset - maxoff := c.Offset + c.Limit - - // Make sure these are sane. - if minoff > totalClients { - minoff = totalClients - } - if maxoff > totalClients { - maxoff = totalClients - } - if pairs != nil { - pairs = pairs[minoff:maxoff] + // Just return with empty array if nothing here. + if len(openClients) == 0 && len(closedClients) == 0 { + c.Conns = ConnInfos{} + return c, nil } - // Now we have the real number of ConnInfo objects, we can set c.NumConns - // and allocate the array - c.NumConns = len(pairs) - c.Conns = make([]ConnInfo, c.NumConns) + // Now whip through and generate ConnInfo entries + // Open Clients i := 0 - for _, pair := range pairs { - - client := pair.Key - + for _, client := range openClients { client.mu.Lock() - - // First, fill ConnInfo with current client's values. We will - // then overwrite the field used for the sort with what was stored - // in 'pair'. 
- ci := &c.Conns[i] - - ci.Cid = client.cid - ci.Start = client.start - ci.LastActivity = client.last - ci.Uptime = myUptime(c.Now.Sub(client.start)) - ci.Idle = myUptime(c.Now.Sub(client.last)) - ci.RTT = client.getRTT() - ci.OutMsgs = client.outMsgs - ci.OutBytes = client.outBytes - ci.NumSubs = uint32(len(client.subs)) - ci.Pending = int(client.out.pb) - ci.Name = client.opts.Name - ci.Lang = client.opts.Lang - ci.Version = client.opts.Version - // inMsgs and inBytes are updated outside of the client's lock, so - // we need to use atomic here. - ci.InMsgs = atomic.LoadInt64(&client.inMsgs) - ci.InBytes = atomic.LoadInt64(&client.inBytes) - - // Now overwrite the field that was used as the sort key, so results - // still look sorted even if the value has changed since sort occurred. - sortValue := pair.Val - switch sortOpt { - case BySubs: - ci.NumSubs = uint32(sortValue) - case ByPending: - ci.Pending = int(sortValue) - case ByOutMsgs: - ci.OutMsgs = sortValue - case ByInMsgs: - ci.InMsgs = sortValue - case ByOutBytes: - ci.OutBytes = sortValue - case ByInBytes: - ci.InBytes = sortValue - case ByLast: - ci.LastActivity = time.Unix(0, sortValue) - case ByIdle: - ci.Idle = myUptime(time.Duration(sortValue)) - } - - // If the connection is gone, too bad, we won't set TLSVersion and TLSCipher. - // Exclude clients that are still doing handshake so we don't block in - // ConnectionState(). - if tlsRequired && client.flags.isSet(handshakeComplete) && client.nc != nil { - conn := client.nc.(*tls.Conn) - cs := conn.ConnectionState() - ci.TLSVersion = tlsVersion(cs.Version) - ci.TLSCipher = tlsCipher(cs.CipherSuite) - } - - switch conn := client.nc.(type) { - case *net.TCPConn, *tls.Conn: - addr := conn.RemoteAddr().(*net.TCPAddr) - ci.Port = addr.Port - ci.IP = addr.IP.String() - } - + ci := &conns[i] + ci.fill(client, client.nc, c.Now) // Fill in subscription data if requested. - if subs { - sublist := make([]*subscription, 0, len(client.subs)) + if subs && len(client.subs) > 0 { + ci.Subs = make([]string, 0, len(client.subs)) for _, sub := range client.subs { - sublist = append(sublist, sub) + ci.Subs = append(ci.Subs, string(sub.subject)) } - ci.Subs = castToSliceString(sublist) } - // Fill in user if auth requested. if auth { ci.AuthorizedUser = client.opts.Username } - client.mu.Unlock() + pconns[i] = ci + i++ + } + // Closed Clients + var needCopy bool + if subs || auth { + needCopy = true + } + for _, cc := range closedClients { + // Copy if needed for any changes to the ConnInfo + if needCopy { + cx := *cc + cc = &cx + } + // Fill in subscription data if requested. + if subs && len(cc.subs) > 0 { + cc.Subs = cc.subs + } + // Fill in user if auth requested. + if auth { + cc.AuthorizedUser = cc.user + } + pconns[i] = &cc.ConnInfo i++ } + + switch sortOpt { + case ByCid: + sort.Sort(byCid{pconns}) + case BySubs: + sort.Sort(sort.Reverse(bySubs{pconns})) + case ByPending: + sort.Sort(sort.Reverse(byPending{pconns})) + case ByOutMsgs: + sort.Sort(sort.Reverse(byOutMsgs{pconns})) + case ByInMsgs: + sort.Sort(sort.Reverse(byInMsgs{pconns})) + case ByOutBytes: + sort.Sort(sort.Reverse(byOutBytes{pconns})) + case ByInBytes: + sort.Sort(sort.Reverse(byInBytes{pconns})) + case ByLast: + sort.Sort(sort.Reverse(byLast{pconns})) + case ByIdle: + sort.Sort(sort.Reverse(byIdle{pconns})) + case ByUptime: + sort.Sort(sort.Reverse(byCid{pconns})) + } + + minoff := c.Offset + maxoff := c.Offset + c.Limit + + maxIndex := totalClients + + // Make sure these are sane. 
+ if minoff > maxIndex { + minoff = maxIndex + } + if maxoff > maxIndex { + maxoff = maxIndex + } + + // Now pare down to the requested size. + // TODO(dlc) - for very large number of connections we + // could save the whole list in a hash, send hash on first + // request and allow users to use hash for subsequent pages. + // Low TTL, say < 1sec. + c.Conns = pconns[minoff:maxoff] + c.NumConns = len(c.Conns) + return c, nil } +// Fills in the ConnInfo from the client. +// client should be locked. +func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) { + ci.Cid = client.cid + ci.Start = client.start + ci.LastActivity = client.last + ci.Uptime = myUptime(now.Sub(client.start)) + ci.Idle = myUptime(now.Sub(client.last)) + ci.RTT = client.getRTT() + ci.OutMsgs = client.outMsgs + ci.OutBytes = client.outBytes + ci.NumSubs = uint32(len(client.subs)) + ci.Pending = int(client.out.pb) + ci.Name = client.opts.Name + ci.Lang = client.opts.Lang + ci.Version = client.opts.Version + // inMsgs and inBytes are updated outside of the client's lock, so + // we need to use atomic here. + ci.InMsgs = atomic.LoadInt64(&client.inMsgs) + ci.InBytes = atomic.LoadInt64(&client.inBytes) + + // If the connection is gone, too bad, we won't set TLSVersion and TLSCipher. + // Exclude clients that are still doing handshake so we don't block in + // ConnectionState(). + if client.flags.isSet(handshakeComplete) && nc != nil { + conn := nc.(*tls.Conn) + cs := conn.ConnectionState() + ci.TLSVersion = tlsVersion(cs.Version) + ci.TLSCipher = tlsCipher(cs.CipherSuite) + } + + switch conn := nc.(type) { + case *net.TCPConn, *tls.Conn: + addr := conn.RemoteAddr().(*net.TCPAddr) + ci.Port = addr.Port + ci.IP = addr.IP.String() + } +} + // Assume lock is held func (c *client) getRTT() string { if c.rtt == 0 { @@ -325,7 +390,7 @@ func (c *client) getRTT() string { } else { rtt = c.rtt.Truncate(time.Millisecond) } - return fmt.Sprintf("%v", rtt) + return rtt.String() } func decodeBool(w http.ResponseWriter, r *http.Request, param string) (bool, error) { @@ -370,6 +435,26 @@ func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error return val, nil } +func decodeState(w http.ResponseWriter, r *http.Request) (ConnState, error) { + str := r.URL.Query().Get("state") + if str == "" { + return ConnOpen, nil + } + switch strings.ToLower(str) { + case "open": + return ConnOpen, nil + case "closed": + return ConnClosed, nil + case "any", "all": + return ConnAll, nil + } + // We do not understand intended state here. + w.WriteHeader(http.StatusBadRequest) + err := fmt.Errorf("Error decoding state for %s", str) + w.Write([]byte(err.Error())) + return 0, err +} + // HandleConnz process HTTP requests for connection information.
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { sortOpt := SortOpt(r.URL.Query().Get("sort")) @@ -389,11 +474,14 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { if err != nil { return } - cid, err := decodeUint64(w, r, "cid") if err != nil { return } + state, err := decodeState(w, r) + if err != nil { + return + } connzOpts := &ConnzOptions{ Sort: sortOpt, @@ -402,6 +490,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { Offset: offset, Limit: limit, CID: cid, + State: state, } s.mu.Lock() @@ -423,14 +512,6 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, b) } -func castToSliceString(input []*subscription) []string { - output := make([]string, 0, len(input)) - for _, line := range input { - output = append(output, string(line.subject)) - } - return output -} - // Subsz represents detail information on current connections. type Subsz struct { *SublistStats @@ -499,12 +580,11 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) { NumSubs: uint32(len(r.subs)), } - if subs { - sublist := make([]*subscription, 0, len(r.subs)) + if subs && len(r.subs) > 0 { + ri.Subs = make([]string, 0, len(r.subs)) for _, sub := range r.subs { - sublist = append(sublist, sub) + ri.Subs = append(ri.Subs, string(sub.subject)) } - ri.Subs = castToSliceString(sublist) } switch conn := r.nc.(type) { case *net.TCPConn, *tls.Conn: @@ -754,3 +834,47 @@ func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) { w.Write(data) } } + +func (reason ClosedState) String() string { + switch reason { + case ClientClosed: + return "Client" + case AuthenticationTimeout: + return "Authentication Timeout" + case AuthenticationViolation: + return "Authentication Failure" + case TLSHandshakeError: + return "TLS Handshake Failure" + case SlowConsumerPendingBytes: + return "Slow Consumer (Pending Bytes)" + case SlowConsumerWriteDeadline: + return "Slow Consumer (Write Deadline)" + case WriteError: + return "Write Error" + case ReadError: + return "Read Error" + case ParseError: + return "Parse Error" + case StaleConnection: + return "Stale Connection" + case ProtocolViolation: + return "Protocol Violation" + case BadClientProtocolVersion: + return "Bad Client Protocol Version" + case WrongPort: + return "Incorrect Port" + case MaxConnectionsExceeded: + return "Maximum Connections Exceeded" + case MaxPayloadExceeded: + return "Maximum Message Payload Exceeded" + case MaxControlLineExceeded: + return "Maximum Control Line Exceeded" + case DuplicateRoute: + return "Duplicate Route" + case RouteRemoved: + return "Route Removed" + case ServerShutdown: + return "Server Shutdown" + } + return "Unknown State" +} diff --git a/server/monitor_sort_opts.go b/server/monitor_sort_opts.go index 950ef3355ea..3fd1f08956f 100644 --- a/server/monitor_sort_opts.go +++ b/server/monitor_sort_opts.go @@ -13,7 +13,14 @@ package server -// SortOpt is a helper type to sort by ConnInfo values +// Represents a connection info list. We use pointers since it will be sorted. +type ConnInfos []*ConnInfo + +// For sorting +func (cl ConnInfos) Len() int { return len(cl) } +func (cl ConnInfos) Swap(i, j int) { cl[i], cl[j] = cl[j], cl[i] } + +// SortOpt is a helper type to sort clients type SortOpt string // Possible sort options @@ -30,6 +37,54 @@ const ( ByUptime SortOpt = "uptime" // By the amount of time connections exist ) +// Individual sort options provide the Less for sort.Interface. 
Len and Swap are on ConnInfos. +// CID +type byCid struct{ ConnInfos } + +func (l byCid) Less(i, j int) bool { return l.ConnInfos[i].Cid < l.ConnInfos[j].Cid } + +// Number of Subscriptions +type bySubs struct{ ConnInfos } + +func (l bySubs) Less(i, j int) bool { return l.ConnInfos[i].NumSubs < l.ConnInfos[j].NumSubs } + +// Pending Bytes +type byPending struct{ ConnInfos } + +func (l byPending) Less(i, j int) bool { return l.ConnInfos[i].Pending < l.ConnInfos[j].Pending } + +// Outbound Msgs +type byOutMsgs struct{ ConnInfos } + +func (l byOutMsgs) Less(i, j int) bool { return l.ConnInfos[i].OutMsgs < l.ConnInfos[j].OutMsgs } + +// Inbound Msgs +type byInMsgs struct{ ConnInfos } + +func (l byInMsgs) Less(i, j int) bool { return l.ConnInfos[i].InMsgs < l.ConnInfos[j].InMsgs } + +// Outbound Bytes +type byOutBytes struct{ ConnInfos } + +func (l byOutBytes) Less(i, j int) bool { return l.ConnInfos[i].OutBytes < l.ConnInfos[j].OutBytes } + +// Inbound Bytes +type byInBytes struct{ ConnInfos } + +func (l byInBytes) Less(i, j int) bool { return l.ConnInfos[i].InBytes < l.ConnInfos[j].InBytes } + +// Last Activity +type byLast struct{ ConnInfos } + +func (l byLast) Less(i, j int) bool { + return l.ConnInfos[i].LastActivity.UnixNano() < l.ConnInfos[j].LastActivity.UnixNano() +} + +// Idle time +type byIdle struct{ ConnInfos } + +func (l byIdle) Less(i, j int) bool { return l.ConnInfos[i].Idle < l.ConnInfos[j].Idle } + // IsValid determines if a sort option is valid func (s SortOpt) IsValid() bool { switch s { @@ -39,24 +94,3 @@ func (s SortOpt) IsValid() bool { return false } } - -// Pair type is internally used. -type Pair struct { - Key *client - Val int64 -} - -// Pairs type is internally used. -type Pairs []Pair - -func (d Pairs) Len() int { - return len(d) -} - -func (d Pairs) Swap(i, j int) { - d[i], d[j] = d[j], d[i] -} - -func (d Pairs) Less(i, j int) bool { - return d[i].Val < d[j].Val -} diff --git a/server/monitor_test.go b/server/monitor_test.go index 6601f2c1a31..fded4bb911f 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "net/http" "net/url" + "runtime" "strings" "sync" "testing" @@ -232,6 +233,19 @@ func waitForClientConnCount(t *testing.T, s *Server, count int) { stackFatalf(t, "The number of expected connections was %v, got %v", count, s.NumClients()) } +func waitForClosedClientConnCount(t *testing.T, s *Server, count int) { + timeout := time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + if s.numClosedConns() == count { + return + } + time.Sleep(15 * time.Millisecond) + } + stackFatalf(t, + "The number of expected closed connections was %v, got %v", + count, s.numClosedConns()) +} + func TestConnz(t *testing.T) { s := runMonitorServer() defer s.Shutdown() @@ -346,6 +360,7 @@ func TestConnzBadParams(t *testing.T) { readBodyEx(t, url+"subs=xxx", http.StatusBadRequest, textPlain) readBodyEx(t, url+"offset=xxx", http.StatusBadRequest, textPlain) readBodyEx(t, url+"limit=xxx", http.StatusBadRequest, textPlain) + readBodyEx(t, url+"state=xxx", http.StatusBadRequest, textPlain) } func TestConnzWithSubs(t *testing.T) { @@ -418,15 +433,15 @@ func TestConnzWithCID(t *testing.T) { } // Helper to map to connection name -func createConnMap(t *testing.T, cz *Connz) map[string]ConnInfo { - cm := make(map[string]ConnInfo) +func createConnMap(t *testing.T, cz *Connz) map[string]*ConnInfo { + cm := make(map[string]*ConnInfo) for _, c := range cz.Conns { cm[c.Name] = c } return cm } -func getFooAndBar(t *testing.T, cm
map[string]ConnInfo) (ConnInfo, ConnInfo) { +func getFooAndBar(t *testing.T, cm map[string]*ConnInfo) (*ConnInfo, *ConnInfo) { return cm["foo"], cm["bar"] } @@ -532,7 +547,7 @@ func TestConnzLastActivity(t *testing.T) { sub, _ := ncFoo.Subscribe("hello.world", func(m *nats.Msg) {}) ensureServerActivityRecorded(t, ncFoo) - ciFoo, ciBar = getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts))) + ciFoo, _ = getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts))) nextLA := ciFoo.LastActivity if fooLA.Equal(nextLA) { t.Fatalf("Subscribe should have triggered update to LastActivity %+v\n", ciFoo) @@ -564,7 +579,7 @@ func TestConnzLastActivity(t *testing.T) { sub.Unsubscribe() ensureServerActivityRecorded(t, ncFoo) - ciFoo, ciBar = getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts))) + ciFoo, _ = getFooAndBar(t, createConnMap(t, pollConz(t, s, mode, url, opts))) nextLA = ciFoo.LastActivity if fooLA.Equal(nextLA) { t.Fatalf("Message delivery should have triggered update to LastActivity\n") @@ -777,12 +792,13 @@ func TestConnzSortedBySubs(t *testing.T) { firstClient := createClientConnSubscribeAndPublish(t, s) firstClient.Subscribe("hello.world", func(m *nats.Msg) {}) + defer firstClient.Close() + clients := make([]*nats.Conn, 3) for i := range clients { clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() } - defer firstClient.Close() url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().Port) for mode := 0; mode < 2; mode++ { @@ -1121,6 +1137,126 @@ func TestConnzWithNamedClient(t *testing.T) { } } +func TestConnzWithStateForClosedConns(t *testing.T) { + s := runMonitorServer() + defer s.Shutdown() + + numEach := 10 + // Create 10 closed, and 10 to leave open. + for i := 0; i < numEach; i++ { + nc := createClientConnSubscribeAndPublish(t, s) + nc.Subscribe("hello.closed.conns", func(m *nats.Msg) {}) + nc.Close() + nc = createClientConnSubscribeAndPublish(t, s) + nc.Subscribe("hello.open.conns", func(m *nats.Msg) {}) + defer nc.Close() + } + + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().Port) + + for mode := 0; mode < 2; mode++ { + // Look at all open + c := pollConz(t, s, mode, url+"connz?state=open", &ConnzOptions{State: ConnOpen}) + if lc := len(c.Conns); lc != numEach { + t.Fatalf("Expected %d connections in array, got %d\n", numEach, lc) + } + // Look at all closed + c = pollConz(t, s, mode, url+"connz?state=closed", &ConnzOptions{State: ConnClosed}) + if lc := len(c.Conns); lc != numEach { + t.Fatalf("Expected %d connections in array, got %d\n", numEach, lc) + } + // Look at all + c = pollConz(t, s, mode, url+"connz?state=ALL", &ConnzOptions{State: ConnAll}) + if lc := len(c.Conns); lc != numEach*2 { + t.Fatalf("Expected %d connections in array, got %d\n", 2*numEach, lc) + } + // Look at CID #1, which is in closed. 
+ c = pollConz(t, s, mode, url+"connz?cid=1&state=open", &ConnzOptions{CID: 1, State: ConnOpen}) + if lc := len(c.Conns); lc != 0 { + t.Fatalf("Expected no connections in open array, got %d\n", lc) + } + c = pollConz(t, s, mode, url+"connz?cid=1&state=closed", &ConnzOptions{CID: 1, State: ConnClosed}) + if lc := len(c.Conns); lc != 1 { + t.Fatalf("Expected a connection in closed array, got %d\n", lc) + } + c = pollConz(t, s, mode, url+"connz?cid=1&state=ALL", &ConnzOptions{CID: 1, State: ConnAll}) + if lc := len(c.Conns); lc != 1 { + t.Fatalf("Expected a connection in closed array, got %d\n", lc) + } + c = pollConz(t, s, mode, url+"connz?cid=1&state=closed&subs=true", + &ConnzOptions{CID: 1, State: ConnClosed, Subscriptions: true}) + if lc := len(c.Conns); lc != 1 { + t.Fatalf("Expected a connection in closed array, got %d\n", lc) + } + ci := c.Conns[0] + if ci.NumSubs != 1 { + t.Fatalf("Expected NumSubs to be 1, got %d\n", ci.NumSubs) + } + if len(ci.Subs) != 1 { + t.Fatalf("Expected len(ci.Subs) to be 1 also, got %d\n", len(ci.Subs)) + } + // Now ask for same thing without subs and make sure they are not returned. + c = pollConz(t, s, mode, url+"connz?cid=1&state=closed&subs=false", + &ConnzOptions{CID: 1, State: ConnClosed, Subscriptions: false}) + if lc := len(c.Conns); lc != 1 { + t.Fatalf("Expected a connection in closed array, got %d\n", lc) + } + ci = c.Conns[0] + if ci.NumSubs != 1 { + t.Fatalf("Expected NumSubs to be 1, got %d\n", ci.NumSubs) + } + if len(ci.Subs) != 0 { + t.Fatalf("Expected len(ci.Subs) to be 0 since subs=false, got %d\n", len(ci.Subs)) + } + + // CID #2 is in open + c = pollConz(t, s, mode, url+"connz?cid=2&state=open", &ConnzOptions{CID: 2, State: ConnOpen}) + if lc := len(c.Conns); lc != 1 { + t.Fatalf("Expected a connection in open array, got %d\n", lc) + } + c = pollConz(t, s, mode, url+"connz?cid=2&state=closed", &ConnzOptions{CID: 2, State: ConnClosed}) + if lc := len(c.Conns); lc != 0 { + t.Fatalf("Expected no connections in closed array, got %d\n", lc) + } + } +} + +// Make sure options for ConnInfo like subs=1, authuser, etc do not cause a +// race. +func TestConnzClosedConnsRace(t *testing.T) { + s := runMonitorServer() + defer s.Shutdown() + + // Create 100 closed connections. 
+ for i := 0; i < 100; i++ { + nc := createClientConnSubscribeAndPublish(t, s) + nc.Close() + } + + urlWithoutSubs := fmt.Sprintf("http://localhost:%d/connz?state=closed", s.MonitorAddr().Port) + urlWithSubs := urlWithoutSubs + "&subs=true" + + waitForClosedClientConnCount(t, s, 100) + + wg := &sync.WaitGroup{} + + fn := func(url string) { + deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + c := pollConz(t, s, 0, url, nil) + if len(c.Conns) != 100 { + t.Errorf("Incorrect Results: %+v\n", c) + } + } + wg.Done() + } + + wg.Add(2) + go fn(urlWithSubs) + go fn(urlWithoutSubs) + wg.Wait() +} + // Create a connection to test ConnInfo func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn { natsURL := fmt.Sprintf("nats://localhost:%d", s.Addr().(*net.TCPAddr).Port) @@ -1364,3 +1500,58 @@ func TestHttpStatsNoUpdatedWhenUsingServerFuncs(t *testing.T) { } } } + +func TestClusterEmptyWhenNotDefined(t *testing.T) { + s := runMonitorServer() + defer s.Shutdown() + + body := readBody(t, fmt.Sprintf("http://localhost:%d/varz", s.MonitorAddr().Port)) + var v map[string]interface{} + if err := json.Unmarshal(body, &v); err != nil { + stackFatalf(t, "Got an error unmarshalling the body: %v\n", err) + } + // Cluster can be empty, or be defined but that needs to be empty. + c, ok := v["cluster"] + if !ok { + return + } + if len(c.(map[string]interface{})) != 0 { + t.Fatalf("Expected an empty cluster definition, instead got %+v\n", c) + } +} + +// Benchmark our Connz generation. Don't use HTTP here, just measure server endpoint. +func Benchmark_Connz(b *testing.B) { + runtime.MemProfileRate = 0 + + s := runMonitorServerNoHTTPPort() + defer s.Shutdown() + + opts := s.getOpts() + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + + // Create 250 connections with 100 subs each. + for i := 0; i < 250; i++ { + nc, err := nats.Connect(url) + if err != nil { + b.Fatalf("Error on connection[%d] to %s: %v", i, url, err) + } + for x := 0; x < 100; x++ { + subj := fmt.Sprintf("foo.%d", x) + nc.Subscribe(subj, func(m *nats.Msg) {}) + } + nc.Flush() + defer nc.Close() + } + + b.ResetTimer() + runtime.MemProfileRate = 1 + + copts := &ConnzOptions{Subscriptions: false} + for i := 0; i < b.N; i++ { + _, err := s.Connz(copts) + if err != nil { + b.Fatalf("Error on Connz(): %v", err) + } + } +} diff --git a/server/opts.go b/server/opts.go index 9040622a9be..59cbf019c6e 100644 --- a/server/opts.go +++ b/server/opts.go @@ -38,7 +38,7 @@ type ClusterOpts struct { Username string `json:"-"` Password string `json:"-"` AuthTimeout float64 `json:"auth_timeout,omitempty"` - Permissions *RoutePermissions `json:"permissions"` + Permissions *RoutePermissions `json:"-"` TLSTimeout float64 `json:"-"` TLSConfig *tls.Config `json:"-"` ListenStr string `json:"-"` @@ -49,46 +49,47 @@ // Options block for gnatsd server.
type Options struct { - ConfigFile string `json:"-"` - Host string `json:"addr"` - Port int `json:"port"` - ClientAdvertise string `json:"-"` - Trace bool `json:"-"` - Debug bool `json:"-"` - NoLog bool `json:"-"` - NoSigs bool `json:"-"` - Logtime bool `json:"-"` - MaxConn int `json:"max_connections"` - Users []*User `json:"-"` - Username string `json:"-"` - Password string `json:"-"` - Authorization string `json:"-"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPSPort int `json:"https_port"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int `json:"max_control_line"` - MaxPayload int `json:"max_payload"` - MaxPending int64 `json:"max_pending"` - Cluster ClusterOpts `json:"cluster,omitempty"` - ProfPort int `json:"-"` - PidFile string `json:"-"` - LogFile string `json:"-"` - Syslog bool `json:"-"` - RemoteSyslog string `json:"-"` - Routes []*url.URL `json:"-"` - RoutesStr string `json:"-"` - TLSTimeout float64 `json:"tls_timeout"` - TLS bool `json:"-"` - TLSVerify bool `json:"-"` - TLSCert string `json:"-"` - TLSKey string `json:"-"` - TLSCaCert string `json:"-"` - TLSConfig *tls.Config `json:"-"` - WriteDeadline time.Duration `json:"-"` - RQSubsSweep time.Duration `json:"-"` + ConfigFile string `json:"-"` + Host string `json:"addr"` + Port int `json:"port"` + ClientAdvertise string `json:"-"` + Trace bool `json:"-"` + Debug bool `json:"-"` + NoLog bool `json:"-"` + NoSigs bool `json:"-"` + Logtime bool `json:"-"` + MaxConn int `json:"max_connections"` + Users []*User `json:"-"` + Username string `json:"-"` + Password string `json:"-"` + Authorization string `json:"-"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HTTPHost string `json:"http_host"` + HTTPPort int `json:"http_port"` + HTTPSPort int `json:"https_port"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int `json:"max_control_line"` + MaxPayload int `json:"max_payload"` + MaxPending int64 `json:"max_pending"` + Cluster ClusterOpts `json:"cluster,omitempty"` + ProfPort int `json:"-"` + PidFile string `json:"-"` + LogFile string `json:"-"` + Syslog bool `json:"-"` + RemoteSyslog string `json:"-"` + Routes []*url.URL `json:"-"` + RoutesStr string `json:"-"` + TLSTimeout float64 `json:"tls_timeout"` + TLS bool `json:"-"` + TLSVerify bool `json:"-"` + TLSCert string `json:"-"` + TLSKey string `json:"-"` + TLSCaCert string `json:"-"` + TLSConfig *tls.Config `json:"-"` + WriteDeadline time.Duration `json:"-"` + RQSubsSweep time.Duration `json:"-"` + MaxClosedClients int `json:"-"` CustomClientAuthentication Authentication `json:"-"` CustomRouterAuthentication Authentication `json:"-"` @@ -968,6 +969,9 @@ func processOptions(opts *Options) { if opts.RQSubsSweep == time.Duration(0) { opts.RQSubsSweep = DEFAULT_REMOTE_QSUBS_SWEEPER } + if opts.MaxClosedClients == 0 { + opts.MaxClosedClients = DEFAULT_MAX_CLOSED_CLIENTS + } } // ConfigureOptions accepts a flag set and augment it with NATS Server diff --git a/server/opts_test.go b/server/opts_test.go index 8b54091bd93..55d37738b62 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -28,19 +28,20 @@ import ( func TestDefaultOptions(t *testing.T) { golden := &Options{ - Host: DEFAULT_HOST, - Port: DEFAULT_PORT, - MaxConn: DEFAULT_MAX_CONNECTIONS, - HTTPHost: DEFAULT_HOST, - PingInterval: DEFAULT_PING_INTERVAL, - MaxPingsOut: DEFAULT_PING_MAX_OUT, - TLSTimeout: float64(TLS_TIMEOUT) / 
float64(time.Second), - AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), - MaxControlLine: MAX_CONTROL_LINE_SIZE, - MaxPayload: MAX_PAYLOAD_SIZE, - MaxPending: MAX_PENDING_SIZE, - WriteDeadline: DEFAULT_FLUSH_DEADLINE, - RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER, + Host: DEFAULT_HOST, + Port: DEFAULT_PORT, + MaxConn: DEFAULT_MAX_CONNECTIONS, + HTTPHost: DEFAULT_HOST, + PingInterval: DEFAULT_PING_INTERVAL, + MaxPingsOut: DEFAULT_PING_MAX_OUT, + TLSTimeout: float64(TLS_TIMEOUT) / float64(time.Second), + AuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), + MaxControlLine: MAX_CONTROL_LINE_SIZE, + MaxPayload: MAX_PAYLOAD_SIZE, + MaxPending: MAX_PENDING_SIZE, + WriteDeadline: DEFAULT_FLUSH_DEADLINE, + RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER, + MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS, } opts := &Options{} diff --git a/server/parser.go b/server/parser.go index 44bf8e7e8fd..088894fb78e 100644 --- a/server/parser.go +++ b/server/parser.go @@ -667,7 +667,7 @@ func (c *client) parse(buf []byte) error { // catching here should prevent memory exhaustion attacks. if len(c.argBuf) > mcl { c.sendErr("Maximum Control Line Exceeded") - c.closeConnection() + c.closeConnection(MaxControlLineExceeded) return ErrMaxControlLine } } diff --git a/server/reload.go b/server/reload.go index 05cd2a238bd..66a9b9e541f 100644 --- a/server/reload.go +++ b/server/reload.go @@ -281,7 +281,7 @@ func (r *routesOption) Apply(server *Server) { if client.route.url == remove { // Do not attempt to reconnect when route is removed. client.setRouteNoReconnectOnClose() - client.closeConnection() + client.closeConnection(RouteRemoved) server.Noticef("Removed route %v", remove) } } diff --git a/server/ring.go b/server/ring.go new file mode 100644 index 00000000000..b9232ca9955 --- /dev/null +++ b/server/ring.go @@ -0,0 +1,75 @@ +// Copyright 2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +// We wrap to hold onto optional items for /connz. +type closedClient struct { + ConnInfo + subs []string + user string +} + +// Fixed sized ringbuffer for closed connections. +type closedRingBuffer struct { + total uint64 + conns []*closedClient +} + +// Create a new ring buffer with at most max items. +func newClosedRingBuffer(max int) *closedRingBuffer { + rb := &closedRingBuffer{} + rb.conns = make([]*closedClient, max) + return rb +} + +// Adds in a new closed connection. If there is no more room, +// remove the oldest. +func (rb *closedRingBuffer) append(cc *closedClient) { + rb.conns[rb.next()] = cc + rb.total++ +} + +func (rb *closedRingBuffer) next() int { + return int(rb.total % uint64(cap(rb.conns))) +} + +func (rb *closedRingBuffer) len() int { + if rb.total > uint64(cap(rb.conns)) { + return cap(rb.conns) + } + return int(rb.total) +} + +func (rb *closedRingBuffer) totalConns() uint64 { + return rb.total +} + +// This will not be sorted. Will return a copy of the list +// which recipient can modify. 
If the contents of the client +// itself need to be modified, meaning swapping in any optional items, +// a copy should be made. We could introduce a new lock and hold that +// but since we return this list inside monitor which allows programmatic +// access, we do not know when it would be done. +func (rb *closedRingBuffer) closedClients() []*closedClient { + dup := make([]*closedClient, rb.len()) + if rb.total <= uint64(cap(rb.conns)) { + copy(dup, rb.conns[:rb.len()]) + } else { + first := rb.next() + next := cap(rb.conns) - first + copy(dup, rb.conns[first:]) + copy(dup[next:], rb.conns[:first]) + } + return dup +} diff --git a/server/route.go b/server/route.go index 0fe1d7eee5c..9d938bad955 100644 --- a/server/route.go +++ b/server/route.go @@ -269,7 +269,7 @@ func (c *client) sendConnect(tlsRequired bool) { b, err := json.Marshal(cinfo) if err != nil { c.Errorf("Error marshaling CONNECT to route: %v\n", err) - c.closeConnection() + c.closeConnection(ProtocolViolation) return } c.sendProto([]byte(fmt.Sprintf(ConProto, b)), true) @@ -306,7 +306,7 @@ func (c *client) processRouteInfo(info *Info) { // Detect route to self. if c.route.remoteID == s.info.ID { c.mu.Unlock() - c.closeConnection() + c.closeConnection(DuplicateRoute) return } @@ -323,7 +323,7 @@ func (c *client) processRouteInfo(info *Info) { if err != nil { c.Errorf("Error parsing URL from INFO: %v\n", err) c.mu.Unlock() - c.closeConnection() + c.closeConnection(ParseError) return } c.route.url = url @@ -366,7 +366,7 @@ func (c *client) processRouteInfo(info *Info) { } } else { c.Debugf("Detected duplicate remote route %q", info.ID) - c.closeConnection() + c.closeConnection(DuplicateRoute) } } @@ -595,7 +595,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { if err := conn.Handshake(); err != nil { c.Errorf("TLS route handshake error: %v", err) c.sendErr("Secure Connection - TLS Required") - c.closeConnection() + c.closeConnection(TLSHandshakeError) return nil } // Reset the read deadline @@ -631,7 +631,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { if !running { c.mu.Unlock() c.setRouteNoReconnectOnClose() - c.closeConnection() + c.closeConnection(ServerShutdown) return nil } diff --git a/server/server.go b/server/server.go index 0ecd9afebf4..10fa85fdd9f 100644 --- a/server/server.go +++ b/server/server.go @@ -72,6 +72,7 @@ type Server struct { remotes map[string]*client users map[string]*User totalClients uint64 + closed *closedRingBuffer done chan bool start time.Time http net.Listener @@ -179,6 +180,9 @@ func New(opts *Options) *Server { // For tracking clients s.clients = make(map[uint64]*client) + // For tracking closed clients. + s.closed = newClosedRingBuffer(opts.MaxClosedClients) + // For tracking connections that are not yet registered // in s.routes, but for which readLoop has started.
s.grTmpClients = make(map[uint64]*client) @@ -392,7 +396,7 @@ func (s *Server) Shutdown() { // Close client and route connections for _, c := range conns { - c.closeConnection() + c.closeConnection(ServerShutdown) } // Block until the accept loops exit @@ -785,8 +789,7 @@ func (s *Server) createClient(conn net.Conn) *client { c.mu.Unlock() if err := conn.Handshake(); err != nil { c.Errorf("TLS handshake error: %v", err) - c.sendErr("Secure Connection - TLS Required") - c.closeConnection() + c.closeConnection(TLSHandshakeError) return nil } // Reset the read deadline @@ -834,6 +837,35 @@ func (s *Server) createClient(conn net.Conn) *client { return c } +// This will save off a closed client in a ring buffer so that +// /connz can inspect it. Useful for debugging, etc. +func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) { + now := time.Now() + + c.mu.Lock() + + cc := &closedClient{} + cc.fill(c, nc, now) + cc.Stop = &now + cc.Reason = reason.String() + + // Do subs, do not place by default in main ConnInfo + if len(c.subs) > 0 { + cc.subs = make([]string, 0, len(c.subs)) + for _, sub := range c.subs { + cc.subs = append(cc.subs, string(sub.subject)) + } + } + // Hold user as well. + cc.user = c.opts.Username + c.mu.Unlock() + + // Place in the ring buffer + s.mu.Lock() + s.closed.append(cc) + s.mu.Unlock() +} + // Adds the given array of urls to the server's INFO.ClientConnectURLs // array. The server INFO JSON is regenerated. // Note that a check is made to ensure that given URLs are not @@ -901,7 +933,7 @@ func tlsTimeout(c *client, conn *tls.Conn) { if !cs.HandshakeComplete { c.Errorf("TLS handshake timeout") c.sendErr("Secure Connection - TLS Required") - c.closeConnection() + c.closeConnection(TLSHandshakeError) } } @@ -1089,6 +1121,24 @@ func (s *Server) startGoRoutine(f func()) { s.grMu.Unlock() } +func (s *Server) numClosedConns() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed.len() +} + +func (s *Server) totalClosedConns() uint64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed.totalConns() +} + +func (s *Server) closedClients() []*closedClient { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed.closedClients() +} + // getClientConnectURLs returns suitable URLs for clients to connect to the listen // port based on the server options' Host and Port. If the Host corresponds to // "any" interfaces, this call returns the list of resolved IP addresses. diff --git a/test/configs/tls.conf b/test/configs/tls.conf index 6dab9429072..6d503b9456b 100644 --- a/test/configs/tls.conf +++ b/test/configs/tls.conf @@ -15,6 +15,6 @@ tls { authorization { user: derek - password: boo + password: monkey timeout: 1 }
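A note on server/ring.go: the buffer never shifts elements. total counts every append, and total % cap(conns) is both the next write slot and, once the buffer has wrapped, the index of the oldest entry, so a snapshot must stitch two segments together: conns[first:] followed by conns[:first]. A standalone sketch of that wrap-around logic (ints stand in for *closedClient; ring, newRing, and snapshot are illustrative names, not the patch's API):

package main

import "fmt"

// ring mirrors closedRingBuffer: total counts every append and
// total % cap(items) is the next write slot, which doubles as the
// index of the oldest element once the buffer has wrapped.
type ring struct {
	total uint64
	items []int
}

func newRing(max int) *ring { return &ring{items: make([]int, max)} }

func (rb *ring) next() int { return int(rb.total % uint64(cap(rb.items))) }

func (rb *ring) append(v int) {
	rb.items[rb.next()] = v // overwrite the oldest slot when full
	rb.total++
}

func (rb *ring) len() int {
	if rb.total > uint64(cap(rb.items)) {
		return cap(rb.items)
	}
	return int(rb.total)
}

// snapshot returns oldest-to-newest, copying so callers can mutate freely.
func (rb *ring) snapshot() []int {
	dup := make([]int, rb.len())
	if rb.total <= uint64(cap(rb.items)) {
		copy(dup, rb.items[:rb.len()])
	} else {
		first := rb.next() // index of the oldest element
		n := copy(dup, rb.items[first:])
		copy(dup[n:], rb.items[:first]) // then the head of the slice
	}
	return dup
}

func main() {
	rb := newRing(4)
	for v := 1; v <= 7; v++ { // overflow a 4-slot buffer with 7 appends
		rb.append(v)
	}
	fmt.Println(rb.snapshot()) // [4 5 6 7]
}

The second copy covering the head segment up to the oldest index is exactly what closedClients does above; the design keeps append O(1) with no element shifting, and the copying cost is paid only when /connz asks for a snapshot.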