diff --git a/server/accounts.go b/server/accounts.go index 685b816c6c1..4752d0123f7 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -78,12 +78,13 @@ type streamImport struct { // Import service mapping struct type serviceImport struct { - acc *Account - from string - to string - ae bool - ts int64 - claim *jwt.Import + acc *Account + from string + to string + ae bool + ts int64 + claim *jwt.Import + invalid bool } // exportAuth holds configured approvals or boolean indicating an @@ -296,7 +297,7 @@ func (a *Account) addImplicitServiceImport(destination *Account, from, to string if a.imports.services == nil { a.imports.services = make(map[string]*serviceImport) } - si := &serviceImport{destination, from, to, autoexpire, 0, claim} + si := &serviceImport{destination, from, to, autoexpire, 0, claim, false} a.imports.services[from] = si if autoexpire { a.nae++ @@ -481,15 +482,13 @@ func fetchActivation(url string) string { return string(body) } -// Fires for expired activation tokens. We could track this with timers etc. -// Instead we just re-analyze where we are and if we need to act. -func (a *Account) activationExpired(subject string) { +// These are import stream specific versions for when an activation expires. +func (a *Account) streamActivationExpired(subject string) { a.mu.RLock() if a.expired || a.imports.streams == nil { a.mu.RUnlock() return } - // FIXME(dlc) - check services too? si := a.imports.streams[subject] if si == nil || si.invalid { a.mu.RUnlock() @@ -515,6 +514,41 @@ func (a *Account) activationExpired(subject string) { } } +// These are import service specific versions for when an activation expires. +func (a *Account) serviceActivationExpired(subject string) { + a.mu.RLock() + if a.expired || a.imports.services == nil { + a.mu.RUnlock() + return + } + si := a.imports.services[subject] + if si == nil || si.invalid { + a.mu.RUnlock() + return + } + a.mu.RUnlock() + + if si.acc.checkActivation(a, si.claim, false) { + // The token has been updated most likely and we are good to go. + return + } + + a.mu.Lock() + si.invalid = true + a.mu.Unlock() +} + +// Fires for expired activation tokens. We could track this with timers etc. +// Instead we just re-analyze where we are and if we need to act. +func (a *Account) activationExpired(subject string, kind jwt.ExportType) { + switch kind { + case jwt.Stream: + a.streamActivationExpired(subject) + case jwt.Service: + a.serviceActivationExpired(subject) + } +} + // checkActivation will check the activation token for validity. func (a *Account) checkActivation(acc *Account, claim *jwt.Import, expTimer bool) bool { if claim == nil || claim.Token == "" { @@ -549,7 +583,7 @@ func (a *Account) checkActivation(acc *Account, claim *jwt.Import, expTimer bool if expTimer { expiresAt := time.Duration(act.Expires - tn) time.AfterFunc(expiresAt*time.Second, func() { - acc.activationExpired(string(act.ImportSubject)) + acc.activationExpired(string(act.ImportSubject), claim.Type) }) } } @@ -594,12 +628,31 @@ func (a *Account) checkStreamExportsEqual(b *Account) bool { return true } -// Check if another account is authorized to route requests to this service. +func (a *Account) checkServiceExportsEqual(b *Account) bool { + if len(a.exports.services) != len(b.exports.services) { + return false + } + for subj, aea := range a.exports.services { + bea, ok := b.exports.services[subj] + if !ok { + return false + } + if !reflect.DeepEqual(aea, bea) { + return false + } + } + return true +} + func (a *Account) checkServiceImportAuthorized(account *Account, subject string, imClaim *jwt.Import) bool { - // Find the subject in the services list. a.mu.RLock() defer a.mu.RUnlock() + return a.checkServiceImportAuthorizedNoLock(account, subject, imClaim) +} +// Check if another account is authorized to route requests to this service. +func (a *Account) checkServiceImportAuthorizedNoLock(account *Account, subject string, imClaim *jwt.Import) bool { + // Find the subject in the services list. if a.exports.services == nil || !IsValidLiteralSubject(subject) { return false } @@ -610,7 +663,7 @@ func (a *Account) checkServiceImportAuthorized(account *Account, subject string, } if ae != nil && ae.tokenReq { - return a.checkActivation(account, imClaim, false) + return a.checkActivation(account, imClaim, true) } // Check to see if we are public or if we need to search for the account. @@ -755,7 +808,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { c.processSubsOnConfigReload(awcsti) } } - // Now check if exports have changed. + // Now check if stream exports have changed. if !a.checkStreamExportsEqual(old) { clients := make([]*client, 0, 16) // We need to check all accounts that have an import claim from this account. @@ -770,7 +823,6 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { for _, c := range acc.clients { clients = append(clients, c) } - break } } acc.mu.Unlock() @@ -780,6 +832,19 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { c.processSubsOnConfigReload(awcsti) } } + // Now check if service exports have changed. + if !a.checkServiceExportsEqual(old) { + for _, acc := range s.accounts { + acc.mu.Lock() + for _, im := range acc.imports.services { + if im != nil && im.acc.Name == a.Name { + // Check for if we are still authorized for an import. + im.invalid = !a.checkServiceImportAuthorizedNoLock(im.acc, im.from, im.claim) + } + } + acc.mu.Unlock() + } + } // Now do limits if they are present. a.msubs = int(ac.Limits.Subs) diff --git a/server/client.go b/server/client.go index f33f5fe167c..df7f6326481 100644 --- a/server/client.go +++ b/server/client.go @@ -2050,9 +2050,11 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) { } acc.mu.RLock() rm := acc.imports.services[string(c.pa.subject)] + invalid := rm != nil && rm.invalid acc.mu.RUnlock() // Get the results from the other account for the mapped "to" subject. - if rm != nil && rm.acc != nil && rm.acc.sl != nil { + // If we have been marked invalid simply return here. + if rm != nil && !invalid && rm.acc != nil && rm.acc.sl != nil { var nrr []byte if rm.ae { acc.removeServiceImport(rm.from) diff --git a/server/jwt_test.go b/server/jwt_test.go index 5d55c118bce..c2160d7808e 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -14,6 +14,7 @@ package server import ( + "bufio" "encoding/base64" "encoding/json" "fmt" @@ -42,7 +43,7 @@ func opTrustBasicSetup() *Server { kp, _ := nkeys.FromSeed(oSeed) pub, _ := kp.PublicKey() opts := defaultServerOptions - opts.TrustedNkeys = []string{string(pub)} + opts.TrustedNkeys = []string{pub} s, _, _, _ := rawSetup(opts) return s } @@ -51,7 +52,7 @@ func buildMemAccResolver(s *Server) { kp, _ := nkeys.FromSeed(aSeed) pub, _ := kp.PublicKey() mr := &MemAccResolver{} - mr.Store(string(pub), aJWT) + mr.Store(pub, aJWT) s.mu.Lock() s.accResolver = mr s.mu.Unlock() @@ -63,6 +64,47 @@ func addAccountToMemResolver(s *Server, pub, jwt string) { s.mu.Unlock() } +func createClient(t *testing.T, s *Server, akp nkeys.KeyPair) (*client, *bufio.Reader, string) { + t.Helper() + nkp, _ := nkeys.CreateUser() + pub, _ := nkp.PublicKey() + nuc := jwt.NewUserClaims(pub) + ujwt, err := nuc.Encode(akp) + if err != nil { + t.Fatalf("Error generating user JWT: %v", err) + } + c, cr, l := newClientForServer(s) + + // Sign Nonce + var info nonceInfo + json.Unmarshal([]byte(l[5:]), &info) + sigraw, _ := nkp.Sign([]byte(info.Nonce)) + sig := base64.StdEncoding.EncodeToString(sigraw) + + cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nPING\r\n", ujwt, sig) + return c, cr, cs +} + +// Helper function to generate an async parser and a quit chan. This allows us to +// parse multiple control statements in same go routine since by default these are +// not protected in the server. +func genAsyncParser(c *client) (func(string), chan bool) { + pab := make(chan []byte, 16) + pas := func(cs string) { pab <- []byte(cs) } + quit := make(chan bool) + go func() { + for { + select { + case cs := <-pab: + c.parse(cs) + case <-quit: + return + } + } + }() + return pas, quit +} + func TestJWTUser(t *testing.T) { s := opTrustBasicSetup() defer s.Shutdown() @@ -156,7 +198,7 @@ func TestJWTUserExpired(t *testing.T) { // Create a new user that we will make sure has expired. nkp, _ := nkeys.CreateUser() pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) + nuc := jwt.NewUserClaims(pub) nuc.IssuedAt = time.Now().Add(-10 * time.Second).Unix() nuc.Expires = time.Now().Add(-2 * time.Second).Unix() @@ -192,7 +234,7 @@ func TestJWTUserExpiresAfterConnect(t *testing.T) { // Create a new user that we will make sure has expired. nkp, _ := nkeys.CreateUser() pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) + nuc := jwt.NewUserClaims(pub) nuc.IssuedAt = time.Now().Unix() nuc.Expires = time.Now().Add(time.Second).Unix() @@ -217,6 +259,7 @@ func TestJWTUserExpiresAfterConnect(t *testing.T) { // PING needed to flush the +OK/-ERR to us. // This should fail too since no account resolver is defined. cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", jwt, sig) + go c.parse([]byte(cs)) l, _ = cr.ReadString('\n') if !strings.HasPrefix(l, "+OK") { @@ -228,7 +271,7 @@ func TestJWTUserExpiresAfterConnect(t *testing.T) { } // Now we should expire after 1 second or so. - time.Sleep(time.Second) + time.Sleep(1250 * time.Millisecond) l, _ = cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { @@ -242,7 +285,7 @@ func TestJWTUserExpiresAfterConnect(t *testing.T) { func TestJWTUserPermissionClaims(t *testing.T) { nkp, _ := nkeys.CreateUser() pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) + nuc := jwt.NewUserClaims(pub) nuc.Permissions.Pub.Allow.Add("foo") nuc.Permissions.Pub.Allow.Add("bar") @@ -309,7 +352,7 @@ func TestJWTAccountExpired(t *testing.T) { // Create an account that will be expired. akp, _ := nkeys.CreateAccount() apub, _ := akp.PublicKey() - nac := jwt.NewAccountClaims(string(apub)) + nac := jwt.NewAccountClaims(apub) nac.IssuedAt = time.Now().Add(-10 * time.Second).Unix() nac.Expires = time.Now().Add(-2 * time.Second).Unix() ajwt, err := nac.Encode(okp) @@ -317,30 +360,12 @@ func TestJWTAccountExpired(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(apub), ajwt) + addAccountToMemResolver(s, apub, ajwt) // Create a new user - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - jwt, err := nuc.Encode(akp) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) - } - - c, cr, l := newClientForServer(s) - - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) - - // PING needed to flush the +OK/-ERR to us. - // This should fail since the account is expired. - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", jwt, sig) + c, cr, cs := createClient(t, s, akp) go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { t.Fatalf("Expected an error") } @@ -356,7 +381,7 @@ func TestJWTAccountExpiresAfterConnect(t *testing.T) { // Create an account that will expire. akp, _ := nkeys.CreateAccount() apub, _ := akp.PublicKey() - nac := jwt.NewAccountClaims(string(apub)) + nac := jwt.NewAccountClaims(apub) nac.IssuedAt = time.Now().Unix() nac.Expires = time.Now().Add(time.Second).Unix() ajwt, err := nac.Encode(okp) @@ -364,58 +389,34 @@ func TestJWTAccountExpiresAfterConnect(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(apub), ajwt) + addAccountToMemResolver(s, apub, ajwt) // Create a new user - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - jwt, err := nuc.Encode(akp) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) - } - - c, cr, l := newClientForServer(s) - - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) + c, cr, cs := createClient(t, s, akp) - // PING needed to flush the +OK/-ERR to us. - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", jwt, sig) - go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) - } - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "PONG") { - t.Fatalf("Expected a PONG") + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } } + go c.parse([]byte(cs)) + expectPong(cr) // Now we should expire after 1 second or so. - time.Sleep(time.Second) + time.Sleep(1250 * time.Millisecond) - l, _ = cr.ReadString('\n') + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { - t.Fatalf("Expected an error") + t.Fatalf("Expected an error, got %q", l) } if !strings.Contains(l, "Expired") { t.Fatalf("Expected 'Expired' to be in the error") } // Now make sure that accounts that have expired return an error. - c, cr, l = newClientForServer(s) - - // Sign Nonce - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ = nkp.Sign([]byte(info.Nonce)) - sig = base64.StdEncoding.EncodeToString(sigraw) - - // PING needed to flush the +OK/-ERR to us. - cs = fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", jwt, sig) + c, cr, cs = createClient(t, s, akp) go c.parse([]byte(cs)) l, _ = cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { @@ -433,7 +434,7 @@ func TestJWTAccountRenew(t *testing.T) { // Create an account that has expired. akp, _ := nkeys.CreateAccount() apub, _ := akp.PublicKey() - nac := jwt.NewAccountClaims(string(apub)) + nac := jwt.NewAccountClaims(apub) nac.IssuedAt = time.Now().Add(-10 * time.Second).Unix() nac.Expires = time.Now().Add(-2 * time.Second).Unix() ajwt, err := nac.Encode(okp) @@ -441,30 +442,12 @@ func TestJWTAccountRenew(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(apub), ajwt) + addAccountToMemResolver(s, apub, ajwt) // Create a new user - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(akp) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) - } - - c, cr, l := newClientForServer(s) - - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) - - // PING needed to flush the +OK/-ERR to us. - // This should fail since the account is expired. - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", ujwt, sig) + c, cr, cs := createClient(t, s, akp) go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { t.Fatalf("Expected an error") } @@ -478,28 +461,19 @@ func TestJWTAccountRenew(t *testing.T) { } // Update the account - addAccountToMemResolver(s, string(apub), ajwt) - acc := s.LookupAccount(string(apub)) + addAccountToMemResolver(s, apub, ajwt) + acc := s.LookupAccount(apub) if acc == nil { t.Fatalf("Expected to retrive the account") } s.updateAccountClaims(acc, nac) // Now make sure we can connect. - c, cr, l = newClientForServer(s) - - // Sign Nonce - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ = nkp.Sign([]byte(info.Nonce)) - sig = base64.StdEncoding.EncodeToString(sigraw) - - // PING needed to flush the +OK/-ERR to us. - // This should fail too since no account resolver is defined. - cs = fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", ujwt, sig) + c, cr, cs = createClient(t, s, akp) go c.parse([]byte(cs)) l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got: %q", l) } } @@ -513,7 +487,7 @@ func TestJWTAccountRenewFromResolver(t *testing.T) { // Create an account that has expired. akp, _ := nkeys.CreateAccount() apub, _ := akp.PublicKey() - nac := jwt.NewAccountClaims(string(apub)) + nac := jwt.NewAccountClaims(apub) nac.IssuedAt = time.Now().Add(-10 * time.Second).Unix() nac.Expires = time.Now().Add(time.Second).Unix() ajwt, err := nac.Encode(okp) @@ -521,38 +495,20 @@ func TestJWTAccountRenewFromResolver(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(apub), ajwt) + addAccountToMemResolver(s, apub, ajwt) // Force it to be loaded by the server and start the expiration timer. - acc := s.LookupAccount(string(apub)) + acc := s.LookupAccount(apub) if acc == nil { t.Fatalf("Could not retrieve account for %q", apub) } // Create a new user - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(akp) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) - } - - c, cr, l := newClientForServer(s) - - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) - + c, cr, cs := createClient(t, s, akp) // Wait for expiration. - time.Sleep(time.Second) + time.Sleep(1250 * time.Millisecond) - // PING needed to flush the +OK/-ERR to us. - // This should fail since the account is expired. - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", ujwt, sig) go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { t.Fatalf("Expected an error") } @@ -566,7 +522,7 @@ func TestJWTAccountRenewFromResolver(t *testing.T) { } // Update the account - addAccountToMemResolver(s, string(apub), ajwt) + addAccountToMemResolver(s, apub, ajwt) // Make sure the too quick update suppression does not bite us. acc.updated = time.Now().Add(-1 * time.Hour) @@ -574,20 +530,11 @@ func TestJWTAccountRenewFromResolver(t *testing.T) { // happen automatically. // Now make sure we can connect. - c, cr, l = newClientForServer(s) - - // Sign Nonce - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ = nkp.Sign([]byte(info.Nonce)) - sig = base64.StdEncoding.EncodeToString(sigraw) - - // PING needed to flush the +OK/-ERR to us. - // This should fail too since no account resolver is defined. - cs = fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", ujwt, sig) + c, cr, cs = createClient(t, s, akp) go c.parse([]byte(cs)) l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got: %q", l) } } @@ -601,7 +548,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) // Now create Exports. streamExport := &jwt.Export{Subject: "foo", Type: jwt.Stream} @@ -615,9 +562,9 @@ func TestJWTAccountBasicImportExport(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) - acc := s.LookupAccount(string(fooPub)) + acc := s.LookupAccount(fooPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -643,18 +590,18 @@ func TestJWTAccountBasicImportExport(t *testing.T) { barKP, _ := nkeys.CreateAccount() barPub, _ := barKP.PublicKey() - barAC := jwt.NewAccountClaims(string(barPub)) + barAC := jwt.NewAccountClaims(barPub) - streamImport := &jwt.Import{Account: string(fooPub), Subject: "foo", To: "import.foo", Type: jwt.Stream} - serviceImport := &jwt.Import{Account: string(fooPub), Subject: "req.echo", Type: jwt.Service} + streamImport := &jwt.Import{Account: fooPub, Subject: "foo", To: "import.foo", Type: jwt.Stream} + serviceImport := &jwt.Import{Account: fooPub, Subject: "req.echo", Type: jwt.Service} barAC.Imports.Add(streamImport, serviceImport) barJWT, err := barAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) - acc = s.LookupAccount(string(barPub)) + acc = s.LookupAccount(barPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -667,14 +614,14 @@ func TestJWTAccountBasicImportExport(t *testing.T) { } // Now add in a bad activation token. - barAC = jwt.NewAccountClaims(string(barPub)) - serviceImport = &jwt.Import{Account: string(fooPub), Subject: "req.echo", Token: "not a token", Type: jwt.Service} + barAC = jwt.NewAccountClaims(barPub) + serviceImport = &jwt.Import{Account: fooPub, Subject: "req.echo", Token: "not a token", Type: jwt.Service} barAC.Imports.Add(serviceImport) barJWT, err = barAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) s.updateAccountClaims(acc, barAC) @@ -684,10 +631,10 @@ func TestJWTAccountBasicImportExport(t *testing.T) { } // Now make a correct one. - barAC = jwt.NewAccountClaims(string(barPub)) - serviceImport = &jwt.Import{Account: string(fooPub), Subject: "req.echo", Type: jwt.Service} + barAC = jwt.NewAccountClaims(barPub) + serviceImport = &jwt.Import{Account: fooPub, Subject: "req.echo", Type: jwt.Service} - activation := jwt.NewActivationClaims(string(barPub)) + activation := jwt.NewActivationClaims(barPub) activation.ImportSubject = "req.echo" activation.ImportType = jwt.Service actJWT, err := activation.Encode(fooKP) @@ -700,7 +647,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) s.updateAccountClaims(acc, barAC) // Our service import should have succeeded. if les := len(acc.imports.services); les != 1 { @@ -708,10 +655,10 @@ func TestJWTAccountBasicImportExport(t *testing.T) { } // Now test url - barAC = jwt.NewAccountClaims(string(barPub)) - serviceImport = &jwt.Import{Account: string(fooPub), Subject: "req.add", Type: jwt.Service} + barAC = jwt.NewAccountClaims(barPub) + serviceImport = &jwt.Import{Account: fooPub, Subject: "req.add", Type: jwt.Service} - activation = jwt.NewActivationClaims(string(barPub)) + activation = jwt.NewActivationClaims(barPub) activation.ImportSubject = "req.add" activation.ImportType = jwt.Service actJWT, err = activation.Encode(fooKP) @@ -730,7 +677,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) s.updateAccountClaims(acc, barAC) // Our service import should have succeeded. Should be the only one since we reset. if les := len(acc.imports.services); les != 1 { @@ -738,15 +685,15 @@ func TestJWTAccountBasicImportExport(t *testing.T) { } // Now streams - barAC = jwt.NewAccountClaims(string(barPub)) - streamImport = &jwt.Import{Account: string(fooPub), Subject: "private", To: "import.private", Type: jwt.Stream} + barAC = jwt.NewAccountClaims(barPub) + streamImport = &jwt.Import{Account: fooPub, Subject: "private", To: "import.private", Type: jwt.Stream} barAC.Imports.Add(streamImport) barJWT, err = barAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) s.updateAccountClaims(acc, barAC) // Our stream import should have not succeeded. if les := len(acc.imports.streams); les != 0 { @@ -754,10 +701,10 @@ func TestJWTAccountBasicImportExport(t *testing.T) { } // Now add in activation. - barAC = jwt.NewAccountClaims(string(barPub)) - streamImport = &jwt.Import{Account: string(fooPub), Subject: "private", To: "import.private", Type: jwt.Stream} + barAC = jwt.NewAccountClaims(barPub) + streamImport = &jwt.Import{Account: fooPub, Subject: "private", To: "import.private", Type: jwt.Stream} - activation = jwt.NewActivationClaims(string(barPub)) + activation = jwt.NewActivationClaims(barPub) activation.ImportSubject = "private" activation.ImportType = jwt.Stream actJWT, err = activation.Encode(fooKP) @@ -770,7 +717,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) s.updateAccountClaims(acc, barAC) // Our stream import should have not succeeded. if les := len(acc.imports.streams); les != 1 { @@ -788,7 +735,7 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) streamExport := &jwt.Export{Subject: "foo", Type: jwt.Stream} fooAC.Exports.Add(streamExport) @@ -796,53 +743,38 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) barKP, _ := nkeys.CreateAccount() barPub, _ := barKP.PublicKey() - barAC := jwt.NewAccountClaims(string(barPub)) - streamImport := &jwt.Import{Account: string(fooPub), Subject: "foo", To: "import", Type: jwt.Stream} + barAC := jwt.NewAccountClaims(barPub) + streamImport := &jwt.Import{Account: fooPub, Subject: "foo", To: "import", Type: jwt.Stream} barAC.Imports.Add(streamImport) barJWT, err := barAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) - // Create a client. - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(barKP) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } } - c, cr, l := newClientForServer(s) + // Create a client. + c, cr, cs := createClient(t, s, barKP) + parseAsync, quit := genAsyncParser(c) + defer func() { quit <- true }() - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) + parseAsync(cs) + expectPong(cr) - // PING needed to flush the +OK/-ERR to us. - // This should fail too since no account resolver is defined. - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nSUB import.foo 1\r\nPING\r\n", ujwt, sig) - go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) - } - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) - } - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "PONG\r\n") { - t.Fatalf("PONG response incorrect: %q\n", l) - } + parseAsync("SUB import.foo 1\r\nPING\r\n") + expectPong(cr) checkShadow := func(expected int) { t.Helper() @@ -858,13 +790,10 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { checkShadow(1) // Now update bar and remove the import which should make the shadow go away. - barAC = jwt.NewAccountClaims(string(barPub)) - barJWT, err = barAC.Encode(okp) - if err != nil { - t.Fatalf("Error generating account JWT: %v", err) - } - addAccountToMemResolver(s, string(barPub), barJWT) - acc := s.LookupAccount(string(barPub)) + barAC = jwt.NewAccountClaims(barPub) + barJWT, _ = barAC.Encode(okp) + addAccountToMemResolver(s, barPub, barJWT) + acc := s.LookupAccount(barPub) s.updateAccountClaims(acc, barAC) checkShadow(0) @@ -872,48 +801,36 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { // Now add it back and make sure the shadow comes back. streamImport = &jwt.Import{Account: string(fooPub), Subject: "foo", To: "import", Type: jwt.Stream} barAC.Imports.Add(streamImport) - barJWT, err = barAC.Encode(okp) - if err != nil { - t.Fatalf("Error generating account JWT: %v", err) - } - addAccountToMemResolver(s, string(barPub), barJWT) + barJWT, _ = barAC.Encode(okp) + addAccountToMemResolver(s, barPub, barJWT) s.updateAccountClaims(acc, barAC) checkShadow(1) // Now change export and make sure it goes away as well. So no exports anymore. - fooAC = jwt.NewAccountClaims(string(fooPub)) - fooJWT, err = fooAC.Encode(okp) - if err != nil { - t.Fatalf("Error generating account JWT: %v", err) - } - addAccountToMemResolver(s, string(fooPub), fooJWT) - s.updateAccountClaims(s.LookupAccount(string(fooPub)), fooAC) + fooAC = jwt.NewAccountClaims(fooPub) + fooJWT, _ = fooAC.Encode(okp) + addAccountToMemResolver(s, fooPub, fooJWT) + s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) checkShadow(0) // Now add it in but with permission required. streamExport = &jwt.Export{Subject: "foo", Type: jwt.Stream, TokenReq: true} fooAC.Exports.Add(streamExport) - fooJWT, err = fooAC.Encode(okp) - if err != nil { - t.Fatalf("Error generating account JWT: %v", err) - } - addAccountToMemResolver(s, string(fooPub), fooJWT) - s.updateAccountClaims(s.LookupAccount(string(fooPub)), fooAC) + fooJWT, _ = fooAC.Encode(okp) + addAccountToMemResolver(s, fooPub, fooJWT) + s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) checkShadow(0) // Now put it back as normal. - fooAC = jwt.NewAccountClaims(string(fooPub)) + fooAC = jwt.NewAccountClaims(fooPub) streamExport = &jwt.Export{Subject: "foo", Type: jwt.Stream} fooAC.Exports.Add(streamExport) - fooJWT, err = fooAC.Encode(okp) - if err != nil { - t.Fatalf("Error generating account JWT: %v", err) - } - addAccountToMemResolver(s, string(fooPub), fooJWT) - s.updateAccountClaims(s.LookupAccount(string(fooPub)), fooAC) + fooJWT, _ = fooAC.Encode(okp) + addAccountToMemResolver(s, fooPub, fooJWT) + s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) checkShadow(1) } @@ -928,7 +845,7 @@ func TestJWTAccountImportActivationExpires(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) streamExport := &jwt.Export{Subject: "foo", Type: jwt.Stream, TokenReq: true} fooAC.Exports.Add(streamExport) @@ -937,19 +854,19 @@ func TestJWTAccountImportActivationExpires(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) - acc := s.LookupAccount(string(fooPub)) + acc := s.LookupAccount(fooPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } barKP, _ := nkeys.CreateAccount() barPub, _ := barKP.PublicKey() - barAC := jwt.NewAccountClaims(string(barPub)) - streamImport := &jwt.Import{Account: string(fooPub), Subject: "foo", To: "import.", Type: jwt.Stream} + barAC := jwt.NewAccountClaims(barPub) + streamImport := &jwt.Import{Account: fooPub, Subject: "foo", To: "import.", Type: jwt.Stream} - activation := jwt.NewActivationClaims(string(barPub)) + activation := jwt.NewActivationClaims(barPub) activation.ImportSubject = "foo" activation.ImportType = jwt.Stream activation.IssuedAt = time.Now().Add(-10 * time.Second).Unix() @@ -964,40 +881,26 @@ func TestJWTAccountImportActivationExpires(t *testing.T) { if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(barPub), barJWT) + addAccountToMemResolver(s, barPub, barJWT) - // Create a client. - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(barKP) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } } - c, cr, l := newClientForServer(s) + // Create a client. + c, cr, cs := createClient(t, s, barKP) + parseAsync, quit := genAsyncParser(c) + defer func() { quit <- true }() - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) + parseAsync(cs) + expectPong(cr) - // PING needed to flush the +OK/-ERR to us. - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nSUB import.foo 1\r\nPING\r\n", ujwt, sig) - go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) - } - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) - } - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "PONG\r\n") { - t.Fatalf("PONG response incorrect: %q\n", l) - } + parseAsync("SUB import.foo 1\r\nPING\r\n") + expectPong(cr) checkShadow := func(expected int) { t.Helper() @@ -1012,7 +915,7 @@ func TestJWTAccountImportActivationExpires(t *testing.T) { // We created a SUB on foo which should create a shadow subscription. checkShadow(1) - time.Sleep(2 * time.Second) + time.Sleep(1250 * time.Millisecond) // Should have expired and been removed. checkShadow(0) @@ -1028,66 +931,33 @@ func TestJWTAccountLimitsSubs(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) fooAC.Limits.Subs = 10 fooJWT, err := fooAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) - // Create a client. - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(fooKP) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } } - c, cr, l := newClientForServer(s) - - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) - - quit := make(chan bool) + // Create a client. + c, cr, cs := createClient(t, s, fooKP) + parseAsync, quit := genAsyncParser(c) defer func() { quit <- true }() - pab := make(chan []byte, 16) - - parseAsync := func(cs []byte) { - pab <- cs - } - - go func() { - for { - select { - case cs := <-pab: - c.parse(cs) - case <-quit: - return - } - } - }() - - // PING needed to flush the +OK/-ERR to us. - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", ujwt, sig) - parseAsync([]byte(cs)) - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) - } - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "PONG") { - t.Fatalf("Expected a PONG") - } + parseAsync(cs) + expectPong(cr) // Check to make sure we have the limit set. // Account first - fooAcc := s.LookupAccount(string(fooPub)) + fooAcc := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.msubs != 10 { fooAcc.mu.RUnlock() @@ -1105,22 +975,13 @@ func TestJWTAccountLimitsSubs(t *testing.T) { // Now make sure its enforced. /// These should all work ok. for i := 0; i < 10; i++ { - cs := fmt.Sprintf("SUB foo %d\r\nPING\r\n", i) - parseAsync([]byte(cs)) - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "+OK") { - t.Fatalf("Expected an OK, got: %v", l) - } - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "PONG") { - t.Fatalf("Expected a PONG") - } + parseAsync(fmt.Sprintf("SUB foo %d\r\nPING\r\n", i)) + expectPong(cr) } // This one should fail. - cs = fmt.Sprintf("SUB foo 22\r\n") - parseAsync([]byte(cs)) - l, _ = cr.ReadString('\n') + parseAsync("SUB foo 22\r\n") + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR") { t.Fatalf("Expected an ERR, got: %v", l) } @@ -1134,7 +995,7 @@ func TestJWTAccountLimitsSubs(t *testing.T) { if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) s.updateAccountClaims(fooAcc, fooAC) l, _ = cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR") { @@ -1159,14 +1020,14 @@ func TestJWTAccountLimitsSubsButServerOverrides(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) fooAC.Limits.Subs = 10 fooJWT, err := fooAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) - fooAcc := s.LookupAccount(string(fooPub)) + addAccountToMemResolver(s, fooPub, fooJWT) + fooAcc := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.msubs != 10 { fooAcc.mu.RUnlock() @@ -1174,32 +1035,33 @@ func TestJWTAccountLimitsSubsButServerOverrides(t *testing.T) { } fooAcc.mu.RUnlock() - // Create a client. - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(fooKP) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } } - c, cr, l := newClientForServer(s) + // Create a client. + c, cr, cs := createClient(t, s, fooKP) + parseAsync, quit := genAsyncParser(c) + defer func() { quit <- true }() - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) + parseAsync(cs) + expectPong(cr) + + parseAsync("SUB foo 1\r\nSUB bar 2\r\nSUB baz 3\r\nPING\r\n") + l, _ := cr.ReadString('\n') - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nSUB foo 1\r\nSUB bar 2\r\nSUB baz 3\r\nPING\r\n", ujwt, sig) - go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { t.Fatalf("Expected an error") } if !strings.Contains(l, "Maximum Subscriptions Exceeded") { t.Fatalf("Expected an ERR for max subscriptions exceeded, got: %v", l) } + // Read last PONG so does not hold up test. + cr.ReadString('\n') } func TestJWTAccountLimitsMaxPayload(t *testing.T) { @@ -1212,61 +1074,33 @@ func TestJWTAccountLimitsMaxPayload(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) fooAC.Limits.Payload = 8 fooJWT, err := fooAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) - // Create a client. - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(fooKP) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } } - c, cr, l := newClientForServer(s) - - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) - - quit := make(chan bool) + // Create a client. + c, cr, cs := createClient(t, s, fooKP) + parseAsync, quit := genAsyncParser(c) defer func() { quit <- true }() - pab := make(chan []byte, 16) - - parseAsync := func(cs []byte) { - pab <- cs - } - - go func() { - for { - select { - case cs := <-pab: - c.parse(cs) - case <-quit: - return - } - } - }() - - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nPING\r\n", ujwt, sig) - parseAsync([]byte(cs)) - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "PONG") { - t.Fatalf("Expected a PONG") - } + parseAsync(cs) + expectPong(cr) // Check to make sure we have the limit set. // Account first - fooAcc := s.LookupAccount(string(fooPub)) + fooAcc := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.mpay != 8 { fooAcc.mu.RUnlock() @@ -1281,16 +1115,11 @@ func TestJWTAccountLimitsMaxPayload(t *testing.T) { } c.mu.Unlock() - cs = fmt.Sprintf("PUB foo 4\r\nXXXX\r\nPING\r\n") - parseAsync([]byte(cs)) - l, _ = cr.ReadString('\n') - if !strings.HasPrefix(l, "PONG") { - t.Fatalf("Expected a PONG") - } + parseAsync("PUB foo 4\r\nXXXX\r\nPING\r\n") + expectPong(cr) - cs = fmt.Sprintf("PUB foo 10\r\nXXXXXXXXXX\r\nPING\r\n") - parseAsync([]byte(cs)) - l, _ = cr.ReadString('\n') + parseAsync("PUB foo 10\r\nXXXXXXXXXX\r\nPING\r\n") + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { t.Fatalf("Expected an error") } @@ -1313,34 +1142,32 @@ func TestJWTAccountLimitsMaxPayloadButServerOverrides(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) fooAC.Limits.Payload = 8 fooJWT, err := fooAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) - // Create a client. - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(fooKP) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } } - c, cr, l := newClientForServer(s) + // Create a client. + c, cr, cs := createClient(t, s, fooKP) + parseAsync, quit := genAsyncParser(c) + defer func() { quit <- true }() - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) + parseAsync(cs) + expectPong(cr) - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nPUB foo 6\r\nXXXXXX\r\nPING\r\n", ujwt, sig) - go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') + parseAsync("PUB foo 6\r\nXXXXXX\r\nPING\r\n") + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, "-ERR ") { t.Fatalf("Expected an error") } @@ -1361,42 +1188,172 @@ func TestJWTAccountLimitsMaxConns(t *testing.T) { // Create accounts and imports/exports. fooKP, _ := nkeys.CreateAccount() fooPub, _ := fooKP.PublicKey() - fooAC := jwt.NewAccountClaims(string(fooPub)) + fooAC := jwt.NewAccountClaims(fooPub) fooAC.Limits.Conn = 8 fooJWT, err := fooAC.Encode(okp) if err != nil { t.Fatalf("Error generating account JWT: %v", err) } - addAccountToMemResolver(s, string(fooPub), fooJWT) + addAccountToMemResolver(s, fooPub, fooJWT) newClient := func(expPre string) { t.Helper() // Create a client. - nkp, _ := nkeys.CreateUser() - pub, _ := nkp.PublicKey() - nuc := jwt.NewUserClaims(string(pub)) - ujwt, err := nuc.Encode(fooKP) - if err != nil { - t.Fatalf("Error generating user JWT: %v", err) - } - c, cr, l := newClientForServer(s) - - // Sign Nonce - var info nonceInfo - json.Unmarshal([]byte(l[5:]), &info) - sigraw, _ := nkp.Sign([]byte(info.Nonce)) - sig := base64.StdEncoding.EncodeToString(sigraw) - cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\", \"verbose\":true}\r\nPING\r\n", ujwt, sig) + c, cr, cs := createClient(t, s, fooKP) go c.parse([]byte(cs)) - l, _ = cr.ReadString('\n') + l, _ := cr.ReadString('\n') if !strings.HasPrefix(l, expPre) { t.Fatalf("Expected a response starting with %q", expPre) } } for i := 0; i < 8; i++ { - newClient("+OK") + newClient("PONG") } // Now this one should fail. newClient("-ERR ") } + +func TestJWTAccountServiceImportExpires(t *testing.T) { + s := opTrustBasicSetup() + defer s.Shutdown() + buildMemAccResolver(s) + + okp, _ := nkeys.FromSeed(oSeed) + + // Create accounts and imports/exports. + fooKP, _ := nkeys.CreateAccount() + fooPub, _ := fooKP.PublicKey() + fooAC := jwt.NewAccountClaims(fooPub) + serviceExport := &jwt.Export{Subject: "foo", Type: jwt.Service} + + fooAC.Exports.Add(serviceExport) + fooJWT, err := fooAC.Encode(okp) + if err != nil { + t.Fatalf("Error generating account JWT: %v", err) + } + addAccountToMemResolver(s, fooPub, fooJWT) + + barKP, _ := nkeys.CreateAccount() + barPub, _ := barKP.PublicKey() + barAC := jwt.NewAccountClaims(barPub) + serviceImport := &jwt.Import{Account: fooPub, Subject: "foo", Type: jwt.Service} + + barAC.Imports.Add(serviceImport) + barJWT, err := barAC.Encode(okp) + if err != nil { + t.Fatalf("Error generating account JWT: %v", err) + } + addAccountToMemResolver(s, barPub, barJWT) + + expectPong := func(cr *bufio.Reader) { + t.Helper() + l, _ := cr.ReadString('\n') + if !strings.HasPrefix(l, "PONG") { + t.Fatalf("Expected a PONG, got %q", l) + } + } + + expectMsg := func(cr *bufio.Reader, sub, pay string) { + t.Helper() + l, _ := cr.ReadString('\n') + expected := "MSG " + sub + if !strings.HasPrefix(l, expected) { + t.Fatalf("Expected %q, got %q", expected, l) + } + l, _ = cr.ReadString('\n') + if l != pay+"\r\n" { + t.Fatalf("Expected %q, got %q", pay, l) + } + expectPong(cr) + } + + // Create a client that will send the request + ca, cra, csa := createClient(t, s, barKP) + parseAsyncA, quitA := genAsyncParser(ca) + defer func() { quitA <- true }() + parseAsyncA(csa) + expectPong(cra) + + // Create the client that will respond to the requests. + cb, crb, csb := createClient(t, s, fooKP) + parseAsyncB, quitB := genAsyncParser(cb) + defer func() { quitB <- true }() + parseAsyncB(csb) + expectPong(crb) + + // Create Subscriber. + parseAsyncB("SUB foo 1\r\nPING\r\n") + expectPong(crb) + + // Send Request + parseAsyncA("PUB foo 2\r\nhi\r\nPING\r\n") + expectPong(cra) + + // We should receive the request. PING needed to flush. + parseAsyncB("PING\r\n") + expectMsg(crb, "foo", "hi") + + // Now update the exported service to require auth. + fooAC = jwt.NewAccountClaims(fooPub) + serviceExport = &jwt.Export{Subject: "foo", Type: jwt.Service, TokenReq: true} + + fooAC.Exports.Add(serviceExport) + fooJWT, err = fooAC.Encode(okp) + if err != nil { + t.Fatalf("Error generating account JWT: %v", err) + } + addAccountToMemResolver(s, fooPub, fooJWT) + s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) + + // Send Another Request + parseAsyncA("PUB foo 2\r\nhi\r\nPING\r\n") + expectPong(cra) + + // We should not receive the request this time. + parseAsyncB("PING\r\n") + expectPong(crb) + + // Now get an activation token such that it will work, but will expire. + barAC = jwt.NewAccountClaims(barPub) + serviceImport = &jwt.Import{Account: fooPub, Subject: "foo", Type: jwt.Service} + + activation := jwt.NewActivationClaims(barPub) + activation.ImportSubject = "foo" + activation.ImportType = jwt.Service + activation.IssuedAt = time.Now().Add(-10 * time.Second).Unix() + activation.Expires = time.Now().Add(time.Second).Unix() + actJWT, err := activation.Encode(fooKP) + if err != nil { + t.Fatalf("Error generating activation token: %v", err) + } + serviceImport.Token = actJWT + + barAC.Imports.Add(serviceImport) + barJWT, err = barAC.Encode(okp) + if err != nil { + t.Fatalf("Error generating account JWT: %v", err) + } + addAccountToMemResolver(s, barPub, barJWT) + s.updateAccountClaims(s.LookupAccount(barPub), barAC) + + // Now it should work again. + // Send Another Request + parseAsyncA("PUB foo 3\r\nhi2\r\nPING\r\n") + expectPong(cra) + + // We should receive the request. PING needed to flush. + parseAsyncB("PING\r\n") + expectMsg(crb, "foo", "hi2") + + // Now wait for it to expire, then retry. + time.Sleep(1250 * time.Millisecond) + + // Send Another Request + parseAsyncA("PUB foo 3\r\nhi3\r\nPING\r\n") + expectPong(cra) + + // We should receive the request. PING needed to flush. + parseAsyncB("PING\r\n") + expectPong(crb) +}