Skip to content

Commit

Permalink
Parse Trace ID From Telemetry Payloads (#113)
Browse files Browse the repository at this point in the history
* Parse Trace ID From Telemetry Payloads

* Wiring up trace ID log decoration
  • Loading branch information
kolanos committed May 12, 2022
1 parent db36850 commit dd3683d
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 16 deletions.
5 changes: 3 additions & 2 deletions checks/runtime_check.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package checks

import (
"errors"
"io/ioutil"
"net/http"
"path/filepath"
Expand Down Expand Up @@ -66,7 +65,9 @@ func latestAgentTag(r *runtimeConfig) error {

rs := re.FindStringSubmatch(string(body))
if len(rs) != 2 {
return errors.New("Can't determine latest agent version.")
// FIXME: This is way too brittle, should use https://api.github.com/repos/
util.Debugf("Can't determine latest agent version: %v", rs)
return nil
}

r.AgentVersion = rs[1]
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func main() {
}

// Init the telemetry sending client
telemetryClient := telemetry.New(registrationResponse.FunctionName, licenseKey, conf.TelemetryEndpoint, conf.LogEndpoint)
telemetryClient := telemetry.New(registrationResponse.FunctionName, licenseKey, conf.TelemetryEndpoint, conf.LogEndpoint, batch)
telemetryChan, err := telemetry.InitTelemetryChannel()
if err != nil {
err2 := invocationClient.InitError(ctx, "telemetryClient.init", err)
Expand Down
18 changes: 18 additions & 0 deletions telemetry/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func (b *Batch) AddInvocation(requestId string, start time.Time) {
func (b *Batch) AddTelemetry(requestId string, telemetry []byte) *Invocation {
inv, ok := b.invocations[requestId]
if ok {
traceId, err := ExtractTraceID(telemetry)
if err != nil {
util.Debugln(err)
}
// We don't want to unset a previously set trace ID
if traceId != "" {
inv.TraceId = traceId
}
inv.Telemetry = append(inv.Telemetry, telemetry)
if b.eldest.Equal(epochStart) {
b.eldest = inv.Start
Expand Down Expand Up @@ -115,6 +123,7 @@ func (b *Batch) ripeHarvest(now time.Time) []*Invocation {
type Invocation struct {
Start time.Time
RequestId string
TraceId string
Telemetry [][]byte
}

Expand All @@ -136,3 +145,12 @@ func (inv *Invocation) IsRipe() bool {
func (inv *Invocation) IsEmpty() bool {
return len(inv.Telemetry) == 0
}

// RetrieveTraceID looks up a trace ID using the provided request ID
func (b *Batch) RetrieveTraceID(requestId string) string {
inv, ok := b.invocations[requestId]
if ok {
return inv.TraceId
}
return ""
}
17 changes: 13 additions & 4 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@ type Client struct {
telemetryEndpoint string
logEndpoint string
functionName string
batch *Batch
}

// New creates a telemetry client with sensible defaults
func New(functionName string, licenseKey string, telemetryEndpointOverride string, logEndpointOverride string) *Client {
func New(functionName string, licenseKey string, telemetryEndpointOverride string, logEndpointOverride string, batch *Batch) *Client {
httpClient := &http.Client{
Timeout: time.Second * 2,
}

return NewWithHTTPClient(httpClient, functionName, licenseKey, telemetryEndpointOverride, logEndpointOverride)
return NewWithHTTPClient(httpClient, functionName, licenseKey, telemetryEndpointOverride, logEndpointOverride, batch)
}

// NewWithHTTPClient is just like New, but the HTTP client can be overridden
func NewWithHTTPClient(httpClient *http.Client, functionName string, licenseKey string, telemetryEndpointOverride string, logEndpointOverride string) *Client {
func NewWithHTTPClient(httpClient *http.Client, functionName string, licenseKey string, telemetryEndpointOverride string, logEndpointOverride string, batch *Batch) *Client {
telemetryEndpoint := getInfraEndpointURL(licenseKey, telemetryEndpointOverride)
logEndpoint := getLogEndpointURL(licenseKey, logEndpointOverride)
return &Client{
Expand All @@ -50,6 +51,7 @@ func NewWithHTTPClient(httpClient *http.Client, functionName string, licenseKey
telemetryEndpoint: telemetryEndpoint,
logEndpoint: logEndpoint,
functionName: functionName,
batch: batch,
}
}

Expand Down Expand Up @@ -192,7 +194,14 @@ func (c *Client) SendFunctionLogs(ctx context.Context, lines []logserver.LogLine
for _, l := range lines {
// Unix time in ms
ts := l.Time.UnixNano() / 1e6
logMessages = append(logMessages, NewFunctionLogMessage(ts, l.RequestID, string(l.Content)))
var traceId string
if c.batch != nil {
// There is a race condition here. Telemetry batch may be late, so the trace
// ID would be blank. This would require a lock to handle, which would delay
// logs being sent. Not sure if worth the performance hit yet.
traceId = c.batch.RetrieveTraceID(l.RequestID)
}
logMessages = append(logMessages, NewFunctionLogMessage(ts, l.RequestID, traceId, string(l.Content)))
util.Debugf("Sending function logs for request %s", l.RequestID)
}
// The Log API expects an array
Expand Down
10 changes: 5 additions & 5 deletions telemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestClientSend(t *testing.T) {

defer srv.Close()

client := NewWithHTTPClient(srv.Client(), "", "a mock license key", srv.URL, srv.URL)
client := NewWithHTTPClient(srv.Client(), "", "a mock license key", srv.URL, srv.URL, &Batch{})

ctx := context.Background()
bytes := []byte("foobar")
Expand All @@ -51,7 +51,7 @@ func TestClientSend(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, successCount)

client = New("", "mock license key", srv.URL, srv.URL)
client = New("", "mock license key", srv.URL, srv.URL, &Batch{})
assert.NotNil(t, client)
}

Expand Down Expand Up @@ -93,7 +93,7 @@ func TestClientSendRetry(t *testing.T) {

httpClient := srv.Client()
httpClient.Timeout = 200 * time.Millisecond
client := NewWithHTTPClient(httpClient, "", "a mock license key", srv.URL, srv.URL)
client := NewWithHTTPClient(httpClient, "", "a mock license key", srv.URL, srv.URL, &Batch{})

ctx := context.Background()
bytes := []byte("foobar")
Expand All @@ -115,7 +115,7 @@ func TestClientSendOutOfRetries(t *testing.T) {

httpClient := srv.Client()
httpClient.Timeout = 200 * time.Millisecond
client := NewWithHTTPClient(httpClient, "", "a mock license key", srv.URL, srv.URL)
client := NewWithHTTPClient(httpClient, "", "a mock license key", srv.URL, srv.URL, &Batch{})

ctx := context.Background()
bytes := []byte("foobar")
Expand All @@ -131,7 +131,7 @@ func TestClientUnreachableEndpoint(t *testing.T) {
Timeout: time.Millisecond * 1,
}

client := NewWithHTTPClient(httpClient, "", "a mock license key", "http://10.123.123.123:12345", "http://10.123.123.123:12345")
client := NewWithHTTPClient(httpClient, "", "a mock license key", "http://10.123.123.123:12345", "http://10.123.123.123:12345", &Batch{})

ctx := context.Background()
bytes := []byte("foobar")
Expand Down
116 changes: 116 additions & 0 deletions telemetry/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package telemetry

import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
)

func parsePayload(data []byte) (metadata, uncompressedData map[string]json.RawMessage, err error) {
var arr [4]json.RawMessage

if err = json.Unmarshal(data, &arr); nil != err {
err = fmt.Errorf("unable to unmarshal payload data array: %v", err)
return
}

var dataJSON []byte
compressed := strings.Trim(string(arr[3]), `"`)

if dataJSON, err = decodeUncompress(compressed); nil != err {
err = fmt.Errorf("unable to uncompress payload: %v", err)
return
}

if err = json.Unmarshal(dataJSON, &uncompressedData); nil != err {
err = fmt.Errorf("unable to unmarshal uncompressed payload: %v", err)
return
}

if err = json.Unmarshal(arr[2], &metadata); nil != err {
err = fmt.Errorf("unable to unmarshal payload metadata: %v", err)
return
}

return
}

func decodeUncompress(input string) ([]byte, error) {
decoded, err := base64.StdEncoding.DecodeString(input)
if nil != err {
return nil, err
}

buf := bytes.NewBuffer(decoded)
gz, err := gzip.NewReader(buf)
if nil != err {
return nil, err
}

var out bytes.Buffer
io.Copy(&out, gz)
gz.Close()

return out.Bytes(), nil
}

// ExtracTraceID extracts the trace ID within a payload, if present
func ExtractTraceID(data []byte) (string, error) {
if !bytes.Contains(data, []byte("NR_LAMBDA_MONITORING")) {
return "", nil
}

_, segments, err := parsePayload(data)
if err != nil {
return "", err
}

analyticEvents, ok := segments["analytic_event_data"]
if ok {
var parsedAnalyticEvents []json.RawMessage
if err := json.Unmarshal(analyticEvents, &parsedAnalyticEvents); err != nil {
return "", err
}

if len(parsedAnalyticEvents) == 3 {
var analyticEvent [][]struct {
TraceID string `json:"traceId"`
}
if err := json.Unmarshal(parsedAnalyticEvents[2], &analyticEvent); err != nil {
return "", err
}
if len(analyticEvent) > 0 && len(analyticEvent[0]) > 0 {
return analyticEvent[0][0].TraceID, nil
}
}
}

spanEvents, ok := segments["span_event_data"]
if ok {
var parsedSpanEvents []json.RawMessage
if err := json.Unmarshal(spanEvents, &parsedSpanEvents); err != nil {
return "", err
}

if len(parsedSpanEvents) == 3 {
var spanEvent [][]struct {
TraceID string `json:"traceId"`
}

if err := json.Unmarshal(parsedSpanEvents[2], &spanEvent); err != nil {
return "", err
}

if len(spanEvent) > 0 && len(spanEvent[0]) > 0 {
return spanEvent[0][0].TraceID, nil
}
}
}

return "", errors.New("No trace ID found in payload")
}

0 comments on commit dd3683d

Please sign in to comment.