Skip to content

Commit

Permalink
Made updates to message tracing code to support JWT updates
Browse files Browse the repository at this point in the history
Related to #5014

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 14, 2024
1 parent bd7ef64 commit 3608bcf
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 115 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/klauspost/compress v1.17.6
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.5.3
github.com/nats-io/jwt/v2 v2.5.4-0.20240214164243-40f01dce329c
github.com/nats-io/nats.go v1.32.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
Expand Down
10 changes: 2 additions & 8 deletions go.sum
@@ -1,12 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E=
github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/jwt/v2 v2.5.4-0.20240214164243-40f01dce329c h1:MxTE8IDAMTeHunOmrm4DojsMiZfOO+5ovBD+Ca3Guy0=
github.com/nats-io/jwt/v2 v2.5.4-0.20240214164243-40f01dce329c/go.mod h1:9d5GwImcMyYc5qCEt6N3ebkyviwwVBssCnHz9yRqPCM=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand All @@ -18,13 +16,9 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
Expand Down
38 changes: 28 additions & 10 deletions server/accounts.go
Expand Up @@ -157,6 +157,7 @@ type serviceImport struct {
share bool
tracking bool
didDeliver bool
atrc bool // allow trace (got from service export)
trackingHdr http.Header // header from request
}

Expand Down Expand Up @@ -1908,11 +1909,13 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
return nil, ErrMissingAccount
}

var atrc bool
dest.mu.RLock()
se := dest.getServiceExport(to)
if se != nil {
rt = se.respType
lat = se.latency
atrc = se.atrc
}
dest.mu.RUnlock()

Expand Down Expand Up @@ -1956,7 +1959,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
if claim != nil {
share = claim.Share
}
si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, share, false, false, nil}
si := &serviceImport{dest, claim, se, nil, from, to, tr, 0, rt, lat, nil, nil, usePub, false, false, share, false, false, atrc, nil}
a.imports.services[from] = si
a.mu.Unlock()

Expand Down Expand Up @@ -2421,7 +2424,7 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp

// dest is the requestor's account. a is the service responder with the export.
// Marked as internal here, that is how we distinguish.
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, nil}
si := &serviceImport{dest, nil, osi.se, nil, nrr, to, nil, 0, rt, nil, nil, nil, false, true, false, osi.share, false, false, false, nil}

if a.exports.responses == nil {
a.exports.responses = make(map[string]*serviceImport)
Expand Down Expand Up @@ -2531,10 +2534,9 @@ func (a *Account) addMappedStreamImportWithClaim(account *Account, from, to stri
a.mu.Unlock()
return ErrStreamImportDuplicate
}
// TODO(ik): When AllowTrace is added to JWT, uncomment those lines:
// if imClaim != nil {
// allowTrace = imClaim.AllowTrace
// }
if imClaim != nil {
allowTrace = imClaim.AllowTrace
}
a.imports.streams = append(a.imports.streams, &streamImport{account, from, to, tr, nil, imClaim, usePub, false, allowTrace})
a.mu.Unlock()
return nil
Expand Down Expand Up @@ -2869,7 +2871,9 @@ func (a *Account) checkStreamImportsEqual(b *Account) bool {
bm[bim.acc.Name+bim.from+bim.to] = bim
}
for _, aim := range a.imports.streams {
if _, ok := bm[aim.acc.Name+aim.from+aim.to]; !ok {
if bim, ok := bm[aim.acc.Name+aim.from+aim.to]; !ok {
return false
} else if aim.atrc != bim.atrc {
return false
}
}
Expand Down Expand Up @@ -2955,6 +2959,9 @@ func isServiceExportEqual(a, b *serviceExport) bool {
return false
}
}
if a.atrc != b.atrc {
return false
}
return true
}

Expand Down Expand Up @@ -3232,6 +3239,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
a.nameTag = ac.Name
a.tags = ac.Tags

// Update TraceDest
a.traceDest = string(ac.TraceDest)

// Check for external authorization.
if ac.HasExternalAuthorization() {
a.extAuth = &jwt.ExternalAuthorization{}
Expand Down Expand Up @@ -3360,6 +3370,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
s.Debugf("Error adding service export response threshold for [%s]: %v", a.traceLabel(), err)
}
}
if err := a.SetServiceExportAllowTrace(sub, e.AllowTrace); err != nil {
s.Debugf("Error adding allow_trace for %q: %v", sub, err)
}
}

var revocationChanged *bool
Expand Down Expand Up @@ -3500,10 +3513,15 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
if si != nil && si.acc.Name == a.Name {
// Check for if we are still authorized for an import.
si.invalid = !a.checkServiceImportAuthorized(acc, si.to, si.claim)
if si.latency != nil && !si.response {
// Make sure we should still be tracking latency.
// Make sure we should still be tracking latency and if we
// are allowed to trace.
if !si.response {
if se := a.getServiceExport(si.to); se != nil {
si.latency = se.latency
if si.latency != nil {
si.latency = se.latency
}
// Update allow trace.
si.atrc = se.atrc
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/client.go
Expand Up @@ -4198,7 +4198,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
}
}
siAcc := si.acc
allowTrace := si.se != nil && si.se.atrc
allowTrace := si.atrc
acc.mu.RUnlock()

// We have a special case where JetStream pulls in all service imports through one export.
Expand Down

0 comments on commit 3608bcf

Please sign in to comment.