Skip to content

Commit

Permalink
resolver: improve signaling for missing account lookups
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 11, 2023
1 parent 76f4358 commit 0781d88
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 25 deletions.
63 changes: 42 additions & 21 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"hash/fnv"
"hash/maphash"
"io"
"io/fs"
"math"
"math/rand"
"net/http"
Expand Down Expand Up @@ -3990,20 +3991,23 @@ func (dr *DirAccResolver) Start(s *Server) error {
defer dr.Unlock()
dr.Server = s
dr.operator = opKeys
fetchTimeout := dr.fetchTimeout
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); ok {
if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("RESOLVER - Update got error on load: %v", err)
} else {
acc := v.(*Account)
if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("RESOLVER - Update for account %q resulted in error %v", pubKey, err)
} else {
if _, jsa, err := acc.checkForJetStream(); err != nil {
s.Warnf("error checking for JetStream enabled error %v", err)
if !IsNatsErr(err, JSNotEnabledForAccountErr) {
s.Warnf("RESOLVER - Error checking for JetStream support for account %q: %v", pubKey, err)
}
} else if jsa == nil {
if err = s.configJetStream(acc); err != nil {
s.Errorf("updated resulted in error when configuring JetStream %v", err)
s.Errorf("RESOLVER - Error configuring JetStream for account %q: %v", pubKey, err)
}
}
}
Expand All @@ -4024,7 +4028,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update skipped due to bad subject %q", subj)
s.Debugf("RESOLVER - JWT update skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down Expand Up @@ -4074,31 +4078,42 @@ func (dr *DirAccResolver) Start(s *Server) error {
if len(tk) != accLookupReqTokens {
return
}
if theJWT, err := dr.DirJWTStore.LoadAcc(tk[accReqAccIndex]); err != nil {
s.Errorf("Merging resulted in error: %v", err)
accName := tk[accReqAccIndex]
if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil {
if errors.Is(err, fs.ErrNotExist) {
s.Debugf("RESOLVER - Could not find account %q", accName)
go func() {
// Reply with empty response to signal absence of JWT to others,
// but not too fast to mitigate beating actual responses with content.
time.Sleep(fetchTimeout / 2)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil)
}()
} else {
s.Errorf("RESOLVER - Error looking up account %q: %v", accName, err)
}
} else {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT))
}
}); err != nil {
return fmt.Errorf("error setting up lookup request handling: %v", err)
}
// respond to pack requests with one or more pack messages
// an empty message signifies the end of the response responder
// an empty message signifies the end of the response responder.
if _, err := s.sysSubscribeQ(accPackReqSubj, "responder", func(_ *subscription, _ *client, _ *Account, _, reply string, theirHash []byte) {
if reply == _EMPTY_ {
return
}
ourHash := dr.DirJWTStore.Hash()
if bytes.Equal(theirHash, ourHash[:]) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
s.Debugf("pack request matches hash %x", ourHash[:])
s.Debugf("RESOLVER - Pack request matches hash %x", ourHash[:])
} else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) {
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg))
}); err != nil {
// let them timeout
s.Errorf("pack request error: %v", err)
s.Errorf("RESOLVER - Pack request error: %v", err)
} else {
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.Debugf("RESOLVER - Pack request hash %x - finished responding with hash %x", theirHash, ourHash)
s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{})
}
}); err != nil {
Expand All @@ -4119,12 +4134,12 @@ func (dr *DirAccResolver) Start(s *Server) error {
if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
hash := dr.DirJWTStore.Hash()
if len(msg) == 0 { // end of response stream
s.Debugf("Merging Finished and resulting in: %x", dr.DirJWTStore.Hash())
s.Debugf("RESOLVER - Merging finished and resulting in: %x", dr.DirJWTStore.Hash())
return
} else if err := dr.DirJWTStore.Merge(string(msg)); err != nil {
s.Errorf("Merging resulted in error: %v", err)
s.Errorf("RESOLVER - Merging resulted in error: %v", err)
} else {
s.Debugf("Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
s.Debugf("RESOLVER - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash())
}
}); err != nil {
return fmt.Errorf("error setting up pack response handling: %v", err)
Expand All @@ -4142,7 +4157,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
case <-ticker.C:
}
ourHash := dr.DirJWTStore.Hash()
s.Debugf("Checking store state: %x", ourHash)
s.Debugf("RESOLVER - Checking store state: %x", ourHash)
s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:])
}
})
Expand Down Expand Up @@ -4229,14 +4244,18 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
}
replySubj := s.newRespInbox()
replies := s.sys.replies

// Store our handler.
replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) {
clone := make([]byte, len(msg))
copy(clone, msg)

s.mu.Lock()
// Use the first valid response if there is still interest or
// one of the empty responses to signal that it was not found.
if _, ok := replies[replySubj]; ok {
select {
case respC <- clone: // only use first response and only if there is still interest
case respC <- clone:
default:
}
}
Expand All @@ -4253,7 +4272,9 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration)
case <-time.After(timeout):
err = errors.New("fetching jwt timed out")
case m := <-respC:
if err = res.Store(name, string(m)); err == nil {
if len(m) == 0 {
err = errors.New("account jwt not found")
} else if err = res.Store(name, string(m)); err == nil {
theJWT = string(m)
}
}
Expand Down Expand Up @@ -4291,9 +4312,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
dr.DirJWTStore.changed = func(pubKey string) {
if v, ok := s.accounts.Load(pubKey); !ok {
} else if theJwt, err := dr.LoadAcc(pubKey); err != nil {
s.Errorf("update got error on load: %v", err)
s.Errorf("RESOLVER - Update got error on load: %v", err)
} else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil {
s.Errorf("update resulted in error %v", err)
s.Errorf("RESOLVER - Update resulted in error %v", err)
}
}
dr.DirJWTStore.deleted = func(pubKey string) {
Expand All @@ -4309,7 +4330,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
} else if len(tk) == accUpdateTokensOld {
pubKey = tk[accUpdateAccIdxOld]
} else {
s.Debugf("jwt update cache skipped due to bad subject %q", subj)
s.Debugf("RESOLVER - JWT update cache skipped due to bad subject %q", subj)
return
}
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions server/dirstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string))
if err != nil {
return err
}
if len(jwtBytes) == 0 {
// Skip if no contents in the JWT.
return nil
}
if exp != nil {
claim, err := jwt.DecodeGeneric(string(jwtBytes))
if err == nil && claim.Expires > 0 && claim.Expires < time.Now().Unix() {
Expand Down Expand Up @@ -406,6 +410,9 @@ func (store *DirJWTStore) load(publicKey string) (string, error) {
// write that keeps hash of all jwt in sync
// Assumes the lock is held. Does return true or an error never both.
func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (bool, error) {
if len(theJWT) == 0 {
return false, fmt.Errorf("invalid JWT")
}
var newHash *[sha256.Size]byte
if store.expiration != nil {
h := sha256.Sum256([]byte(theJWT))
Expand Down
Loading

0 comments on commit 0781d88

Please sign in to comment.