From 1149c4cef996e9e5eb06dbfff14e14202e3bdb4a Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 4 Jan 2021 17:23:00 -0500 Subject: [PATCH] [Added] support for jwt export response threshold Signed-off-by: Matthias Hanel --- server/accounts.go | 13 +++++++-- server/jwt_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 4f77b8f8ce5..7613bda5e53 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2903,9 +2903,11 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim } if err := a.AddServiceExportWithResponse(string(e.Subject), rt, authAccounts(e.TokenReq)); err != nil { s.Debugf("Error adding service export to account [%s]: %v", a.Name, err) + continue } + sub := string(e.Subject) if e.Latency != nil { - if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), int(e.Latency.Sampling)); err != nil { + if err := a.TrackServiceExportWithSampling(sub, string(e.Latency.Results), int(e.Latency.Sampling)); err != nil { hdrNote := "" if e.Latency.Sampling == jwt.Headers { hdrNote = " (using headers)" @@ -2913,6 +2915,12 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.Name, err) } } + if e.ResponseThreshold != 0 { + // Response threshold was set in options. + if err := a.SetServiceExportResponseThreshold(sub, e.ResponseThreshold); err != nil { + s.Debugf("Error adding service export response threshold for [%s]: %v", a.Name, err) + } + } } // We will track these at the account level. Should not have any collisions. if e.Revocations != nil { @@ -2945,7 +2953,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim to := i.GetTo() switch i.Type { case jwt.Stream: - if i.LocalSubject != "" { + if i.LocalSubject != _EMPTY_ { // set local subject implies to is empty to = string(i.LocalSubject) s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, from, a.Name, to) @@ -2959,7 +2967,6 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim incompleteImports = append(incompleteImports, i) } case jwt.Service: - // FIXME(dlc) - need to add in respThresh here eventually. if i.LocalSubject != _EMPTY_ { from = string(i.LocalSubject) to = string(i.Subject) diff --git a/server/jwt_test.go b/server/jwt_test.go index 2992279b592..2250e417073 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -4818,3 +4818,75 @@ func TestJWTAccountImportsWithWildcardSupport(t *testing.T) { "my.request.1.2.bar", "my.events.2.1.bar") }) } + +func TestJWTResponseThreshold(t *testing.T) { + respThresh := 20 * time.Millisecond + aExpKp, aExpPub := createKey(t) + aExpClaim := jwt.NewAccountClaims(aExpPub) + aExpClaim.Name = "Export" + aExpClaim.Exports.Add(&jwt.Export{ + Subject: "srvc", + Type: jwt.Service, + ResponseThreshold: respThresh, + }) + aExpJwt := encodeClaim(t, aExpClaim, aExpPub) + aExpCreds := newUser(t, aExpKp) + + defer os.Remove(aExpCreds) + aImpKp, aImpPub := createKey(t) + aImpClaim := jwt.NewAccountClaims(aImpPub) + aImpClaim.Name = "Import" + aImpClaim.Imports.Add(&jwt.Import{ + Subject: "srvc", + Type: jwt.Service, + Account: aExpPub, + }) + aImpJwt := encodeClaim(t, aImpClaim, aImpPub) + aImpCreds := newUser(t, aImpKp) + defer os.Remove(aImpCreds) + + cf := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + operator = %s + resolver = MEMORY + resolver_preload = { + %s : "%s" + %s : "%s" + } + `, ojwt, aExpPub, aExpJwt, aImpPub, aImpJwt))) + defer os.Remove(cf) + + s, opts := RunServerWithConfig(cf) + defer s.Shutdown() + + ncExp := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.UserCredentials(aExpCreds)) + defer ncExp.Close() + + ncImp := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.UserCredentials(aImpCreds)) + defer ncImp.Close() + + delayChan := make(chan time.Duration, 1) + + // Create subscriber for the service endpoint in foo. + _, err := ncExp.Subscribe("srvc", func(m *nats.Msg) { + time.Sleep(<-delayChan) + m.Respond([]byte("yes!")) + }) + require_NoError(t, err) + ncExp.Flush() + + t.Run("No-Timeout", func(t *testing.T) { + delayChan <- respThresh / 2 + if resp, err := ncImp.Request("srvc", []byte("yes?"), 4*respThresh); err != nil { + t.Fatalf("Expected a response to request srvc got: %v", err) + } else if string(resp.Data) != "yes!" { + t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data) + } + }) + t.Run("Timeout", func(t *testing.T) { + delayChan <- 2 * respThresh + if _, err := ncImp.Request("srvc", []byte("yes?"), 4*respThresh); err == nil || err != nats.ErrTimeout { + t.Fatalf("Expected a timeout") + } + }) +}