Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Distributed Message Tracing #5014

Merged
merged 7 commits into from Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 36 additions & 3 deletions server/accounts.go
Expand Up @@ -128,6 +128,10 @@ type streamImport struct {
claim *jwt.Import
usePub bool
invalid bool
// This is `allow_trace` and when true and message tracing is happening,
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
// we will trace egresses past the account boundary, if `false`, we stop
// at the account boundary.
atrc bool
}

const ClientInfoHdr = "Nats-Request-Info"
Expand Down Expand Up @@ -209,6 +213,11 @@ type serviceExport struct {
latency *serviceLatency
rtmr *time.Timer
respThresh time.Duration
// This is `allow_trace` and when true and message tracing is happening,
// when processing a service import we will go through account boundary
// and trace egresses on that other account. If `false`, we stop at the
// account boundary.
atrc bool
}

// Used to track service latency.
Expand Down Expand Up @@ -2367,6 +2376,18 @@ func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.
return nil
}

func (a *Account) SetServiceExportAllowTrace(export string, allowTrace bool) error {
a.mu.Lock()
se := a.getServiceExport(export)
if se == nil {
a.mu.Unlock()
return fmt.Errorf("no export defined for %q", export)
}
se.atrc = allowTrace
a.mu.Unlock()
return nil
}

// This is for internal service import responses.
func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImport, tracking bool, header http.Header) *serviceImport {
nrr := string(osi.acc.newServiceReply(tracking))
Expand Down Expand Up @@ -2405,6 +2426,10 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp

// AddStreamImportWithClaim will add in the stream import from a specific account with optional token.
func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string, imClaim *jwt.Import) error {
return a.addStreamImportWithClaim(account, from, prefix, false, imClaim)
}

func (a *Account) addStreamImportWithClaim(account *Account, from, prefix string, allowTrace bool, imClaim *jwt.Import) error {
if account == nil {
return ErrMissingAccount
}
Expand All @@ -2427,7 +2452,7 @@ func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string
}
}

return a.AddMappedStreamImportWithClaim(account, from, prefix+from, imClaim)
return a.addMappedStreamImportWithClaim(account, from, prefix+from, allowTrace, imClaim)
}

// AddMappedStreamImport helper for AddMappedStreamImportWithClaim
Expand All @@ -2437,6 +2462,10 @@ func (a *Account) AddMappedStreamImport(account *Account, from, to string) error

// AddMappedStreamImportWithClaim will add in the stream import from a specific account with optional token.
func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to string, imClaim *jwt.Import) error {
return a.addMappedStreamImportWithClaim(account, from, to, false, imClaim)
}

func (a *Account) addMappedStreamImportWithClaim(account *Account, from, to string, allowTrace bool, imClaim *jwt.Import) error {
if account == nil {
return ErrMissingAccount
}
Expand Down Expand Up @@ -2478,7 +2507,11 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
a.mu.Unlock()
return ErrStreamImportDuplicate
}
a.imports.streams = append(a.imports.streams, &streamImport{account, from, to, tr, nil, imClaim, usePub, false})
// TODO(ik): When AllowTrace is added to JWT, uncomment those lines:
// if imClaim != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe coordinate with @aricart to get it added.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There would be also some code to add in updateAccountClaimsWithRefresh() to possibly call the function to set the service export allow trace property. And I would like to add a test with JWT of course. I would prefer to wait and add the JWT support before merging, but that would depend on how long it would take to get the JWT library updated. It can of course all be added after the merge. Your call Derek.
@aricart Would you mind adding a AllowTrace boolean property to a ServiceExport and StreamImport? If you are using common structures, validation should make it an error if this property is set on a ServiceImport or a StreamExport (the opposite of where it is valid).

// allowTrace = imClaim.AllowTrace
// }
a.imports.streams = append(a.imports.streams, &streamImport{account, from, to, tr, nil, imClaim, usePub, false, allowTrace})
a.mu.Unlock()
return nil
}
Expand All @@ -2496,7 +2529,7 @@ func (a *Account) isStreamImportDuplicate(acc *Account, from string) bool {

// AddStreamImport will add in the stream import from a specific account.
func (a *Account) AddStreamImport(account *Account, from, prefix string) error {
return a.AddStreamImportWithClaim(account, from, prefix, nil)
return a.addStreamImportWithClaim(account, from, prefix, false, nil)
}

// IsPublicExport is a placeholder to denote a public export.
Expand Down
10 changes: 10 additions & 0 deletions server/accounts_test.go
Expand Up @@ -622,6 +622,13 @@ func TestAccountParseConfigImportsExports(t *testing.T) {
if lis := len(natsAcc.imports.streams); lis != 2 {
t.Fatalf("Expected 2 imported streams, got %d\n", lis)
}
for _, si := range natsAcc.imports.streams {
if si.from == "public.synadia" {
require_True(t, si.atrc)
} else {
require_False(t, si.atrc)
}
}
if lis := len(natsAcc.imports.services); lis != 1 {
t.Fatalf("Expected 1 imported service, got %d\n", lis)
}
Expand All @@ -639,20 +646,23 @@ func TestAccountParseConfigImportsExports(t *testing.T) {
if ea.respType != Streamed {
t.Fatalf("Expected to get a Streamed response type, got %q", ea.respType)
}
require_True(t, ea.atrc)
ea = natsAcc.exports.services["nats.photo"]
if ea == nil {
t.Fatalf("Expected to get a non-nil exportAuth for service")
}
if ea.respType != Chunked {
t.Fatalf("Expected to get a Chunked response type, got %q", ea.respType)
}
require_False(t, ea.atrc)
ea = natsAcc.exports.services["nats.add"]
if ea == nil {
t.Fatalf("Expected to get a non-nil exportAuth for service")
}
if ea.respType != Singleton {
t.Fatalf("Expected to get a Singleton response type, got %q", ea.respType)
}
require_True(t, ea.atrc)

if synAcc == nil {
t.Fatalf("Error retrieving account for 'synadia'")
Expand Down