Skip to content

Commit

Permalink
Merge pull request #897 from nats-io/speed
Browse files Browse the repository at this point in the history
Some Optimizations
  • Loading branch information
derekcollison committed Feb 5, 2019
2 parents cfa0685 + af78552 commit 36b9026
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 116 deletions.
40 changes: 20 additions & 20 deletions server/accounts.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The NATS Authors
// Copyright 2018-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -34,7 +34,7 @@ const globalAccountName = "$G"
// Route Map Entry - used for efficient interest graph propagation.
// TODO(dlc) - squeeze even more?
type rme struct {
qi int // used to index into key from map for optional queue name
qi int32 // used to index into key from map for optional queue name
n int32 // number of subscriptions directly matching, local subs only.
}

Expand All @@ -50,15 +50,15 @@ type Account struct {
sl *Sublist
etmr *time.Timer
ctmr *time.Timer
strack map[string]int
nrclients int
sysclients int
strack map[string]int32
nrclients int32
sysclients int32
clients map[*client]*client
rm map[string]*rme
imports importMap
exports exportMap
limits
nae int
nae int32
pruning bool
expired bool
srv *Server // server this account is registered with (possibly nil)
Expand All @@ -67,9 +67,9 @@ type Account struct {
// Account based limits.
type limits struct {
mpay int32
msubs int
mconns int
maxnae int
msubs int32
mconns int32
maxnae int32
maxaettl time.Duration
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (a *Account) shallowCopy() *Account {
func (a *Account) NumConnections() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.clients) + a.nrclients
return len(a.clients) + int(a.nrclients)
}

// NumLocalClients returns active number of clients for this account
Expand All @@ -150,7 +150,7 @@ func (a *Account) NumLocalConnections() int {

// Do not account for the system accounts.
func (a *Account) numLocalConnections() int {
return len(a.clients) - a.sysclients
return len(a.clients) - int(a.sysclients)
}

// MaxClientsReached returns if we have reached our limit for number of connections.
Expand All @@ -162,7 +162,7 @@ func (a *Account) MaxTotalConnectionsReached() bool {

func (a *Account) maxTotalConnectionsReached() bool {
if a.mconns != jwt.NoLimit {
return len(a.clients)-a.sysclients+a.nrclients >= a.mconns
return len(a.clients)-int(a.sysclients)+int(a.nrclients) >= int(a.mconns)
}
return false
}
Expand All @@ -172,7 +172,7 @@ func (a *Account) maxTotalConnectionsReached() bool {
func (a *Account) MaxActiveConnections() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.mconns
return int(a.mconns)
}

// RoutedSubs returns how many subjects we would send across a route when first
Expand Down Expand Up @@ -315,22 +315,22 @@ func (a *Account) removeServiceImport(subject string) {
func (a *Account) numAutoExpireResponseMaps() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.nae
return int(a.nae)
}

// MaxAutoExpireResponseMaps return the maximum number of
// auto expire response maps we will allow.
func (a *Account) MaxAutoExpireResponseMaps() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.maxnae
return int(a.maxnae)
}

// SetMaxAutoExpireResponseMaps sets the max outstanding auto expire response maps.
func (a *Account) SetMaxAutoExpireResponseMaps(max int) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxnae = max
a.maxnae = int32(max)
}

// AutoExpireTTL returns the ttl for response maps.
Expand Down Expand Up @@ -415,7 +415,7 @@ func (a *Account) pruneAutoExpireResponseMaps() {
}

a.mu.RLock()
numOver := a.nae - a.maxnae
numOver := int(a.nae - a.maxnae)
a.mu.RUnlock()

if numOver <= 0 {
Expand Down Expand Up @@ -931,9 +931,9 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
}

// Now do limits if they are present.
a.msubs = int(ac.Limits.Subs)
a.msubs = int32(ac.Limits.Subs)
a.mpay = int32(ac.Limits.Payload)
a.mconns = int(ac.Limits.Conn)
a.mconns = int32(ac.Limits.Conn)

clients := gatherClients()
// Sort if we are over the limit.
Expand All @@ -943,7 +943,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
})
}
for i, c := range clients {
if a.mconns != jwt.NoLimit && i >= a.mconns {
if a.mconns != jwt.NoLimit && i >= int(a.mconns) {
c.maxAccountConnExceeded()
continue
}
Expand Down
89 changes: 50 additions & 39 deletions server/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -145,7 +145,8 @@ type client struct {
// Here first because of use of atomics, and memory alignment.
stats
mpay int32
msubs int
msubs int32
mcl int32
mu sync.Mutex
kind int
cid uint64
Expand All @@ -159,7 +160,7 @@ type client struct {
acc *Account
user *NkeyUser
host string
port int
port uint16
subs map[string]*subscription
perms *permissions
mperms *msgDeny
Expand Down Expand Up @@ -197,16 +198,16 @@ type outbound struct {
p []byte // Primary write buffer
s []byte // Secondary for use post flush
nb net.Buffers // net.Buffers for writev IO
sz int // limit size per []byte, uses variable BufSize constants, start, min, max.
sws int // Number of short writes, used for dynamic resizing.
pb int64 // Total pending/queued bytes.
pm int64 // Total pending/queued messages.
sz int32 // limit size per []byte, uses variable BufSize constants, start, min, max.
sws int32 // Number of short writes, used for dynamic resizing.
pb int32 // Total pending/queued bytes.
pm int32 // Total pending/queued messages.
sg *sync.Cond // Flusher conditional for signaling.
sgw bool // Indicate flusher is waiting on condition wait.
wdl time.Duration // Snapshot fo write deadline.
mp int64 // snapshot of max pending.
fsp int // Flush signals that are pending from readLoop's pcd.
mp int32 // snapshot of max pending.
fsp int32 // Flush signals that are pending from readLoop's pcd.
lft time.Duration // Last flush time.
sgw bool // Indicate flusher is waiting on condition wait.
}

type perm struct {
Expand Down Expand Up @@ -258,11 +259,14 @@ type readCache struct {
rts []routeTarget

prand *rand.Rand
msgs int
bytes int
subs int
rsz int // Read buffer size
srs int // Short reads, used for dynamic buffer resizing.

// These are all temporary totals for an invocation of a read in readloop.
msgs int32
bytes int32
subs int32

rsz int32 // Read buffer size
srs int32 // Short reads, used for dynamic buffer resizing.
}

const (
Expand Down Expand Up @@ -353,7 +357,7 @@ func (c *client) initClient() {
opts := s.getOpts()
// Snapshots to avoid mutex access in fast paths.
c.out.wdl = opts.WriteDeadline
c.out.mp = opts.MaxPending
c.out.mp = int32(opts.MaxPending)

c.subs = make(map[string]*subscription)
c.echo = true
Expand All @@ -376,7 +380,7 @@ func (c *client) initClient() {
if ip, ok := c.nc.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
c.host = addr.IP.String()
c.port = addr.Port
c.port = uint16(addr.Port)
conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port)
}

Expand Down Expand Up @@ -447,12 +451,12 @@ func (c *client) registerWithAccount(acc *Account) error {

// Helper to determine if we have exceeded max subs.
func (c *client) subsExceeded() bool {
return c.msubs != jwt.NoLimit && len(c.subs) > c.msubs
return c.msubs != jwt.NoLimit && len(c.subs) > int(c.msubs)
}

// Helper to determine if we have met or exceeded max subs.
func (c *client) subsAtLimit() bool {
return c.msubs != jwt.NoLimit && len(c.subs) >= c.msubs
return c.msubs != jwt.NoLimit && len(c.subs) >= int(c.msubs)
}

// Apply account limits
Expand Down Expand Up @@ -480,9 +484,9 @@ func (c *client) applyAccountLimits() {
}

// We check here if the server has an option set that is lower than the account limit.
if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs {
if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < int(c.acc.msubs) {
c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs)
c.msubs = opts.MaxSubs
c.msubs = int32(opts.MaxSubs)
}

if c.subsExceeded() {
Expand Down Expand Up @@ -656,6 +660,15 @@ func (c *client) readLoop() {
nc := c.nc
s := c.srv
c.in.rsz = startBufSize
// Snapshot max control line since currently can not be changed on reload and we
// were checking it on each call to parse. If this changes and we allow MaxControlLine
// to be reloaded without restart, this code will need to change.
c.mcl = MAX_CONTROL_LINE_SIZE
if s != nil {
if opts := s.getOpts(); opts != nil {
c.mcl = int32(opts.MaxControlLine)
}
}
defer s.grWG.Done()
c.mu.Unlock()

Expand Down Expand Up @@ -748,11 +761,11 @@ func (c *client) readLoop() {
// Update read buffer size as/if needed.
if n >= cap(b) && cap(b) < maxBufSize {
// Grow
c.in.rsz = cap(b) * 2
c.in.rsz = int32(cap(b) * 2)
b = make([]byte, c.in.rsz)
} else if n < cap(b) && cap(b) > minBufSize && c.in.srs > shortsToShrink {
// Shrink, for now don't accelerate, ping/pong will eventually sort it out.
c.in.rsz = cap(b) / 2
c.in.rsz = int32(cap(b) / 2)
b = make([]byte, c.in.rsz)
}
c.mu.Unlock()
Expand Down Expand Up @@ -845,11 +858,11 @@ func (c *client) flushOutbound() bool {
c.out.lft = lft

// Subtract from pending bytes and messages.
c.out.pb -= n
c.out.pb -= int32(n)
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate.

// Check for partial writes
if n != attempted && n > 0 {
if n != int64(attempted) && n > 0 {
c.handlePartialWrite(nb)
} else if n >= int64(c.out.sz) {
c.out.sws = 0
Expand Down Expand Up @@ -892,10 +905,10 @@ func (c *client) flushOutbound() bool {
}

// Adjust based on what we wrote plus any pending.
pt := int(n + c.out.pb)
pt := int32(n) + c.out.pb

// Adjust sz as needed downward, keeping power of 2.
// We do this at a slower rate, hence the pt*4.
// We do this at a slower rate.
if pt < c.out.sz && c.out.sz > minBufSize {
c.out.sws++
if c.out.sws > shortsToShrink {
Expand All @@ -910,11 +923,11 @@ func (c *client) flushOutbound() bool {
// Check to see if we can reuse buffers.
if len(cnb) > 0 {
oldp := cnb[0][:0]
if cap(oldp) >= c.out.sz {
if cap(oldp) >= int(c.out.sz) {
// Replace primary or secondary if they are nil, reusing same buffer.
if c.out.p == nil {
c.out.p = oldp
} else if c.out.s == nil || cap(c.out.s) < c.out.sz {
} else if c.out.s == nil || cap(c.out.s) < int(c.out.sz) {
c.out.s = oldp
}
}
Expand Down Expand Up @@ -1234,7 +1247,7 @@ func (c *client) queueOutbound(data []byte) bool {
// Assume data will not be referenced
referenced := false
// Add to pending bytes total.
c.out.pb += int64(len(data))
c.out.pb += int32(len(data))

// Check for slow consumer via pending bytes limit.
// ok to return here, client is going away.
Expand All @@ -1249,7 +1262,7 @@ func (c *client) queueOutbound(data []byte) bool {
if c.out.sz == 0 {
c.out.sz = startBufSize
}
if c.out.s != nil && cap(c.out.s) >= c.out.sz {
if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) {
c.out.p = c.out.s
c.out.s = nil
} else {
Expand All @@ -1262,7 +1275,7 @@ func (c *client) queueOutbound(data []byte) bool {
if len(data) > available {
// We can fit into existing primary, but message will fit in next one
// we allocate or utilize from the secondary. So copy what we can.
if available > 0 && len(data) < c.out.sz {
if available > 0 && len(data) < int(c.out.sz) {
c.out.p = append(c.out.p, data[:available]...)
data = data[available:]
}
Expand All @@ -1283,10 +1296,10 @@ func (c *client) queueOutbound(data []byte) bool {
if (c.out.sz << 1) <= maxBufSize {
c.out.sz <<= 1
}
if len(data) > c.out.sz {
if len(data) > int(c.out.sz) {
c.out.p = make([]byte, 0, len(data))
} else {
if c.out.s != nil && cap(c.out.s) >= c.out.sz { // TODO(dlc) - Size mismatch?
if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch?
c.out.p = c.out.s
c.out.s = nil
} else {
Expand Down Expand Up @@ -2115,7 +2128,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
c.in.msgs++
c.in.bytes += len(msg) - LEN_CR_LF
c.in.bytes += int32(len(msg) - LEN_CR_LF)

if c.trace {
c.traceMsg(msg)
Expand Down Expand Up @@ -2540,11 +2553,9 @@ func (c *client) clearAuthTimer() bool {

// We may reuse atmr for expiring user jwts,
// so check connectReceived.
// Lock assume held on entry.
func (c *client) awaitingAuth() bool {
c.mu.Lock()
authSet := !c.flags.isSet(connectReceived) && c.atmr != nil
c.mu.Unlock()
return authSet
return !c.flags.isSet(connectReceived) && c.atmr != nil
}

// This will set the atmr for the JWT expiration time.
Expand Down

0 comments on commit 36b9026

Please sign in to comment.