From 11915f44af549657d035daf0be0b5fe57e545019 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 10 May 2023 17:03:43 -0700 Subject: [PATCH 1/6] resolver: improve signaling for missing account lookups Signed-off-by: Waldemar Quevedo --- server/accounts.go | 63 ++++++++++----- server/dirstore.go | 7 ++ server/jwt_test.go | 195 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 240 insertions(+), 25 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 8f62104b38..eecce13db2 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -21,6 +21,7 @@ import ( "hash/fnv" "hash/maphash" "io" + "io/fs" "math" "math/rand" "net/http" @@ -3990,20 +3991,23 @@ func (dr *DirAccResolver) Start(s *Server) error { defer dr.Unlock() dr.Server = s dr.operator = opKeys + fetchTimeout := dr.fetchTimeout dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); ok { if theJwt, err := dr.LoadAcc(pubKey); err != nil { - s.Errorf("update got error on load: %v", err) + s.Errorf("RESOLVER - Update got error on load: %v", err) } else { acc := v.(*Account) if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil { - s.Errorf("update resulted in error %v", err) + s.Errorf("RESOLVER - Update for account %q resulted in error %v", pubKey, err) } else { if _, jsa, err := acc.checkForJetStream(); err != nil { - s.Warnf("error checking for JetStream enabled error %v", err) + if !IsNatsErr(err, JSNotEnabledForAccountErr) { + s.Warnf("RESOLVER - Error checking for JetStream support for account %q: %v", pubKey, err) + } } else if jsa == nil { if err = s.configJetStream(acc); err != nil { - s.Errorf("updated resulted in error when configuring JetStream %v", err) + s.Errorf("RESOLVER - Error configuring JetStream for account %q: %v", pubKey, err) } } } @@ -4024,7 +4028,7 @@ func (dr *DirAccResolver) Start(s *Server) error { } else if len(tk) == accUpdateTokensOld { pubKey = tk[accUpdateAccIdxOld] } else { - s.Debugf("jwt update skipped due to bad subject %q", subj) + s.Debugf("RESOLVER - JWT update skipped due to bad subject %q", subj) return } if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { @@ -4074,8 +4078,19 @@ func (dr *DirAccResolver) Start(s *Server) error { if len(tk) != accLookupReqTokens { return } - if theJWT, err := dr.DirJWTStore.LoadAcc(tk[accReqAccIndex]); err != nil { - s.Errorf("Merging resulted in error: %v", err) + accName := tk[accReqAccIndex] + if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil { + if errors.Is(err, fs.ErrNotExist) { + s.Debugf("RESOLVER - Could not find account %q", accName) + go func() { + // Reply with empty response to signal absence of JWT to others, + // but not too fast to mitigate beating actual responses with content. + time.Sleep(fetchTimeout / 2) + s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil) + }() + } else { + s.Errorf("RESOLVER - Error looking up account %q: %v", accName, err) + } } else { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT)) } @@ -4083,7 +4098,7 @@ func (dr *DirAccResolver) Start(s *Server) error { return fmt.Errorf("error setting up lookup request handling: %v", err) } // respond to pack requests with one or more pack messages - // an empty message signifies the end of the response responder + // an empty message signifies the end of the response responder. if _, err := s.sysSubscribeQ(accPackReqSubj, "responder", func(_ *subscription, _ *client, _ *Account, _, reply string, theirHash []byte) { if reply == _EMPTY_ { return @@ -4091,14 +4106,14 @@ func (dr *DirAccResolver) Start(s *Server) error { ourHash := dr.DirJWTStore.Hash() if bytes.Equal(theirHash, ourHash[:]) { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{}) - s.Debugf("pack request matches hash %x", ourHash[:]) + s.Debugf("RESOLVER - Pack request matches hash %x", ourHash[:]) } else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg)) }); err != nil { // let them timeout - s.Errorf("pack request error: %v", err) + s.Errorf("RESOLVER - Pack request error: %v", err) } else { - s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash) + s.Debugf("RESOLVER - Pack request hash %x - finished responding with hash %x", theirHash, ourHash) s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{}) } }); err != nil { @@ -4119,12 +4134,12 @@ func (dr *DirAccResolver) Start(s *Server) error { if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) { hash := dr.DirJWTStore.Hash() if len(msg) == 0 { // end of response stream - s.Debugf("Merging Finished and resulting in: %x", dr.DirJWTStore.Hash()) + s.Debugf("RESOLVER - Merging finished and resulting in: %x", dr.DirJWTStore.Hash()) return } else if err := dr.DirJWTStore.Merge(string(msg)); err != nil { - s.Errorf("Merging resulted in error: %v", err) + s.Errorf("RESOLVER - Merging resulted in error: %v", err) } else { - s.Debugf("Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash()) + s.Debugf("RESOLVER - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash()) } }); err != nil { return fmt.Errorf("error setting up pack response handling: %v", err) @@ -4142,7 +4157,7 @@ func (dr *DirAccResolver) Start(s *Server) error { case <-ticker.C: } ourHash := dr.DirJWTStore.Hash() - s.Debugf("Checking store state: %x", ourHash) + s.Debugf("RESOLVER - Checking store state: %x", ourHash) s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:]) } }) @@ -4229,14 +4244,18 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) } replySubj := s.newRespInbox() replies := s.sys.replies + // Store our handler. - replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { + replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) { clone := make([]byte, len(msg)) copy(clone, msg) + s.mu.Lock() + // Use the first valid response if there is still interest or + // one of the empty responses to signal that it was not found. if _, ok := replies[replySubj]; ok { select { - case respC <- clone: // only use first response and only if there is still interest + case respC <- clone: default: } } @@ -4253,7 +4272,9 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) case <-time.After(timeout): err = errors.New("fetching jwt timed out") case m := <-respC: - if err = res.Store(name, string(m)); err == nil { + if len(m) == 0 { + err = errors.New("account jwt not found") + } else if err = res.Store(name, string(m)); err == nil { theJWT = string(m) } } @@ -4291,9 +4312,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); !ok { } else if theJwt, err := dr.LoadAcc(pubKey); err != nil { - s.Errorf("update got error on load: %v", err) + s.Errorf("RESOLVER - Update got error on load: %v", err) } else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil { - s.Errorf("update resulted in error %v", err) + s.Errorf("RESOLVER - Update resulted in error %v", err) } } dr.DirJWTStore.deleted = func(pubKey string) { @@ -4309,7 +4330,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { } else if len(tk) == accUpdateTokensOld { pubKey = tk[accUpdateAccIdxOld] } else { - s.Debugf("jwt update cache skipped due to bad subject %q", subj) + s.Debugf("RESOLVER - JWT update cache skipped due to bad subject %q", subj) return } if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { diff --git a/server/dirstore.go b/server/dirstore.go index cabd4a1997..b39ab9ae08 100644 --- a/server/dirstore.go +++ b/server/dirstore.go @@ -288,6 +288,10 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string)) if err != nil { return err } + if len(jwtBytes) == 0 { + // Skip if no contents in the JWT. + return nil + } if exp != nil { claim, err := jwt.DecodeGeneric(string(jwtBytes)) if err == nil && claim.Expires > 0 && claim.Expires < time.Now().Unix() { @@ -406,6 +410,9 @@ func (store *DirJWTStore) load(publicKey string) (string, error) { // write that keeps hash of all jwt in sync // Assumes the lock is held. Does return true or an error never both. func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (bool, error) { + if len(theJWT) == 0 { + return false, fmt.Errorf("invalid JWT") + } var newHash *[sha256.Size]byte if store.expiration != nil { h := sha256.Sum256([]byte(theJWT)) diff --git a/server/jwt_test.go b/server/jwt_test.go index 1085892434..763c67927a 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3692,7 +3692,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { listen: 127.0.0.1:-1 no_advertise: true } - `, ojwt, syspub, dirAA))) + `, ojwt, syspub, dirAA))) sAA, _ := RunServerWithConfig(confAA) defer sAA.Shutdown() // Create Server B (using no_advertise to prevent fail over) @@ -3718,7 +3718,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { nats-route://127.0.0.1:%d ] } - `, ojwt, syspub, dirAB, sAA.opts.Cluster.Port))) + `, ojwt, syspub, dirAB, sAA.opts.Cluster.Port))) sAB, _ := RunServerWithConfig(confAB) defer sAB.Shutdown() // Create Server C (using no_advertise to prevent fail over) @@ -3744,7 +3744,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { listen: 127.0.0.1:-1 no_advertise: true } - `, ojwt, syspub, dirBA, sAA.opts.Gateway.Port))) + `, ojwt, syspub, dirBA, sAA.opts.Gateway.Port))) sBA, _ := RunServerWithConfig(confBA) defer sBA.Shutdown() // Create Sever BA (using no_advertise to prevent fail over) @@ -3773,7 +3773,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { {name: "clust-A", url: "nats://127.0.0.1:%d"}, ] } - `, ojwt, syspub, dirBB, sBA.opts.Cluster.Port, sAA.opts.Cluster.Port))) + `, ojwt, syspub, dirBB, sBA.opts.Cluster.Port, sAA.opts.Cluster.Port))) sBB, _ := RunServerWithConfig(confBB) defer sBB.Shutdown() // Assert topology @@ -6592,3 +6592,190 @@ func TestServerOperatorModeNoAuthRequired(t *testing.T) { require_True(t, nc.AuthRequired()) } + +func TestJWTAccountNATSResolverWrongCreds(t *testing.T) { + require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) { + t.Helper() + for _, srv := range srvs { + if acc, ok := srv.accounts.Load(account); ok { + checkAccClientsCount(t, acc.(*Account), 0) + } + } + } + connect := func(url string, credsfile string, acc string, srvs ...*Server) { + t.Helper() + nc := natsConnect(t, url, nats.UserCredentials(credsfile), nats.Timeout(5*time.Second)) + nc.Close() + require_NoLocalOrRemoteConnections(acc, srvs...) + } + createAccountAndUser := func(limit bool, done chan struct{}, pubKey, jwt1, jwt2, creds *string) { + t.Helper() + kp, _ := nkeys.CreateAccount() + *pubKey, _ = kp.PublicKey() + claim := jwt.NewAccountClaims(*pubKey) + var err error + *jwt1, err = claim.Encode(oKp) + require_NoError(t, err) + *jwt2, err = claim.Encode(oKp) + require_NoError(t, err) + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub + ujwt, err := uclaim.Encode(kp) + require_NoError(t, err) + *creds = genCredsFile(t, ujwt, seed) + done <- struct{}{} + } + // Create Accounts and corresponding user creds. + doneChan := make(chan struct{}, 4) + defer close(doneChan) + var syspub, sysjwt, dummy1, sysCreds string + createAccountAndUser(false, doneChan, &syspub, &sysjwt, &dummy1, &sysCreds) + + var apub, ajwt1, ajwt2, aCreds string + createAccountAndUser(true, doneChan, &apub, &ajwt1, &ajwt2, &aCreds) + + var bpub, bjwt1, bjwt2, bCreds string + createAccountAndUser(true, doneChan, &bpub, &bjwt1, &bjwt2, &bCreds) + + // The one that is going to be missing. + var cpub, cjwt1, cjwt2, cCreds string + createAccountAndUser(true, doneChan, &cpub, &cjwt1, &cjwt2, &cCreds) + for i := 0; i < cap(doneChan); i++ { + <-doneChan + } + // Create one directory for each server + dirA := t.TempDir() + dirB := t.TempDir() + dirC := t.TempDir() + + // Store accounts on servers A and B, then let C sync on its own. + writeJWT(t, dirA, apub, ajwt1) + writeJWT(t, dirB, bpub, bjwt1) + + ///////////////////////////////////////// + // // + // Server A: has creds from client A // + // // + ///////////////////////////////////////// + confA := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: srv-A + operator: %s + system_account: %s + debug: true + resolver: { + type: full + dir: '%s' + allow_delete: true + timeout: "1.5s" + interval: "200ms" + } + resolver_preload: { + %s: %s + } + cluster { + name: clust + listen: 127.0.0.1:-1 + no_advertise: true + } + `, ojwt, syspub, dirA, apub, ajwt1))) + sA, _ := RunServerWithConfig(confA) + defer sA.Shutdown() + require_JWTPresent(t, dirA, apub) + + ///////////////////////////////////////// + // // + // Server B: has creds from client B // + // // + ///////////////////////////////////////// + confB := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + server_name: srv-B + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + allow_delete: true + timeout: "1.5s" + interval: "200ms" + } + cluster { + name: clust + listen: 127.0.0.1:-1 + no_advertise: true + routes [ + nats-route://127.0.0.1:%d + ] + } + `, ojwt, syspub, dirB, sA.opts.Cluster.Port))) + sB, _ := RunServerWithConfig(confB) + defer sB.Shutdown() + + ///////////////////////////////////////// + // // + // Server C: has no creds // + // // + ///////////////////////////////////////// + fmtC := ` + listen: 127.0.0.1:-1 + server_name: srv-C + operator: %s + system_account: %s + resolver: { + type: full + dir: '%s' + allow_delete: true + timeout: "1.5s" + interval: "200ms" + } + cluster { + name: clust + listen: 127.0.0.1:-1 + no_advertise: true + routes [ + nats-route://127.0.0.1:%d + ] + } + ` + confClongTTL := createConfFile(t, []byte(fmt.Sprintf(fmtC, ojwt, syspub, dirC, sA.opts.Cluster.Port))) + sC, _ := RunServerWithConfig(confClongTTL) // use long ttl to assure it is not kicking + defer sC.Shutdown() + + // startup cluster + checkClusterFormed(t, sA, sB, sC) + time.Sleep(1 * time.Second) // wait for the protocol to converge + // // Check all accounts + require_JWTPresent(t, dirA, apub) // was already present on startup + require_JWTPresent(t, dirB, apub) // was copied from server A + require_JWTPresent(t, dirA, bpub) // was copied from server B + require_JWTPresent(t, dirB, bpub) // was already present on startup + + // There should be no state about the missing account. + require_JWTAbsent(t, dirA, cpub) + require_JWTAbsent(t, dirB, cpub) + require_JWTAbsent(t, dirC, cpub) + + // system account client can connect to every server + connect(sA.ClientURL(), sysCreds, "") + connect(sB.ClientURL(), sysCreds, "") + connect(sC.ClientURL(), sysCreds, "") + + // A and B clients can connect to any server. + connect(sA.ClientURL(), aCreds, "") + connect(sB.ClientURL(), aCreds, "") + connect(sC.ClientURL(), aCreds, "") + connect(sA.ClientURL(), bCreds, "") + connect(sB.ClientURL(), bCreds, "") + connect(sC.ClientURL(), bCreds, "") + + // Check that trying to connect with bad credentials should not hang until the fetch timeout + // and instead return a faster response when an account is not found. + _, err := nats.Connect(sC.ClientURL(), nats.UserCredentials(cCreds), nats.Timeout(1*time.Second)) + if err != nil && !errors.Is(err, nats.ErrAuthorization) { + t.Fatalf("Expected auth error: %v", err) + } +} From 811ef75287f51bce5a7c9eb544fb8cdcc1a6fd16 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 12 May 2023 16:25:33 -0700 Subject: [PATCH 2/6] Update not found delay to be 1/10th of fetch timeout Also update logs based on comments Signed-off-by: Waldemar Quevedo --- server/accounts.go | 36 ++++++++++++++++++------------------ server/jwt_test.go | 2 +- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index eecce13db2..fcc434cf67 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3995,19 +3995,19 @@ func (dr *DirAccResolver) Start(s *Server) error { dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); ok { if theJwt, err := dr.LoadAcc(pubKey); err != nil { - s.Errorf("RESOLVER - Update got error on load: %v", err) + s.Errorf("DirResolver - Update got error on load: %v", err) } else { acc := v.(*Account) if err = s.updateAccountWithClaimJWT(acc, theJwt); err != nil { - s.Errorf("RESOLVER - Update for account %q resulted in error %v", pubKey, err) + s.Errorf("DirResolver - Update for account %q resulted in error %v", pubKey, err) } else { if _, jsa, err := acc.checkForJetStream(); err != nil { if !IsNatsErr(err, JSNotEnabledForAccountErr) { - s.Warnf("RESOLVER - Error checking for JetStream support for account %q: %v", pubKey, err) + s.Warnf("DirResolver - Error checking for JetStream support for account %q: %v", pubKey, err) } } else if jsa == nil { if err = s.configJetStream(acc); err != nil { - s.Errorf("RESOLVER - Error configuring JetStream for account %q: %v", pubKey, err) + s.Errorf("DirResolver - Error configuring JetStream for account %q: %v", pubKey, err) } } } @@ -4028,7 +4028,7 @@ func (dr *DirAccResolver) Start(s *Server) error { } else if len(tk) == accUpdateTokensOld { pubKey = tk[accUpdateAccIdxOld] } else { - s.Debugf("RESOLVER - JWT update skipped due to bad subject %q", subj) + s.Debugf("DirResolver - jwt update skipped due to bad subject %q", subj) return } if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { @@ -4081,15 +4081,15 @@ func (dr *DirAccResolver) Start(s *Server) error { accName := tk[accReqAccIndex] if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil { if errors.Is(err, fs.ErrNotExist) { - s.Debugf("RESOLVER - Could not find account %q", accName) + s.Debugf("DirResolver - Could not find account %q", accName) go func() { // Reply with empty response to signal absence of JWT to others, // but not too fast to mitigate beating actual responses with content. - time.Sleep(fetchTimeout / 2) + time.Sleep(fetchTimeout / 10) s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil) }() } else { - s.Errorf("RESOLVER - Error looking up account %q: %v", accName, err) + s.Errorf("DirResolver - Error looking up account %q: %v", accName, err) } } else { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(theJWT)) @@ -4106,14 +4106,14 @@ func (dr *DirAccResolver) Start(s *Server) error { ourHash := dr.DirJWTStore.Hash() if bytes.Equal(theirHash, ourHash[:]) { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{}) - s.Debugf("RESOLVER - Pack request matches hash %x", ourHash[:]) + s.Debugf("DirResolver - Pack request matches hash %x", ourHash[:]) } else if err := dr.DirJWTStore.PackWalk(1, func(partialPackMsg string) { s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte(partialPackMsg)) }); err != nil { // let them timeout - s.Errorf("RESOLVER - Pack request error: %v", err) + s.Errorf("DirResolver - Pack request error: %v", err) } else { - s.Debugf("RESOLVER - Pack request hash %x - finished responding with hash %x", theirHash, ourHash) + s.Debugf("DirResolver - Pack request hash %x - finished responding with hash %x", theirHash, ourHash) s.sendInternalMsgLocked(reply, _EMPTY_, nil, []byte{}) } }); err != nil { @@ -4134,12 +4134,12 @@ func (dr *DirAccResolver) Start(s *Server) error { if _, err := s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) { hash := dr.DirJWTStore.Hash() if len(msg) == 0 { // end of response stream - s.Debugf("RESOLVER - Merging finished and resulting in: %x", dr.DirJWTStore.Hash()) + s.Debugf("DirResolver - Merging finished and resulting in: %x", dr.DirJWTStore.Hash()) return } else if err := dr.DirJWTStore.Merge(string(msg)); err != nil { - s.Errorf("RESOLVER - Merging resulted in error: %v", err) + s.Errorf("DirResolver - Merging resulted in error: %v", err) } else { - s.Debugf("RESOLVER - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash()) + s.Debugf("DirResolver - Merging succeeded and changed %x to %x", hash, dr.DirJWTStore.Hash()) } }); err != nil { return fmt.Errorf("error setting up pack response handling: %v", err) @@ -4157,7 +4157,7 @@ func (dr *DirAccResolver) Start(s *Server) error { case <-ticker.C: } ourHash := dr.DirJWTStore.Hash() - s.Debugf("RESOLVER - Checking store state: %x", ourHash) + s.Debugf("DirResolver - Checking store state: %x", ourHash) s.sendInternalMsgLocked(accPackReqSubj, packRespIb, nil, ourHash[:]) } }) @@ -4312,9 +4312,9 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); !ok { } else if theJwt, err := dr.LoadAcc(pubKey); err != nil { - s.Errorf("RESOLVER - Update got error on load: %v", err) + s.Errorf("DirResolver - Update got error on load: %v", err) } else if err := s.updateAccountWithClaimJWT(v.(*Account), theJwt); err != nil { - s.Errorf("RESOLVER - Update resulted in error %v", err) + s.Errorf("DirResolver - Update resulted in error %v", err) } } dr.DirJWTStore.deleted = func(pubKey string) { @@ -4330,7 +4330,7 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { } else if len(tk) == accUpdateTokensOld { pubKey = tk[accUpdateAccIdxOld] } else { - s.Debugf("RESOLVER - JWT update cache skipped due to bad subject %q", subj) + s.Debugf("DirResolver - jwt update cache skipped due to bad subject %q", subj) return } if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { diff --git a/server/jwt_test.go b/server/jwt_test.go index 763c67927a..a97111c624 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3747,7 +3747,7 @@ func TestJWTAccountNATSResolverCrossClusterFetch(t *testing.T) { `, ojwt, syspub, dirBA, sAA.opts.Gateway.Port))) sBA, _ := RunServerWithConfig(confBA) defer sBA.Shutdown() - // Create Sever BA (using no_advertise to prevent fail over) + // Create Server BA (using no_advertise to prevent fail over) confBB := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 server_name: srv-B-B From 94b8d0ba41d7bef33e1d622a3a62385a341feb0b Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 12 May 2023 21:24:37 -0700 Subject: [PATCH 3/6] resolver: wait for all available servers to respond on fetch Signed-off-by: Waldemar Quevedo --- server/accounts.go | 24 ++++++++++++++++++------ server/jwt_test.go | 2 +- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index fcc434cf67..09fd12db53 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -4082,12 +4082,10 @@ func (dr *DirAccResolver) Start(s *Server) error { if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil { if errors.Is(err, fs.ErrNotExist) { s.Debugf("DirResolver - Could not find account %q", accName) - go func() { - // Reply with empty response to signal absence of JWT to others, - // but not too fast to mitigate beating actual responses with content. - time.Sleep(fetchTimeout / 10) - s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil) - }() + // Reply with empty response to signal absence of JWT to others, + // but not too fast to mitigate beating actual responses with content. + time.Sleep(fetchTimeout / 10) + s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil) } else { s.Errorf("DirResolver - Error looking up account %q: %v", accName, err) } @@ -4238,6 +4236,12 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) respC := make(chan []byte, 1) accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name) s.mu.Lock() + // Resolver will wait for available routes + gateways to reply + // before serving an error in case there weren't any found. + expectedServers := len(s.routes) + if s.gateway != nil { + expectedServers += len(s.gateway.remotes) + } if s.sys == nil || s.sys.replies == nil { s.mu.Unlock() return _EMPTY_, fmt.Errorf("eventing shut down") @@ -4251,6 +4255,14 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) copy(clone, msg) s.mu.Lock() + expectedServers-- + if len(msg) == 0 { + // Skip empty responses until getting all the available servers. + if expectedServers > 0 { + s.mu.Unlock() + return + } + } // Use the first valid response if there is still interest or // one of the empty responses to signal that it was not found. if _, ok := replies[replySubj]; ok { diff --git a/server/jwt_test.go b/server/jwt_test.go index a97111c624..58884b97c5 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -6774,7 +6774,7 @@ func TestJWTAccountNATSResolverWrongCreds(t *testing.T) { // Check that trying to connect with bad credentials should not hang until the fetch timeout // and instead return a faster response when an account is not found. - _, err := nats.Connect(sC.ClientURL(), nats.UserCredentials(cCreds), nats.Timeout(1*time.Second)) + _, err := nats.Connect(sC.ClientURL(), nats.UserCredentials(cCreds), nats.Timeout(500*time.Second)) if err != nil && !errors.Is(err, nats.ErrAuthorization) { t.Fatalf("Expected auth error: %v", err) } From 13db582f9dd0a5861b74692aaf408b8409752fc2 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sat, 13 May 2023 00:00:56 -0700 Subject: [PATCH 4/6] review updates Signed-off-by: Waldemar Quevedo --- server/accounts.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 09fd12db53..a1b5a37dea 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3991,7 +3991,6 @@ func (dr *DirAccResolver) Start(s *Server) error { defer dr.Unlock() dr.Server = s dr.operator = opKeys - fetchTimeout := dr.fetchTimeout dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); ok { if theJwt, err := dr.LoadAcc(pubKey); err != nil { @@ -4082,9 +4081,7 @@ func (dr *DirAccResolver) Start(s *Server) error { if theJWT, err := dr.DirJWTStore.LoadAcc(accName); err != nil { if errors.Is(err, fs.ErrNotExist) { s.Debugf("DirResolver - Could not find account %q", accName) - // Reply with empty response to signal absence of JWT to others, - // but not too fast to mitigate beating actual responses with content. - time.Sleep(fetchTimeout / 10) + // Reply with empty response to signal absence of JWT to others. s.sendInternalMsgLocked(reply, _EMPTY_, nil, nil) } else { s.Errorf("DirResolver - Error looking up account %q: %v", accName, err) @@ -4250,18 +4247,16 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) replies := s.sys.replies // Store our handler. - replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, reply string, msg []byte) { + replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { clone := make([]byte, len(msg)) copy(clone, msg) s.mu.Lock() + defer s.mu.Unlock() expectedServers-- - if len(msg) == 0 { - // Skip empty responses until getting all the available servers. - if expectedServers > 0 { - s.mu.Unlock() - return - } + // Skip empty responses until getting all the available servers. + if len(msg) == 0 && expectedServers > 0 { + return } // Use the first valid response if there is still interest or // one of the empty responses to signal that it was not found. @@ -4271,7 +4266,6 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) default: } } - s.mu.Unlock() } s.sendInternalMsg(accountLookupRequest, replySubj, nil, []byte{}) quit := s.quitCh From 9198044e2e1507e4ec977ccb0e70d3fc04c95e05 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sun, 14 May 2023 09:44:56 -0700 Subject: [PATCH 5/6] use detected active servers when waiting for acc lookup Signed-off-by: Waldemar Quevedo --- server/accounts.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index a1b5a37dea..65325c280c 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -4233,29 +4233,35 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) respC := make(chan []byte, 1) accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name) s.mu.Lock() - // Resolver will wait for available routes + gateways to reply + // Resolver will wait for detected active servers to reply // before serving an error in case there weren't any found. - expectedServers := len(s.routes) - if s.gateway != nil { - expectedServers += len(s.gateway.remotes) - } + var expectedServers int if s.sys == nil || s.sys.replies == nil { s.mu.Unlock() return _EMPTY_, fmt.Errorf("eventing shut down") } + if s.sys != nil && s.sys.servers != nil { + expectedServers = len(s.sys.servers) + } replySubj := s.newRespInbox() replies := s.sys.replies // Store our handler. replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { - clone := make([]byte, len(msg)) - copy(clone, msg) + var clone []byte + var isEmpty bool + if len(msg) > 0 { + clone = make([]byte, len(msg)) + copy(clone, msg) + } else { + isEmpty = true + } s.mu.Lock() defer s.mu.Unlock() expectedServers-- // Skip empty responses until getting all the available servers. - if len(msg) == 0 && expectedServers > 0 { + if isEmpty && expectedServers > 0 { return } // Use the first valid response if there is still interest or From 4af3dedd40c88db1472658b432e437b7b4427c54 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sun, 14 May 2023 10:05:49 -0700 Subject: [PATCH 6/6] review updates Signed-off-by: Waldemar Quevedo --- server/accounts.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 65325c280c..5d37d5f89c 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -4233,30 +4233,24 @@ func (s *Server) fetch(res AccountResolver, name string, timeout time.Duration) respC := make(chan []byte, 1) accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name) s.mu.Lock() - // Resolver will wait for detected active servers to reply - // before serving an error in case there weren't any found. - var expectedServers int if s.sys == nil || s.sys.replies == nil { s.mu.Unlock() return _EMPTY_, fmt.Errorf("eventing shut down") } - if s.sys != nil && s.sys.servers != nil { - expectedServers = len(s.sys.servers) - } + // Resolver will wait for detected active servers to reply + // before serving an error in case there weren't any found. + expectedServers := len(s.sys.servers) replySubj := s.newRespInbox() replies := s.sys.replies // Store our handler. replies[replySubj] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) { var clone []byte - var isEmpty bool - if len(msg) > 0 { + isEmpty := len(msg) == 0 + if !isEmpty { clone = make([]byte, len(msg)) copy(clone, msg) - } else { - isEmpty = true } - s.mu.Lock() defer s.mu.Unlock() expectedServers--