From b8e7b9b68e13a381ffdaa6916a405ccd4a173783 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 4 Feb 2019 17:07:49 -0800 Subject: [PATCH 1/2] Some Optimizations 1. Change outbound client structure to be smaller and more cache friendly. 2. Snapshot MaxControlLine into client structure (mcl) to avoid server opts lookup. Signed-off-by: Derek Collison --- server/client.go | 36 ++++++++++++++++++++++-------------- server/const.go | 2 +- server/parser.go | 17 ++++++++--------- server/parser_test.go | 4 ++-- server/split_test.go | 20 ++++++++++---------- 5 files changed, 43 insertions(+), 36 deletions(-) diff --git a/server/client.go b/server/client.go index fa57ebc5cf5..7b0fa52be6d 100644 --- a/server/client.go +++ b/server/client.go @@ -146,6 +146,7 @@ type client struct { stats mpay int32 msubs int + mcl int mu sync.Mutex kind int cid uint64 @@ -199,14 +200,14 @@ type outbound struct { nb net.Buffers // net.Buffers for writev IO sz int // limit size per []byte, uses variable BufSize constants, start, min, max. sws int // Number of short writes, used for dynamic resizing. - pb int64 // Total pending/queued bytes. - pm int64 // Total pending/queued messages. + pb int // Total pending/queued bytes. + pm int // Total pending/queued messages. sg *sync.Cond // Flusher conditional for signaling. - sgw bool // Indicate flusher is waiting on condition wait. wdl time.Duration // Snapshot fo write deadline. - mp int64 // snapshot of max pending. + mp int // snapshot of max pending. fsp int // Flush signals that are pending from readLoop's pcd. lft time.Duration // Last flush time. + sgw bool // Indicate flusher is waiting on condition wait. } type perm struct { @@ -353,7 +354,7 @@ func (c *client) initClient() { opts := s.getOpts() // Snapshots to avoid mutex access in fast paths. c.out.wdl = opts.WriteDeadline - c.out.mp = opts.MaxPending + c.out.mp = int(opts.MaxPending) c.subs = make(map[string]*subscription) c.echo = true @@ -656,6 +657,15 @@ func (c *client) readLoop() { nc := c.nc s := c.srv c.in.rsz = startBufSize + // Snapshot max control line since currently can not be changed on reload and we + // were checking it on each call to parse. If this changes and we allow MaxControlLine + // to be reloaded without restart, this code will need to change. + c.mcl = MAX_CONTROL_LINE_SIZE + if s != nil { + if opts := s.getOpts(); opts != nil { + c.mcl = opts.MaxControlLine + } + } defer s.grWG.Done() c.mu.Unlock() @@ -845,11 +855,11 @@ func (c *client) flushOutbound() bool { c.out.lft = lft // Subtract from pending bytes and messages. - c.out.pb -= n + c.out.pb -= int(n) c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate. // Check for partial writes - if n != attempted && n > 0 { + if n != int64(attempted) && n > 0 { c.handlePartialWrite(nb) } else if n >= int64(c.out.sz) { c.out.sws = 0 @@ -892,10 +902,10 @@ func (c *client) flushOutbound() bool { } // Adjust based on what we wrote plus any pending. - pt := int(n + c.out.pb) + pt := int(n) + c.out.pb // Adjust sz as needed downward, keeping power of 2. - // We do this at a slower rate, hence the pt*4. + // We do this at a slower rate. if pt < c.out.sz && c.out.sz > minBufSize { c.out.sws++ if c.out.sws > shortsToShrink { @@ -1234,7 +1244,7 @@ func (c *client) queueOutbound(data []byte) bool { // Assume data will not be referenced referenced := false // Add to pending bytes total. - c.out.pb += int64(len(data)) + c.out.pb += len(data) // Check for slow consumer via pending bytes limit. // ok to return here, client is going away. @@ -2540,11 +2550,9 @@ func (c *client) clearAuthTimer() bool { // We may reuse atmr for expiring user jwts, // so check connectReceived. +// Lock assume held on entry. func (c *client) awaitingAuth() bool { - c.mu.Lock() - authSet := !c.flags.isSet(connectReceived) && c.atmr != nil - c.mu.Unlock() - return authSet + return !c.flags.isSet(connectReceived) && c.atmr != nil } // This will set the atmr for the JWT expiration time. diff --git a/server/const.go b/server/const.go index 535c2067d1b..10524d24fb2 100644 --- a/server/const.go +++ b/server/const.go @@ -40,7 +40,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.0.0-RC2" + VERSION = "2.0.0-RC3" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/parser.go b/server/parser.go index e4a28e1d904..2cb3d9f1ace 100644 --- a/server/parser.go +++ b/server/parser.go @@ -109,17 +109,14 @@ func (c *client) parse(buf []byte) error { var i int var b byte - // FIXME(dlc) - This is wasteful, only can change on reload. - mcl := MAX_CONTROL_LINE_SIZE - if c.srv != nil { - if opts := c.srv.getOpts(); opts != nil { - mcl = opts.MaxControlLine - } - } - - // Snapshot this, and reset when we receive a + // Snapshots + c.mu.Lock() + // Snapshot and then reset when we receive a // proper CONNECT if needed. authSet := c.awaitingAuth() + // Snapshot max control line as well. + mcl := c.mcl + c.mu.Unlock() // Move to loop instead of range syntax to allow jumping of i for i = 0; i < len(buf); i++ { @@ -606,7 +603,9 @@ func (c *client) parse(buf []byte) error { } c.drop, c.state = 0, OP_START // Reset notion on authSet + c.mu.Lock() authSet = c.awaitingAuth() + c.mu.Unlock() default: if c.argBuf != nil { c.argBuf = append(c.argBuf, b) diff --git a/server/parser_test.go b/server/parser_test.go index 4748d92781a..caf930ab350 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -19,7 +19,7 @@ import ( ) func dummyClient() *client { - return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1} + return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1, mcl: MAX_CONTROL_LINE_SIZE} } func dummyRouteClient() *client { @@ -578,7 +578,7 @@ func TestParseOK(t *testing.T) { func TestMaxControlLine(t *testing.T) { c := dummyClient() - c.srv.opts.MaxControlLine = 8 + c.mcl = 8 pub := []byte("PUB foo.bar 11\r") err := c.parse(pub) diff --git a/server/split_test.go b/server/split_test.go index 0459c383257..7c901646e1d 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -30,7 +30,7 @@ func TestSplitBufferSubOp(t *testing.T) { } s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: gws} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription), nc: cli} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -67,7 +67,7 @@ func TestSplitBufferSubOp(t *testing.T) { func TestSplitBufferUnsubOp(t *testing.T) { s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: &srvGateway{}} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") if err := c.parse(subop); err != nil { @@ -100,7 +100,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { } func TestSplitBufferPubOp(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r") pub1 := pub[:2] pub2 := pub[2:9] @@ -166,7 +166,7 @@ func TestSplitBufferPubOp(t *testing.T) { } func TestSplitBufferPubOp2(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n") pub1 := pub[:30] pub2 := pub[30:] @@ -186,7 +186,7 @@ func TestSplitBufferPubOp2(t *testing.T) { } func TestSplitBufferPubOp3(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo bar 11\r\nhello world\r\n") pub := pubAll[:16] @@ -212,7 +212,7 @@ func TestSplitBufferPubOp3(t *testing.T) { } func TestSplitBufferPubOp4(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") pub := pubAll[:12] @@ -238,7 +238,7 @@ func TestSplitBufferPubOp4(t *testing.T) { } func TestSplitBufferPubOp5(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") // Splits need to be on MSG_END_R now too, so make sure we check that. @@ -257,7 +257,7 @@ func TestSplitBufferPubOp5(t *testing.T) { } func TestSplitConnectArg(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} connectAll := []byte("CONNECT {\"verbose\":false,\"tls_required\":false," + "\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n") @@ -306,7 +306,7 @@ func TestSplitConnectArg(t *testing.T) { func TestSplitDanglingArgBuf(t *testing.T) { s := New(&defaultServerOptions) - c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues. @@ -445,7 +445,7 @@ func TestSplitRoutedMsgArg(t *testing.T) { } func TestSplitBufferMsgOp(t *testing.T) { - c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription), kind: ROUTER} + c := &client{msubs: -1, mpay: -1, mcl: 1024, subs: make(map[string]*subscription), kind: ROUTER} msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r") msg1 := msg[:2] msg2 := msg[2:9] From af78552549743e9b8400b8ec6a5f6204468adaef Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 5 Feb 2019 14:57:59 -0800 Subject: [PATCH 2/2] Move ints to proper sizes for all Signed-off-by: Derek Collison --- server/accounts.go | 40 ++++++++++++------------ server/client.go | 71 ++++++++++++++++++++++--------------------- server/client_test.go | 14 ++++----- server/events.go | 8 ++--- server/gateway.go | 4 +-- server/monitor.go | 6 ++-- server/opts.go | 20 +++++++++--- server/opts_test.go | 30 +++++++++++++++++- server/parser.go | 2 +- server/reload.go | 10 +++--- server/route.go | 8 ++--- server/server.go | 6 ++-- 12 files changed, 130 insertions(+), 89 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index eb9162b68e7..155ac62df10 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -1,4 +1,4 @@ -// Copyright 2018 The NATS Authors +// Copyright 2018-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -34,7 +34,7 @@ const globalAccountName = "$G" // Route Map Entry - used for efficient interest graph propagation. // TODO(dlc) - squeeze even more? type rme struct { - qi int // used to index into key from map for optional queue name + qi int32 // used to index into key from map for optional queue name n int32 // number of subscriptions directly matching, local subs only. } @@ -50,15 +50,15 @@ type Account struct { sl *Sublist etmr *time.Timer ctmr *time.Timer - strack map[string]int - nrclients int - sysclients int + strack map[string]int32 + nrclients int32 + sysclients int32 clients map[*client]*client rm map[string]*rme imports importMap exports exportMap limits - nae int + nae int32 pruning bool expired bool srv *Server // server this account is registered with (possibly nil) @@ -67,9 +67,9 @@ type Account struct { // Account based limits. type limits struct { mpay int32 - msubs int - mconns int - maxnae int + msubs int32 + mconns int32 + maxnae int32 maxaettl time.Duration } @@ -137,7 +137,7 @@ func (a *Account) shallowCopy() *Account { func (a *Account) NumConnections() int { a.mu.RLock() defer a.mu.RUnlock() - return len(a.clients) + a.nrclients + return len(a.clients) + int(a.nrclients) } // NumLocalClients returns active number of clients for this account @@ -150,7 +150,7 @@ func (a *Account) NumLocalConnections() int { // Do not account for the system accounts. func (a *Account) numLocalConnections() int { - return len(a.clients) - a.sysclients + return len(a.clients) - int(a.sysclients) } // MaxClientsReached returns if we have reached our limit for number of connections. @@ -162,7 +162,7 @@ func (a *Account) MaxTotalConnectionsReached() bool { func (a *Account) maxTotalConnectionsReached() bool { if a.mconns != jwt.NoLimit { - return len(a.clients)-a.sysclients+a.nrclients >= a.mconns + return len(a.clients)-int(a.sysclients)+int(a.nrclients) >= int(a.mconns) } return false } @@ -172,7 +172,7 @@ func (a *Account) maxTotalConnectionsReached() bool { func (a *Account) MaxActiveConnections() int { a.mu.RLock() defer a.mu.RUnlock() - return a.mconns + return int(a.mconns) } // RoutedSubs returns how many subjects we would send across a route when first @@ -315,7 +315,7 @@ func (a *Account) removeServiceImport(subject string) { func (a *Account) numAutoExpireResponseMaps() int { a.mu.RLock() defer a.mu.RUnlock() - return a.nae + return int(a.nae) } // MaxAutoExpireResponseMaps return the maximum number of @@ -323,14 +323,14 @@ func (a *Account) numAutoExpireResponseMaps() int { func (a *Account) MaxAutoExpireResponseMaps() int { a.mu.RLock() defer a.mu.RUnlock() - return a.maxnae + return int(a.maxnae) } // SetMaxAutoExpireResponseMaps sets the max outstanding auto expire response maps. func (a *Account) SetMaxAutoExpireResponseMaps(max int) { a.mu.Lock() defer a.mu.Unlock() - a.maxnae = max + a.maxnae = int32(max) } // AutoExpireTTL returns the ttl for response maps. @@ -415,7 +415,7 @@ func (a *Account) pruneAutoExpireResponseMaps() { } a.mu.RLock() - numOver := a.nae - a.maxnae + numOver := int(a.nae - a.maxnae) a.mu.RUnlock() if numOver <= 0 { @@ -931,9 +931,9 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } // Now do limits if they are present. - a.msubs = int(ac.Limits.Subs) + a.msubs = int32(ac.Limits.Subs) a.mpay = int32(ac.Limits.Payload) - a.mconns = int(ac.Limits.Conn) + a.mconns = int32(ac.Limits.Conn) clients := gatherClients() // Sort if we are over the limit. @@ -943,7 +943,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { }) } for i, c := range clients { - if a.mconns != jwt.NoLimit && i >= a.mconns { + if a.mconns != jwt.NoLimit && i >= int(a.mconns) { c.maxAccountConnExceeded() continue } diff --git a/server/client.go b/server/client.go index 7b0fa52be6d..2ed4bda9dfe 100644 --- a/server/client.go +++ b/server/client.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -145,8 +145,8 @@ type client struct { // Here first because of use of atomics, and memory alignment. stats mpay int32 - msubs int - mcl int + msubs int32 + mcl int32 mu sync.Mutex kind int cid uint64 @@ -160,7 +160,7 @@ type client struct { acc *Account user *NkeyUser host string - port int + port uint16 subs map[string]*subscription perms *permissions mperms *msgDeny @@ -198,14 +198,14 @@ type outbound struct { p []byte // Primary write buffer s []byte // Secondary for use post flush nb net.Buffers // net.Buffers for writev IO - sz int // limit size per []byte, uses variable BufSize constants, start, min, max. - sws int // Number of short writes, used for dynamic resizing. - pb int // Total pending/queued bytes. - pm int // Total pending/queued messages. + sz int32 // limit size per []byte, uses variable BufSize constants, start, min, max. + sws int32 // Number of short writes, used for dynamic resizing. + pb int32 // Total pending/queued bytes. + pm int32 // Total pending/queued messages. sg *sync.Cond // Flusher conditional for signaling. wdl time.Duration // Snapshot fo write deadline. - mp int // snapshot of max pending. - fsp int // Flush signals that are pending from readLoop's pcd. + mp int32 // snapshot of max pending. + fsp int32 // Flush signals that are pending from readLoop's pcd. lft time.Duration // Last flush time. sgw bool // Indicate flusher is waiting on condition wait. } @@ -259,11 +259,14 @@ type readCache struct { rts []routeTarget prand *rand.Rand - msgs int - bytes int - subs int - rsz int // Read buffer size - srs int // Short reads, used for dynamic buffer resizing. + + // These are all temporary totals for an invocation of a read in readloop. + msgs int32 + bytes int32 + subs int32 + + rsz int32 // Read buffer size + srs int32 // Short reads, used for dynamic buffer resizing. } const ( @@ -354,7 +357,7 @@ func (c *client) initClient() { opts := s.getOpts() // Snapshots to avoid mutex access in fast paths. c.out.wdl = opts.WriteDeadline - c.out.mp = int(opts.MaxPending) + c.out.mp = int32(opts.MaxPending) c.subs = make(map[string]*subscription) c.echo = true @@ -377,7 +380,7 @@ func (c *client) initClient() { if ip, ok := c.nc.(*net.TCPConn); ok { addr := ip.RemoteAddr().(*net.TCPAddr) c.host = addr.IP.String() - c.port = addr.Port + c.port = uint16(addr.Port) conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port) } @@ -448,12 +451,12 @@ func (c *client) registerWithAccount(acc *Account) error { // Helper to determine if we have exceeded max subs. func (c *client) subsExceeded() bool { - return c.msubs != jwt.NoLimit && len(c.subs) > c.msubs + return c.msubs != jwt.NoLimit && len(c.subs) > int(c.msubs) } // Helper to determine if we have met or exceeded max subs. func (c *client) subsAtLimit() bool { - return c.msubs != jwt.NoLimit && len(c.subs) >= c.msubs + return c.msubs != jwt.NoLimit && len(c.subs) >= int(c.msubs) } // Apply account limits @@ -481,9 +484,9 @@ func (c *client) applyAccountLimits() { } // We check here if the server has an option set that is lower than the account limit. - if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs { + if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < int(c.acc.msubs) { c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs) - c.msubs = opts.MaxSubs + c.msubs = int32(opts.MaxSubs) } if c.subsExceeded() { @@ -663,7 +666,7 @@ func (c *client) readLoop() { c.mcl = MAX_CONTROL_LINE_SIZE if s != nil { if opts := s.getOpts(); opts != nil { - c.mcl = opts.MaxControlLine + c.mcl = int32(opts.MaxControlLine) } } defer s.grWG.Done() @@ -758,11 +761,11 @@ func (c *client) readLoop() { // Update read buffer size as/if needed. if n >= cap(b) && cap(b) < maxBufSize { // Grow - c.in.rsz = cap(b) * 2 + c.in.rsz = int32(cap(b) * 2) b = make([]byte, c.in.rsz) } else if n < cap(b) && cap(b) > minBufSize && c.in.srs > shortsToShrink { // Shrink, for now don't accelerate, ping/pong will eventually sort it out. - c.in.rsz = cap(b) / 2 + c.in.rsz = int32(cap(b) / 2) b = make([]byte, c.in.rsz) } c.mu.Unlock() @@ -855,7 +858,7 @@ func (c *client) flushOutbound() bool { c.out.lft = lft // Subtract from pending bytes and messages. - c.out.pb -= int(n) + c.out.pb -= int32(n) c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate. // Check for partial writes @@ -902,7 +905,7 @@ func (c *client) flushOutbound() bool { } // Adjust based on what we wrote plus any pending. - pt := int(n) + c.out.pb + pt := int32(n) + c.out.pb // Adjust sz as needed downward, keeping power of 2. // We do this at a slower rate. @@ -920,11 +923,11 @@ func (c *client) flushOutbound() bool { // Check to see if we can reuse buffers. if len(cnb) > 0 { oldp := cnb[0][:0] - if cap(oldp) >= c.out.sz { + if cap(oldp) >= int(c.out.sz) { // Replace primary or secondary if they are nil, reusing same buffer. if c.out.p == nil { c.out.p = oldp - } else if c.out.s == nil || cap(c.out.s) < c.out.sz { + } else if c.out.s == nil || cap(c.out.s) < int(c.out.sz) { c.out.s = oldp } } @@ -1244,7 +1247,7 @@ func (c *client) queueOutbound(data []byte) bool { // Assume data will not be referenced referenced := false // Add to pending bytes total. - c.out.pb += len(data) + c.out.pb += int32(len(data)) // Check for slow consumer via pending bytes limit. // ok to return here, client is going away. @@ -1259,7 +1262,7 @@ func (c *client) queueOutbound(data []byte) bool { if c.out.sz == 0 { c.out.sz = startBufSize } - if c.out.s != nil && cap(c.out.s) >= c.out.sz { + if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { c.out.p = c.out.s c.out.s = nil } else { @@ -1272,7 +1275,7 @@ func (c *client) queueOutbound(data []byte) bool { if len(data) > available { // We can fit into existing primary, but message will fit in next one // we allocate or utilize from the secondary. So copy what we can. - if available > 0 && len(data) < c.out.sz { + if available > 0 && len(data) < int(c.out.sz) { c.out.p = append(c.out.p, data[:available]...) data = data[available:] } @@ -1293,10 +1296,10 @@ func (c *client) queueOutbound(data []byte) bool { if (c.out.sz << 1) <= maxBufSize { c.out.sz <<= 1 } - if len(data) > c.out.sz { + if len(data) > int(c.out.sz) { c.out.p = make([]byte, 0, len(data)) } else { - if c.out.s != nil && cap(c.out.s) >= c.out.sz { // TODO(dlc) - Size mismatch? + if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch? c.out.p = c.out.s c.out.s = nil } else { @@ -2125,7 +2128,7 @@ func (c *client) processInboundClientMsg(msg []byte) { // Update statistics // The msg includes the CR_LF, so pull back out for accounting. c.in.msgs++ - c.in.bytes += len(msg) - LEN_CR_LF + c.in.bytes += int32(len(msg) - LEN_CR_LF) if c.trace { c.traceMsg(msg) diff --git a/server/client_test.go b/server/client_test.go index 7367ba61b71..208454855a2 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -966,8 +966,8 @@ func TestDynamicBuffers(t *testing.T) { } // Create some helper functions and data structures. - done := make(chan bool) // Used to stop recording. - type maxv struct{ rsz, wsz int } // Used to hold max values. + done := make(chan bool) // Used to stop recording. + type maxv struct{ rsz, wsz int32 } // Used to hold max values. results := make(chan maxv) // stopRecording stops the recording ticker and releases go routine. @@ -976,14 +976,14 @@ func TestDynamicBuffers(t *testing.T) { return <-results } // max just grabs max values. - max := func(a, b int) int { + max := func(a, b int32) int32 { if a > b { return a } return b } // Returns current value of the buffer sizes. - getBufferSizes := func() (int, int) { + getBufferSizes := func() (int32, int32) { c.mu.Lock() defer c.mu.Unlock() return c.in.rsz, c.out.sz @@ -1013,7 +1013,7 @@ func TestDynamicBuffers(t *testing.T) { } } // Check that the current value is what we expected. - checkBuffers := func(ers, ews int) { + checkBuffers := func(ers, ews int32) { t.Helper() rsz, wsz := getBufferSizes() if rsz != ers { @@ -1025,7 +1025,7 @@ func TestDynamicBuffers(t *testing.T) { } // Check that the max was as expected. - checkResults := func(m maxv, rsz, wsz int) { + checkResults := func(m maxv, rsz, wsz int32) { t.Helper() if rsz != m.rsz { t.Fatalf("Expected read buffer of %d, but got %d\n", rsz, m.rsz) diff --git a/server/events.go b/server/events.go index b54d1dab418..e205fb0b2f8 100644 --- a/server/events.go +++ b/server/events.go @@ -1,4 +1,4 @@ -// Copyright 2018 The NATS Authors +// Copyright 2018-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -632,12 +632,12 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg // If we are here we have interest in tracking this account. Update our accounting. acc.mu.Lock() if acc.strack == nil { - acc.strack = make(map[string]int) + acc.strack = make(map[string]int32) } // This does not depend on receiving all updates since each one is idempotent. prev := acc.strack[m.Server.ID] - acc.strack[m.Server.ID] = m.Conns - acc.nrclients += (m.Conns - prev) + acc.strack[m.Server.ID] = int32(m.Conns) + acc.nrclients += int32(m.Conns) - prev acc.mu.Unlock() s.updateRemoteServer(&m.Server) diff --git a/server/gateway.go b/server/gateway.go index 7e505349aa9..801f1478eec 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1,4 +1,4 @@ -// Copyright 2018 The NATS Authors +// Copyright 2018-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2070,7 +2070,7 @@ func (c *client) processInboundGatewayMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. - c.in.bytes += len(msg) - LEN_CR_LF + c.in.bytes += int32(len(msg) - LEN_CR_LF) if c.trace { c.traceMsg(msg) diff --git a/server/monitor.go b/server/monitor.go index 227a2d5f70a..55034b406f9 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1,4 +1,4 @@ -// Copyright 2013-2018 The NATS Authors +// Copyright 2013-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -384,7 +384,7 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) { } if client.port != 0 { - ci.Port = client.port + ci.Port = int(client.port) ci.IP = client.host } } @@ -920,7 +920,7 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) { // Snapshot server options. opts := s.getOpts() - v := &Varz{Info: &s.info, Options: opts, MaxPayload: opts.MaxPayload, Start: s.start} + v := &Varz{Info: &s.info, Options: opts, MaxPayload: int(opts.MaxPayload), Start: s.start} v.Now = time.Now() v.Uptime = myUptime(time.Since(s.start)) v.Port = v.Info.Port diff --git a/server/opts.go b/server/opts.go index 6aa0ede7603..c9695fcfd19 100644 --- a/server/opts.go +++ b/server/opts.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -105,8 +105,8 @@ type Options struct { HTTPPort int `json:"http_port"` HTTPSPort int `json:"https_port"` AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int `json:"max_control_line"` - MaxPayload int `json:"max_payload"` + MaxControlLine int32 `json:"max_control_line"` + MaxPayload int32 `json:"max_payload"` MaxPending int64 `json:"max_pending"` Cluster ClusterOpts `json:"cluster,omitempty"` Gateway GatewayOpts `json:"gateway,omitempty"` @@ -442,9 +442,19 @@ func (o *Options) ProcessConfigFile(configFile string) error { case "prof_port": o.ProfPort = int(v.(int64)) case "max_control_line": - o.MaxControlLine = int(v.(int64)) + if v.(int64) > 1<<31-1 { + err := &configErr{tk, fmt.Sprintf("%s value is too big", k)} + errors = append(errors, err) + continue + } + o.MaxControlLine = int32(v.(int64)) case "max_payload": - o.MaxPayload = int(v.(int64)) + if v.(int64) > 1<<31-1 { + err := &configErr{tk, fmt.Sprintf("%s value is too big", k)} + errors = append(errors, err) + continue + } + o.MaxPayload = int32(v.(int64)) case "max_pending": o.MaxPending = v.(int64) case "max_connections", "max_conn": diff --git a/server/opts_test.go b/server/opts_test.go index 301eaf621ae..7f490d40829 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1708,3 +1708,31 @@ func TestParsingGatewaysErrors(t *testing.T) { }) } } + +func TestLargeMaxControlLine(t *testing.T) { + confFileName := "big_mcl.conf" + defer os.Remove(confFileName) + content := ` + max_control_line = 3000000000 + ` + if err := ioutil.WriteFile(confFileName, []byte(content), 0666); err != nil { + t.Fatalf("Error writing config file: %v", err) + } + if _, err := ProcessConfigFile(confFileName); err == nil { + t.Fatalf("Expected an error from too large of a max_control_line entry") + } +} + +func TestLargeMaxPayload(t *testing.T) { + confFileName := "big_mp.conf" + defer os.Remove(confFileName) + content := ` + max_payload = 3000000000 + ` + if err := ioutil.WriteFile(confFileName, []byte(content), 0666); err != nil { + t.Fatalf("Error writing config file: %v", err) + } + if _, err := ProcessConfigFile(confFileName); err == nil { + t.Fatalf("Expected an error from too large of a max_payload entry") + } +} diff --git a/server/parser.go b/server/parser.go index 2cb3d9f1ace..0b397e5a60c 100644 --- a/server/parser.go +++ b/server/parser.go @@ -808,7 +808,7 @@ func (c *client) parse(buf []byte) error { // Check for violations of control line length here. Note that this is not // exact at all but the performance hit is too great to be precise, and // catching here should prevent memory exhaustion attacks. - if len(c.argBuf) > mcl { + if len(c.argBuf) > int(mcl) { c.sendErr("Maximum Control Line Exceeded") c.closeConnection(MaxControlLineExceeded) return ErrMaxControlLine diff --git a/server/reload.go b/server/reload.go index 7f5850937a2..932d0c44dfb 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1,4 +1,4 @@ -// Copyright 2017-2018 The NATS Authors +// Copyright 2017-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -399,7 +399,7 @@ func (p *portsFileDirOption) Apply(server *Server) { // `max_control_line` setting. type maxControlLineOption struct { noopOption - newValue int + newValue int32 } // Apply is a no-op because the max control line will be reloaded after options @@ -412,7 +412,7 @@ func (m *maxControlLineOption) Apply(server *Server) { // setting. type maxPayloadOption struct { noopOption - newValue int + newValue int32 } // Apply the setting by updating the server info and each client. @@ -661,9 +661,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { case "portsfiledir": diffOpts = append(diffOpts, &portsFileDirOption{newValue: newValue.(string), oldValue: oldValue.(string)}) case "maxcontrolline": - diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int)}) + diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int32)}) case "maxpayload": - diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int)}) + diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int32)}) case "pinginterval": diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)}) case "maxpingsout": diff --git a/server/route.go b/server/route.go index c83a7e8876c..3b663eebeaf 100644 --- a/server/route.go +++ b/server/route.go @@ -1,4 +1,4 @@ -// Copyright 2013-2018 The NATS Authors +// Copyright 2013-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -238,7 +238,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) { // Update statistics c.in.msgs++ // The msg includes the CR_LF, so pull back out for accounting. - c.in.bytes += len(msg) - LEN_CR_LF + c.in.bytes += int32(len(msg) - LEN_CR_LF) if c.trace { c.traceMsg(msg) @@ -1268,14 +1268,14 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del var ( _rkey [1024]byte key []byte - qi int + qi int32 ) if sub.queue != nil { // Just make the key subject spc group, e.g. 'foo bar' key = _rkey[:0] key = append(key, sub.subject...) key = append(key, byte(' ')) - qi = len(key) + qi = int32(len(key)) key = append(key, sub.queue...) } else { key = sub.subject diff --git a/server/server.go b/server/server.go index b66c0644311..a688d4b7637 100644 --- a/server/server.go +++ b/server/server.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -60,7 +60,7 @@ type Info struct { AuthRequired bool `json:"auth_required,omitempty"` TLSRequired bool `json:"tls_required,omitempty"` TLSVerify bool `json:"tls_verify,omitempty"` - MaxPayload int `json:"max_payload"` + MaxPayload int32 `json:"max_payload"` IP string `json:"ip,omitempty"` CID uint64 `json:"client_id,omitempty"` Nonce string `json:"nonce,omitempty"` @@ -1375,7 +1375,7 @@ func (s *Server) createClient(conn net.Conn) *client { opts := s.getOpts() maxPay := int32(opts.MaxPayload) - maxSubs := opts.MaxSubs + maxSubs := int32(opts.MaxSubs) // For system, maxSubs of 0 means unlimited, so re-adjust here. if maxSubs == 0 { maxSubs = -1