Skip to content

Commit

Permalink
Merge pull request #761 from nats-io/expire
Browse files Browse the repository at this point in the history
Add max and ttl for reponse maps
  • Loading branch information
derekcollison committed Oct 3, 2018
2 parents fd3e5fd + 364f3fe commit 871644b
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 19 deletions.
55 changes: 52 additions & 3 deletions server/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/nats-io/nkeys"
)
Expand Down Expand Up @@ -745,9 +746,9 @@ func TestCrossAccountRequestReply(t *testing.T) {
t.Fatalf("Error registering client with 'bar' account: %v", err)
}

// Add in the service import for the requests. Make it public.
// Add in the service export for the requests. Make it public.
if err := cfoo.acc.addServiceExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service import to client foo: %v", err)
t.Fatalf("Error adding account service export to client foo: %v", err)
}

// Test addServiceImport to make sure it requires accounts, and literalsubjects for both from and to subjects.
Expand All @@ -763,7 +764,7 @@ func TestCrossAccountRequestReply(t *testing.T) {

// Now add in the Route for request to be routed to the foo account.
if err := cbar.acc.addServiceImport(fooAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account route to client bar: %v", err)
t.Fatalf("Error adding account service import to client bar: %v", err)
}

// Now setup the resonder under cfoo
Expand Down Expand Up @@ -827,6 +828,54 @@ func TestCrossAccountRequestReply(t *testing.T) {
}
}

func TestCrossAccountRequestReplyResponseMaps(t *testing.T) {
s, fooAcc, barAcc := simpleAccountServer(t)
defer s.Shutdown()

ttl := 500 * time.Millisecond
barAcc.setMaxAutoExpireResponseMaps(5)
barAcc.setMaxAutoExpireTTL(ttl)
cfoo, _, _ := newClientForServer(s)
defer cfoo.nc.Close()

if err := cfoo.registerWithAccount(fooAcc); err != nil {
t.Fatalf("Error registering client with 'foo' account: %v", err)
}

if err := barAcc.addServiceExport("test.request", nil); err != nil {
t.Fatalf("Error adding account service export: %v", err)
}
if err := fooAcc.addServiceImport(barAcc, "foo", "test.request"); err != nil {
t.Fatalf("Error adding account service import: %v", err)
}

for i := 0; i < 10; i++ {
cfoo.parseAndFlush([]byte("PUB foo bar 4\r\nhelp\r\n"))
}

// We should expire because max.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nae := barAcc.numAutoExpireResponseMaps(); nae != 5 {
return fmt.Errorf("Number of responsemaps is %d", nae)
}
return nil
})

// Wait for the ttl to expire.
time.Sleep(2 * ttl)

// Now run prune and make sure we collect the timedout ones.
barAcc.pruneAutoExpireResponseMaps()

// We should expire because ttl.
checkFor(t, time.Second, 10*time.Millisecond, func() error {
if nae := barAcc.numAutoExpireResponseMaps(); nae != 0 {
return fmt.Errorf("Number of responsemaps is %d", nae)
}
return nil
})
}

func TestAccountMapsUsers(t *testing.T) {
// Used for the nkey users to properly sign.
seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM"
Expand Down
116 changes: 109 additions & 7 deletions server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package server
import (
"crypto/tls"
"encoding/base64"
"sort"
"strings"
"sync"
"time"

"github.com/nats-io/nkeys"
"golang.org/x/crypto/bcrypt"
Expand All @@ -41,12 +43,16 @@ type ClientAuthentication interface {

// Accounts
type Account struct {
Name string
Nkey string
mu sync.RWMutex
sl *Sublist
imports importMap
exports exportMap
Name string
Nkey string
mu sync.RWMutex
sl *Sublist
imports importMap
exports exportMap
nae int
maxnae int
maxaettl time.Duration
prunning bool
}

// Import stream mapping struct
Expand All @@ -62,6 +68,7 @@ type serviceImport struct {
from string
to string
ae bool
ts int64
}

// importMap tracks the imported streams and services.
Expand Down Expand Up @@ -129,10 +136,54 @@ func (a *Account) addServiceImport(destination *Account, from, to string) error
// removeServiceImport will remove the route by subject.
func (a *Account) removeServiceImport(subject string) {
a.mu.Lock()
si, ok := a.imports.services[subject]
if ok && si != nil && si.ae {
a.nae--
}
delete(a.imports.services, subject)
a.mu.Unlock()
}

// Return the number of AutoExpireResponseMaps for request/reply. These are mapped to the account that
func (a *Account) numAutoExpireResponseMaps() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.nae
}

// Set the max outstanding auto expire response maps.
func (a *Account) setMaxAutoExpireResponseMaps(max int) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxnae = max
}

// Set the max ttl for response maps.
func (a *Account) setMaxAutoExpireTTL(ttl time.Duration) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxaettl = ttl
}

// Return a list of the current autoExpireResponseMaps.
func (a *Account) autoExpireResponseMaps() []*serviceImport {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.imports.services) == 0 {
return nil
}
aesis := make([]*serviceImport, 0, len(a.imports.services))
for _, si := range a.imports.services {
if si.ae {
aesis = append(aesis, si)
}
}
sort.Slice(aesis, func(i, j int) bool {
return aesis[i].ts < aesis[j].ts
})
return aesis
}

// Add a route to connect from an implicit route created for a response to a request.
// This does no checks and should be only called by the msg processing code. Use addRoute
// above if responding to user input or config, etc.
Expand All @@ -141,11 +192,62 @@ func (a *Account) addImplicitServiceImport(destination *Account, from, to string
if a.imports.services == nil {
a.imports.services = make(map[string]*serviceImport)
}
a.imports.services[from] = &serviceImport{destination, from, to, autoexpire}
si := &serviceImport{destination, from, to, autoexpire, 0}
a.imports.services[from] = si
if autoexpire {
a.nae++
si.ts = time.Now().Unix()
if a.nae > a.maxnae && !a.prunning {
a.prunning = true
go a.pruneAutoExpireResponseMaps()
}
}
a.mu.Unlock()
return nil
}

// This will prune the list to below the threshold and remove all ttl'd maps.
func (a *Account) pruneAutoExpireResponseMaps() {
defer func() {
a.mu.Lock()
a.prunning = false
a.mu.Unlock()
}()

a.mu.RLock()
ttl := int64(a.maxaettl/time.Second) + 1
a.mu.RUnlock()

for {
sis := a.autoExpireResponseMaps()

// Check ttl items.
now := time.Now().Unix()
for i, si := range sis {
if now-si.ts >= ttl {
a.removeServiceImport(si.from)
} else {
sis = sis[i:]
break
}
}

a.mu.RLock()
numOver := a.nae - a.maxnae
a.mu.RUnlock()

if numOver <= 0 {
return
} else if numOver >= len(sis) {
numOver = len(sis) - 1
}
// These are in sorted order, remove at least numOver
for _, si := range sis[:numOver] {
a.removeServiceImport(si.from)
}
}
}

// addStreamImport will add in the stream import from a specific account.
func (a *Account) addStreamImport(account *Account, from, prefix string) error {
if account == nil {
Expand Down
6 changes: 6 additions & 0 deletions server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,10 @@ const (

// DEFAULT_MAX_CLOSED_CLIENTS
DEFAULT_MAX_CLOSED_CLIENTS = 10000

// DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS
DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS = 100000

// DEFAULT_TTL_AE_RESPONSE_MAP
DEFAULT_TTL_AE_RESPONSE_MAP = 10 * time.Minute
)
26 changes: 17 additions & 9 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,7 @@ func (s *Server) setOpts(opts *Options) {
func (s *Server) configureAccounts() {
// Check opts and walk through them. Making sure to create SLs.
for _, acc := range s.opts.Accounts {
if acc.sl == nil {
acc.sl = NewSublist()
}
s.accounts[acc.Name] = acc
s.registerAccount(acc)
}
}

Expand Down Expand Up @@ -329,14 +326,25 @@ func (s *Server) RegisterAccount(name string) (*Account, error) {
if _, ok := s.accounts[name]; ok {
return nil, ErrAccountExists
}
acc := &Account{
Name: name,
sl: NewSublist(),
}
s.accounts[name] = acc
acc := &Account{Name: name}
s.registerAccount(acc)
return acc, nil
}

// Place common account setup here.
func (s *Server) registerAccount(acc *Account) {
if acc.sl == nil {
acc.sl = NewSublist()
}
if acc.maxnae == 0 {
acc.maxnae = DEFAULT_MAX_ACCOUNT_AE_RESPONSE_MAPS
}
if acc.maxaettl == 0 {
acc.maxaettl = DEFAULT_TTL_AE_RESPONSE_MAP
}
s.accounts[acc.Name] = acc
}

// LookupAccount is a public function to return the account structure
// associated with name.
func (s *Server) LookupAccount(name string) *Account {
Expand Down

0 comments on commit 871644b

Please sign in to comment.