Skip to content

Commit

Permalink
Merge pull request #1793 from nats-io/jwt-resp-threshold
Browse files Browse the repository at this point in the history
[Added] support for jwt export response threshold
  • Loading branch information
matthiashanel committed Jan 6, 2021
2 parents 4649f9e + 1149c4c commit 9bdbe57
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
13 changes: 10 additions & 3 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2903,16 +2903,24 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
if err := a.AddServiceExportWithResponse(string(e.Subject), rt, authAccounts(e.TokenReq)); err != nil {
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err)
continue
}
sub := string(e.Subject)
if e.Latency != nil {
if err := a.TrackServiceExportWithSampling(string(e.Subject), string(e.Latency.Results), int(e.Latency.Sampling)); err != nil {
if err := a.TrackServiceExportWithSampling(sub, string(e.Latency.Results), int(e.Latency.Sampling)); err != nil {
hdrNote := ""
if e.Latency.Sampling == jwt.Headers {
hdrNote = " (using headers)"
}
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.Name, err)
}
}
if e.ResponseThreshold != 0 {
// Response threshold was set in options.
if err := a.SetServiceExportResponseThreshold(sub, e.ResponseThreshold); err != nil {
s.Debugf("Error adding service export response threshold for [%s]: %v", a.Name, err)
}
}
}
// We will track these at the account level. Should not have any collisions.
if e.Revocations != nil {
Expand Down Expand Up @@ -2945,7 +2953,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
to := i.GetTo()
switch i.Type {
case jwt.Stream:
if i.LocalSubject != "" {
if i.LocalSubject != _EMPTY_ {
// set local subject implies to is empty
to = string(i.LocalSubject)
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, from, a.Name, to)
Expand All @@ -2959,7 +2967,6 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
incompleteImports = append(incompleteImports, i)
}
case jwt.Service:
// FIXME(dlc) - need to add in respThresh here eventually.
if i.LocalSubject != _EMPTY_ {
from = string(i.LocalSubject)
to = string(i.Subject)
Expand Down
72 changes: 72 additions & 0 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4818,3 +4818,75 @@ func TestJWTAccountImportsWithWildcardSupport(t *testing.T) {
"my.request.1.2.bar", "my.events.2.1.bar")
})
}

func TestJWTResponseThreshold(t *testing.T) {
respThresh := 20 * time.Millisecond
aExpKp, aExpPub := createKey(t)
aExpClaim := jwt.NewAccountClaims(aExpPub)
aExpClaim.Name = "Export"
aExpClaim.Exports.Add(&jwt.Export{
Subject: "srvc",
Type: jwt.Service,
ResponseThreshold: respThresh,
})
aExpJwt := encodeClaim(t, aExpClaim, aExpPub)
aExpCreds := newUser(t, aExpKp)

defer os.Remove(aExpCreds)
aImpKp, aImpPub := createKey(t)
aImpClaim := jwt.NewAccountClaims(aImpPub)
aImpClaim.Name = "Import"
aImpClaim.Imports.Add(&jwt.Import{
Subject: "srvc",
Type: jwt.Service,
Account: aExpPub,
})
aImpJwt := encodeClaim(t, aImpClaim, aImpPub)
aImpCreds := newUser(t, aImpKp)
defer os.Remove(aImpCreds)

cf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
operator = %s
resolver = MEMORY
resolver_preload = {
%s : "%s"
%s : "%s"
}
`, ojwt, aExpPub, aExpJwt, aImpPub, aImpJwt)))
defer os.Remove(cf)

s, opts := RunServerWithConfig(cf)
defer s.Shutdown()

ncExp := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.UserCredentials(aExpCreds))
defer ncExp.Close()

ncImp := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.UserCredentials(aImpCreds))
defer ncImp.Close()

delayChan := make(chan time.Duration, 1)

// Create subscriber for the service endpoint in foo.
_, err := ncExp.Subscribe("srvc", func(m *nats.Msg) {
time.Sleep(<-delayChan)
m.Respond([]byte("yes!"))
})
require_NoError(t, err)
ncExp.Flush()

t.Run("No-Timeout", func(t *testing.T) {
delayChan <- respThresh / 2
if resp, err := ncImp.Request("srvc", []byte("yes?"), 4*respThresh); err != nil {
t.Fatalf("Expected a response to request srvc got: %v", err)
} else if string(resp.Data) != "yes!" {
t.Fatalf("Expected a response of %q, got %q", "yes!", resp.Data)
}
})
t.Run("Timeout", func(t *testing.T) {
delayChan <- 2 * respThresh
if _, err := ncImp.Request("srvc", []byte("yes?"), 4*respThresh); err == nil || err != nats.ErrTimeout {
t.Fatalf("Expected a timeout")
}
})
}

0 comments on commit 9bdbe57

Please sign in to comment.