Skip to content

Commit

Permalink
[ADDED] Distributed Message Tracing (#5014)
Browse files Browse the repository at this point in the history
When an incoming message contains the header `Nats-Trace-Dest` set to a
valid subject, the server will send there messages representing events
happening in each server where the message is processed.

The events will give details about ingress, subject mapping, stream
export and service imports, jetstream and egress to subscriptions and/or
routes, leafnodes and gateways. Except for a standalone server it is
expected to receive multiple trace messages for a given inbound message.

The header `Nats-Trace-Only`, if set to `true`, will produce the same
tracing events than without that header, except that the message will
not be delivered to subscriptions or inserted in the JetStream stream.
This is usefull to see the path that a message would take but without
affecting running applications.

Note that in a mixed environment where not all servers would be running
at 2.11+, when a message is supposed to be traced only and the remote
does not understand message tracing, the message will not be forwared
and the Egress event for that remote will indicate the reason.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

---------

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic authored and derekcollison committed Feb 10, 2024
1 parent 8d207d4 commit 0b20579
Show file tree
Hide file tree
Showing 19 changed files with 5,523 additions and 121 deletions.
41 changes: 37 additions & 4 deletions server/accounts.go
@@ -1,4 +1,4 @@
// Copyright 2018-2023 The NATS Authors
// Copyright 2018-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -128,6 +128,10 @@ type streamImport struct {
claim *jwt.Import
usePub bool
invalid bool
// This is `allow_trace` and when true and message tracing is happening,
// we will trace egresses past the account boundary, if `false`, we stop
// at the account boundary.
atrc bool
}

const ClientInfoHdr = "Nats-Request-Info"
Expand Down Expand Up @@ -209,6 +213,11 @@ type serviceExport struct {
latency *serviceLatency
rtmr *time.Timer
respThresh time.Duration
// This is `allow_trace` and when true and message tracing is happening,
// when processing a service import we will go through account boundary
// and trace egresses on that other account. If `false`, we stop at the
// account boundary.
atrc bool
}

// Used to track service latency.
Expand Down Expand Up @@ -2367,6 +2376,18 @@ func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.
return nil
}

func (a *Account) SetServiceExportAllowTrace(export string, allowTrace bool) error {
a.mu.Lock()
se := a.getServiceExport(export)
if se == nil {
a.mu.Unlock()
return fmt.Errorf("no export defined for %q", export)
}
se.atrc = allowTrace
a.mu.Unlock()
return nil
}

// This is for internal service import responses.
func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImport, tracking bool, header http.Header) *serviceImport {
nrr := string(osi.acc.newServiceReply(tracking))
Expand Down Expand Up @@ -2405,6 +2426,10 @@ func (a *Account) addRespServiceImport(dest *Account, to string, osi *serviceImp

// AddStreamImportWithClaim will add in the stream import from a specific account with optional token.
func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string, imClaim *jwt.Import) error {
return a.addStreamImportWithClaim(account, from, prefix, false, imClaim)
}

func (a *Account) addStreamImportWithClaim(account *Account, from, prefix string, allowTrace bool, imClaim *jwt.Import) error {
if account == nil {
return ErrMissingAccount
}
Expand All @@ -2427,7 +2452,7 @@ func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string
}
}

return a.AddMappedStreamImportWithClaim(account, from, prefix+from, imClaim)
return a.addMappedStreamImportWithClaim(account, from, prefix+from, allowTrace, imClaim)
}

// AddMappedStreamImport helper for AddMappedStreamImportWithClaim
Expand All @@ -2437,6 +2462,10 @@ func (a *Account) AddMappedStreamImport(account *Account, from, to string) error

// AddMappedStreamImportWithClaim will add in the stream import from a specific account with optional token.
func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to string, imClaim *jwt.Import) error {
return a.addMappedStreamImportWithClaim(account, from, to, false, imClaim)
}

func (a *Account) addMappedStreamImportWithClaim(account *Account, from, to string, allowTrace bool, imClaim *jwt.Import) error {
if account == nil {
return ErrMissingAccount
}
Expand Down Expand Up @@ -2478,7 +2507,11 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
a.mu.Unlock()
return ErrStreamImportDuplicate
}
a.imports.streams = append(a.imports.streams, &streamImport{account, from, to, tr, nil, imClaim, usePub, false})
// TODO(ik): When AllowTrace is added to JWT, uncomment those lines:
// if imClaim != nil {
// allowTrace = imClaim.AllowTrace
// }
a.imports.streams = append(a.imports.streams, &streamImport{account, from, to, tr, nil, imClaim, usePub, false, allowTrace})
a.mu.Unlock()
return nil
}
Expand All @@ -2496,7 +2529,7 @@ func (a *Account) isStreamImportDuplicate(acc *Account, from string) bool {

// AddStreamImport will add in the stream import from a specific account.
func (a *Account) AddStreamImport(account *Account, from, prefix string) error {
return a.AddStreamImportWithClaim(account, from, prefix, nil)
return a.addStreamImportWithClaim(account, from, prefix, false, nil)
}

// IsPublicExport is a placeholder to denote a public export.
Expand Down
10 changes: 10 additions & 0 deletions server/accounts_test.go
Expand Up @@ -622,6 +622,13 @@ func TestAccountParseConfigImportsExports(t *testing.T) {
if lis := len(natsAcc.imports.streams); lis != 2 {
t.Fatalf("Expected 2 imported streams, got %d\n", lis)
}
for _, si := range natsAcc.imports.streams {
if si.from == "public.synadia" {
require_True(t, si.atrc)
} else {
require_False(t, si.atrc)
}
}
if lis := len(natsAcc.imports.services); lis != 1 {
t.Fatalf("Expected 1 imported service, got %d\n", lis)
}
Expand All @@ -639,20 +646,23 @@ func TestAccountParseConfigImportsExports(t *testing.T) {
if ea.respType != Streamed {
t.Fatalf("Expected to get a Streamed response type, got %q", ea.respType)
}
require_True(t, ea.atrc)
ea = natsAcc.exports.services["nats.photo"]
if ea == nil {
t.Fatalf("Expected to get a non-nil exportAuth for service")
}
if ea.respType != Chunked {
t.Fatalf("Expected to get a Chunked response type, got %q", ea.respType)
}
require_False(t, ea.atrc)
ea = natsAcc.exports.services["nats.add"]
if ea == nil {
t.Fatalf("Expected to get a non-nil exportAuth for service")
}
if ea.respType != Singleton {
t.Fatalf("Expected to get a Singleton response type, got %q", ea.respType)
}
require_True(t, ea.atrc)

if synAcc == nil {
t.Fatalf("Error retrieving account for 'synadia'")
Expand Down

0 comments on commit 0b20579

Please sign in to comment.