Skip to content

Commit

Permalink
Added OpenTelemetry support
Browse files Browse the repository at this point in the history
If the `Nats-Trace-Dest` header is not present, but `Traceparent` is
and its last token is `01`, then message tracing is triggered. This
also requires that the account be defined with a `trace_dest` subject
so that traces can be sent there.
Note that `Nats-Trace-Only` is not applicable for `Traceparent`.

Addition to PR #5014
Resolves #5052

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 11, 2024
1 parent 368e0f5 commit d58aeae
Show file tree
Hide file tree
Showing 9 changed files with 454 additions and 49 deletions.
2 changes: 2 additions & 0 deletions server/accounts.go
Expand Up @@ -54,6 +54,7 @@ type Account struct {
Name string
Nkey string
Issuer string
TraceDest string
claimJWT string
updated time.Time
mu sync.RWMutex
Expand Down Expand Up @@ -260,6 +261,7 @@ func (a *Account) String() string {
func (a *Account) shallowCopy(na *Account) {
na.Nkey = a.Nkey
na.Issuer = a.Issuer
na.TraceDest = a.TraceDest

if a.imports.streams != nil {
na.imports.streams = make([]*streamImport, 0, len(a.imports.streams))
Expand Down
7 changes: 5 additions & 2 deletions server/accounts_test.go
Expand Up @@ -509,7 +509,8 @@ func TestAccountSimpleConfig(t *testing.T) {
}

func TestAccountParseConfig(t *testing.T) {
confFileName := createConfFile(t, []byte(`
traceDest := "my.trace.dest"
confFileName := createConfFile(t, []byte(fmt.Sprintf(`
accounts {
synadia {
users = [
Expand All @@ -518,13 +519,14 @@ func TestAccountParseConfig(t *testing.T) {
]
}
nats.io {
trace_dest: %q
users = [
{user: derek, password: foo}
{user: ivan, password: bar}
]
}
}
`))
`, traceDest)))
opts, err := ProcessConfigFile(confFileName)
if err != nil {
t.Fatalf("Received an error processing config file: %v", err)
Expand All @@ -548,6 +550,7 @@ func TestAccountParseConfig(t *testing.T) {
if natsAcc == nil {
t.Fatalf("Error retrieving account for 'nats.io'")
}
require_Equal[string](t, natsAcc.TraceDest, traceDest)

for _, u := range opts.Users {
if u.Username == "derek" {
Expand Down
8 changes: 4 additions & 4 deletions server/client.go
Expand Up @@ -4349,11 +4349,11 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// We do so by setting the c.pa.trace to nil (it will be restored
// with c.pa = pacopy).
c.pa.trace = nil
// We also need to disable the trace destination header so that
// if message is routed, it does not initialize tracing in the
// We also need to disable the message trace headers so that
// if the message is routed, it does not initialize tracing in the
// remote.
pos := mt.disableTraceHeader(c, msg)
defer mt.enableTraceHeader(c, msg, pos)
positions := mt.disableTraceHeaders(c, msg)
defer mt.enableTraceHeaders(c, msg, positions)
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions server/config_check_test.go
Expand Up @@ -913,6 +913,28 @@ func TestConfigCheck(t *testing.T) {
errorLine: 7,
errorPos: 25,
},
{
name: "when account trace destination is of the wrong type",
config: `
accounts {
A { trace_dest: 123 }
}
`,
err: errors.New(`interface conversion: interface {} is int64, not string`),
errorLine: 3,
errorPos: 23,
},
{
name: "when account trace destination is not a valid destination",
config: `
accounts {
A { trace_dest: "invalid..dest" }
}
`,
err: errors.New(`Trace destination "invalid..dest" is not valid`),
errorLine: 3,
errorPos: 23,
},
{
name: "when user authorization config has both token and users",
config: `
Expand Down
117 changes: 80 additions & 37 deletions server/msgtrace.go
Expand Up @@ -330,12 +330,25 @@ func (c *client) initMsgTrace() *msgTrace {
return nil
}
hdr := c.msgBuf[:c.pa.hdr]
var external bool
// Do not call c.parseState.getHeader() yet for performance reasons.
// We first do a "manual" search of the "send-to" destination's header.
// If not present, no need to lift the message headers.
td := getHeader(MsgTraceSendTo, hdr)
if len(td) <= 0 {
return nil
// If the NATS trace destination header is not present, look for
// the external trace header.
trcCtxHdrVal := getHeader(trcCtx, hdr)
if len(trcCtxHdrVal) <= 0 {
return nil
}
// See if we need to trigger the race based on the last token
// that should be set to "01".
tk := bytes.Split(trcCtxHdrVal, stringToBytes("-"))
if len(tk) != 4 || len([]byte(tk[3])) != 2 || !bytes.Equal(tk[3], stringToBytes("01")) {
return nil
}
external = true
}
// Now we know that this is a message that requested tracing, we
// will lift the headers since we also need to transfer them to
Expand All @@ -346,11 +359,14 @@ func (c *client) initMsgTrace() *msgTrace {
}
ct := getCompressionType(headers.Get(acceptEncodingHeader))
var traceOnly bool
if to := headers.Get(MsgTraceOnly); to != _EMPTY_ {
tos := strings.ToLower(to)
switch tos {
case "1", "true", "on":
traceOnly = true
// Check for traceOnly only if not external.
if !external {
if to := headers.Get(MsgTraceOnly); to != _EMPTY_ {
tos := strings.ToLower(to)
switch tos {
case "1", "true", "on":
traceOnly = true
}
}
}
var (
Expand Down Expand Up @@ -403,11 +419,26 @@ func (c *client) initMsgTrace() *msgTrace {
acc = c.acc
ian = acc.GetName()
}
// If external, we need to have the account's trace destination set,
// otherwise, we are not enabling tracing.
var dest string
if external {
if acc != nil {
acc.mu.RLock()
dest = acc.TraceDest
acc.mu.RUnlock()
}
if dest == _EMPTY_ {
return nil
}
} else {
dest = string(td)
}
c.pa.trace = &msgTrace{
srv: c.srv,
acc: acc,
oan: oan,
dest: string(td),
dest: dest,
ct: ct,
hop: hop,
event: &MsgTraceEvent{
Expand Down Expand Up @@ -496,49 +527,61 @@ func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte {
return c.setHeader(MsgTraceHop, t.nhop, msg)
}

// Will look for the MsgTraceSendTo header and change the first character
// to an 'X' so that if this message is sent to a remote, the remote will
// not initialize tracing since it won't find the actual MsgTraceSendTo
// header. The function returns the position of the header so it can
// efficiently be re-enabled by calling enableTraceHeader.
// Will look for the MsgTraceSendTo and trcCtx headers and change the first
// character to an 'X' so that if this message is sent to a remote, the remote
// will not initialize tracing since it won't find the actual trace headers.
// The function returns the position of the headers so it can efficiently be
// re-enabled by calling enableTraceHeaders.
// Note that if `msg` can be either the header alone or the full message
// (header and payload). This function will use c.pa.hdr to limit the
// search to the header section alone.
func (t *msgTrace) disableTraceHeader(c *client, msg []byte) int {
func (t *msgTrace) disableTraceHeaders(c *client, msg []byte) []int {
// Code largely copied from getHeader(), except that we don't need the value
if c.pa.hdr <= 0 {
return -1
return []int{-1, -1}
}
hdr := msg[:c.pa.hdr]
key := stringToBytes(MsgTraceSendTo)
pos := bytes.Index(hdr, key)
if pos < 0 {
return -1
}
// Make sure this key does not have additional prefix.
if pos < 2 || hdr[pos-1] != '\n' || hdr[pos-2] != '\r' {
return -1
}
index := pos + len(key)
if index >= len(hdr) {
return -1
}
if hdr[index] != ':' {
return -1
headers := [2]string{MsgTraceSendTo, trcCtx}
positions := [2]int{}
for i := 0; i < 2; i++ {
key := stringToBytes(headers[i])
pos := bytes.Index(hdr, key)
if pos < 0 {
positions[i] = -1
continue
}
// Make sure this key does not have additional prefix.
if pos < 2 || hdr[pos-1] != '\n' || hdr[pos-2] != '\r' {
positions[i] = -1
continue
}
index := pos + len(key)
if index >= len(hdr) {
positions[i] = -1
continue
}
if hdr[index] != ':' {
positions[i] = -1
continue
}
// Disable the trace by altering the first character of the header
hdr[pos] = 'X'
positions[i] = pos
}
// Disable the trace by altering the first character of the header
hdr[pos] = 'X'
// Return the position of that character so we can re-enable it.
return pos
// Return the positions of those characters so we can re-enable the headers.
return positions[:2]
}

// Changes back the character at the given position `pos` in the `msg`
// byte slice to the first character of the MsgTraceSendTo header.
func (t *msgTrace) enableTraceHeader(c *client, msg []byte, pos int) {
if pos <= 0 {
return
func (t *msgTrace) enableTraceHeaders(c *client, msg []byte, positions []int) {
headers := [2]string{MsgTraceSendTo, trcCtx}
for i, pos := range positions {
if pos == -1 {
continue
}
msg[pos] = headers[i][0]
}
msg[pos] = MsgTraceSendTo[0]
}

func (t *msgTrace) setIngressError(err string) {
Expand Down

0 comments on commit d58aeae

Please sign in to comment.