From e092105a73130beeb5b76ec7403be6107c48c80a Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Mon, 24 Feb 2020 17:34:51 +0100 Subject: [PATCH 1/7] changes --- agent/recorder.go | 179 +++++++++++++++++++++++++++------------------- 1 file changed, 104 insertions(+), 75 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index 5d529375..a91406df 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -38,7 +38,8 @@ type ( debugMode bool metadata map[string]interface{} - spans []tracer.RawSpan + payloadSpans []PayloadSpan + payloadEvents []PayloadEvent flushFrequency time.Duration url string @@ -62,6 +63,9 @@ type ( testSpansNotSent int64 testSpansRejected int64 } + + PayloadSpan map[string]interface{} + PayloadEvent map[string]interface{} ) func NewSpanRecorder(agent *Agent) *SpanRecorder { @@ -94,7 +98,7 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { r.logger.Printf("a span has been received but the recorder is not running") return } - r.addSpan(span) + r.addPayloadComponents(span) } func (r *SpanRecorder) loop() error { @@ -103,10 +107,10 @@ func (r *SpanRecorder) loop() error { for { select { case <-ticker.C: - hasSpans := r.hasSpans() - if hasSpans || time.Now().Sub(cTime) >= r.getFlushFrequency() { + hasPayloadData := r.hasPayloadData() + if hasPayloadData || time.Now().Sub(cTime) >= r.getFlushFrequency() { if r.debugMode { - if hasSpans { + if hasPayloadData { r.logger.Println("Ticker: Sending by buffer") } else { r.logger.Println("Ticker: Sending by time") @@ -135,26 +139,21 @@ func (r *SpanRecorder) loop() error { // Sends the spans in the buffer to Scope func (r *SpanRecorder) sendSpans() (error, bool) { atomic.AddInt64(&r.stats.sendSpansCalls, 1) - spans := r.popSpans() - const batchSize = 1000 - batchLength := len(spans) / batchSize - - r.logger.Printf("sending %d spans in %d batches", len(spans), batchLength+1) - var lastError error - for b := 0; b <= batchLength; b++ { - var batch []tracer.RawSpan - // We extract the batch of spans to be send - if b == batchLength { - // If we are in the last batch, we select the remaining spans - batch = spans[b*batchSize:] - } else { - batch = spans[b*batchSize : ((b + 1) * batchSize)] + for { + spans, spMore := r.takePayloadSpan(batchSize) + events, evMore := r.takePayloadEvents(batchSize) + if len(spans) == 0 && len(events) == 0 { + break } - payload := r.getPayload(batch) - + payload := map[string]interface{}{ + "metadata": r.metadata, + "spans": spans, + "events": events, + tags.AgentID: r.agentId, + } buf, err := encodePayload(payload) if err != nil { atomic.AddInt64(&r.stats.sendSpansKo, 1) @@ -163,15 +162,13 @@ func (r *SpanRecorder) sendSpans() (error, bool) { } var testSpans int64 - for _, span := range batch { - if isTestSpan(span) { + for _, span := range spans { + if isTestPayloadSpan(span) { testSpans++ } } - if batchLength > 0 { - r.logger.Printf("sending batch %d with %d spans", b+1, len(batch)) - } + r.logger.Printf("sending %d spans with %d events", len(spans), len(events)) statusCode, err := r.callIngest(buf) if err != nil { atomic.AddInt64(&r.stats.sendSpansKo, 1) @@ -186,6 +183,10 @@ func (r *SpanRecorder) sendSpans() (error, bool) { return err, true } lastError = err + + if !spMore && !evMore { + break + } } return lastError, false } @@ -302,53 +303,45 @@ func (r *SpanRecorder) callIngest(payload *bytes.Buffer) (statusCode int, err er return statusCode, lastError } -// Combines `rawSpans` and `metadata` into a payload that the Scope backend can process -func (r *SpanRecorder) getPayload(rawSpans []tracer.RawSpan) map[string]interface{} { - spans := []map[string]interface{}{} - events := []map[string]interface{}{} - for _, span := range rawSpans { - var parentSpanID string - if span.ParentSpanID != 0 { - parentSpanID = fmt.Sprintf("%x", span.ParentSpanID) +// Get payload components +func (r *SpanRecorder) getPayloadComponents(span tracer.RawSpan) (PayloadSpan, []PayloadEvent) { + events := make([]PayloadEvent, 0) + var parentSpanID string + if span.ParentSpanID != 0 { + parentSpanID = fmt.Sprintf("%x", span.ParentSpanID) + } + payloadSpan := PayloadSpan{ + "context": map[string]interface{}{ + "trace_id": fmt.Sprintf("%x", span.Context.TraceID), + "span_id": fmt.Sprintf("%x", span.Context.SpanID), + "baggage": span.Context.Baggage, + }, + "parent_span_id": parentSpanID, + "operation": span.Operation, + "start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano), + "duration": span.Duration.Nanoseconds(), + "tags": span.Tags, + } + for _, event := range span.Logs { + var fields = make(map[string]interface{}) + for _, field := range event.Fields { + fields[field.Key()] = field.Value() + } + eventId, err := uuid.NewRandom() + if err != nil { + panic(err) } - spans = append(spans, map[string]interface{}{ + events = append(events, PayloadEvent{ "context": map[string]interface{}{ "trace_id": fmt.Sprintf("%x", span.Context.TraceID), "span_id": fmt.Sprintf("%x", span.Context.SpanID), - "baggage": span.Context.Baggage, + "event_id": eventId.String(), }, - "parent_span_id": parentSpanID, - "operation": span.Operation, - "start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano), - "duration": span.Duration.Nanoseconds(), - "tags": span.Tags, + "timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano), + "fields": fields, }) - for _, event := range span.Logs { - var fields = make(map[string]interface{}) - for _, field := range event.Fields { - fields[field.Key()] = field.Value() - } - eventId, err := uuid.NewRandom() - if err != nil { - panic(err) - } - events = append(events, map[string]interface{}{ - "context": map[string]interface{}{ - "trace_id": fmt.Sprintf("%x", span.Context.TraceID), - "span_id": fmt.Sprintf("%x", span.Context.SpanID), - "event_id": eventId.String(), - }, - "timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano), - "fields": fields, - }) - } - } - return map[string]interface{}{ - "metadata": r.metadata, - "spans": spans, - "events": events, - tags.AgentID: r.agentId, } + return payloadSpan, events } // Encodes `payload` using msgpack and compress it with gzip @@ -379,26 +372,58 @@ func (r *SpanRecorder) getFlushFrequency() time.Duration { } // Gets if there any span available to be send -func (r *SpanRecorder) hasSpans() bool { +func (r *SpanRecorder) hasPayloadData() bool { r.RLock() defer r.RUnlock() - return len(r.spans) > 0 + return len(r.payloadSpans) > 0 || len(r.payloadEvents) > 0 } -// Gets the spans to be send and clears the buffer -func (r *SpanRecorder) popSpans() []tracer.RawSpan { +// Take a number of payload spans from buffer +func (r *SpanRecorder) takePayloadSpan(count int) ([]PayloadSpan, bool) { r.Lock() defer r.Unlock() - spans := r.spans - r.spans = nil - return spans + var spans []PayloadSpan + length := len(r.payloadSpans) + if length <= count || count == -1 { + spans = r.payloadSpans + if spans == nil { + spans = make([]PayloadSpan, 0) + } + r.payloadSpans = make([]PayloadSpan, 0) + return spans, false + } + spans = r.payloadSpans[:count] + r.payloadSpans = r.payloadSpans[count:] + return spans, true +} + +// Take a number of payload events from buffer +func (r *SpanRecorder) takePayloadEvents(count int) ([]PayloadEvent, bool) { + r.Lock() + defer r.Unlock() + var events []PayloadEvent + length := len(r.payloadEvents) + if length <= count || count == -1 { + events = r.payloadEvents + if events == nil { + events = make([]PayloadEvent, 0) + } + r.payloadEvents = make([]PayloadEvent, 0) + return events, false + } + events = r.payloadEvents[:count] + r.payloadEvents = r.payloadEvents[count:] + return events, true } // Adds a span to the buffer -func (r *SpanRecorder) addSpan(span tracer.RawSpan) { +func (r *SpanRecorder) addPayloadComponents(span tracer.RawSpan) { r.Lock() defer r.Unlock() - r.spans = append(r.spans, span) + payloadSpan, payloadEvents := r.getPayloadComponents(span) + r.payloadSpans = append(r.payloadSpans, payloadSpan) + r.payloadEvents = append(r.payloadEvents, payloadEvents...) + atomic.AddInt64(&r.stats.totalSpans, 1) if isTestSpan(span) { atomic.AddInt64(&r.stats.totalTestSpans, 1) @@ -408,3 +433,7 @@ func (r *SpanRecorder) addSpan(span tracer.RawSpan) { func isTestSpan(span tracer.RawSpan) bool { return span.Tags["span.kind"] == "test" } + +func isTestPayloadSpan(span PayloadSpan) bool { + return span["span.kind"] == "test" +} From ffe27279e1607f70d5157e76424dd532a513436c Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Mon, 24 Feb 2020 23:46:04 +0100 Subject: [PATCH 2/7] changes --- agent/recorder.go | 21 ++++++++++++--------- go.mod | 3 +++ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index a91406df..4e01490c 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -102,6 +102,9 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { } func (r *SpanRecorder) loop() error { + defer func() { + r.logger.Println("recorder has been stopped.") + }() ticker := time.NewTicker(1 * time.Second) cTime := time.Now() for { @@ -142,8 +145,8 @@ func (r *SpanRecorder) sendSpans() (error, bool) { const batchSize = 1000 var lastError error for { - spans, spMore := r.takePayloadSpan(batchSize) - events, evMore := r.takePayloadEvents(batchSize) + spans, spMore, spTotal := r.takePayloadSpan(batchSize) + events, evMore, evTotal := r.takePayloadEvents(batchSize) if len(spans) == 0 && len(events) == 0 { break } @@ -168,7 +171,7 @@ func (r *SpanRecorder) sendSpans() (error, bool) { } } - r.logger.Printf("sending %d spans with %d events", len(spans), len(events)) + r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal) statusCode, err := r.callIngest(buf) if err != nil { atomic.AddInt64(&r.stats.sendSpansKo, 1) @@ -379,7 +382,7 @@ func (r *SpanRecorder) hasPayloadData() bool { } // Take a number of payload spans from buffer -func (r *SpanRecorder) takePayloadSpan(count int) ([]PayloadSpan, bool) { +func (r *SpanRecorder) takePayloadSpan(count int) ([]PayloadSpan, bool, int) { r.Lock() defer r.Unlock() var spans []PayloadSpan @@ -390,15 +393,15 @@ func (r *SpanRecorder) takePayloadSpan(count int) ([]PayloadSpan, bool) { spans = make([]PayloadSpan, 0) } r.payloadSpans = make([]PayloadSpan, 0) - return spans, false + return spans, false, length } spans = r.payloadSpans[:count] r.payloadSpans = r.payloadSpans[count:] - return spans, true + return spans, true, length } // Take a number of payload events from buffer -func (r *SpanRecorder) takePayloadEvents(count int) ([]PayloadEvent, bool) { +func (r *SpanRecorder) takePayloadEvents(count int) ([]PayloadEvent, bool, int) { r.Lock() defer r.Unlock() var events []PayloadEvent @@ -409,11 +412,11 @@ func (r *SpanRecorder) takePayloadEvents(count int) ([]PayloadEvent, bool) { events = make([]PayloadEvent, 0) } r.payloadEvents = make([]PayloadEvent, 0) - return events, false + return events, false, length } events = r.payloadEvents[:count] r.payloadEvents = r.payloadEvents[count:] - return events, true + return events, true, length } // Adds a span to the buffer diff --git a/go.mod b/go.mod index 4e5c18b3..62141a05 100644 --- a/go.mod +++ b/go.mod @@ -15,10 +15,13 @@ require ( github.com/stretchr/testify v1.5.1 github.com/undefinedlabs/go-mpatch v0.0.0-20200122175732-0044123dbb98 github.com/vmihailenco/msgpack v4.0.4+incompatible + golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 // indirect + golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect google.golang.org/appengine v1.6.5 // indirect google.golang.org/grpc v1.27.1 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 + honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect ) From 00b4f80732fe5d76cef53c8c2d6fc561adb1b836 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Mon, 24 Feb 2020 23:47:23 +0100 Subject: [PATCH 3/7] fix --- go.mod | 3 --- 1 file changed, 3 deletions(-) diff --git a/go.mod b/go.mod index 62141a05..4e5c18b3 100644 --- a/go.mod +++ b/go.mod @@ -15,13 +15,10 @@ require ( github.com/stretchr/testify v1.5.1 github.com/undefinedlabs/go-mpatch v0.0.0-20200122175732-0044123dbb98 github.com/vmihailenco/msgpack v4.0.4+incompatible - golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 // indirect - golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect google.golang.org/appengine v1.6.5 // indirect google.golang.org/grpc v1.27.1 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 - honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect ) From 062ac0fb64001432669b578516c9edae97ed82c6 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 25 Feb 2020 10:23:20 +0100 Subject: [PATCH 4/7] changes --- agent/recorder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index 4e01490c..cb0beb46 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -392,7 +392,7 @@ func (r *SpanRecorder) takePayloadSpan(count int) ([]PayloadSpan, bool, int) { if spans == nil { spans = make([]PayloadSpan, 0) } - r.payloadSpans = make([]PayloadSpan, 0) + r.payloadSpans = nil return spans, false, length } spans = r.payloadSpans[:count] @@ -411,7 +411,7 @@ func (r *SpanRecorder) takePayloadEvents(count int) ([]PayloadEvent, bool, int) if events == nil { events = make([]PayloadEvent, 0) } - r.payloadEvents = make([]PayloadEvent, 0) + r.payloadEvents = nil return events, false, length } events = r.payloadEvents[:count] From ba72276c695941ae437660899677a1be7f991fb2 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Tue, 25 Feb 2020 11:52:53 +0100 Subject: [PATCH 5/7] fix healthcheck --- agent/recorder.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index cb0beb46..9cdee17b 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -147,9 +147,6 @@ func (r *SpanRecorder) sendSpans() (error, bool) { for { spans, spMore, spTotal := r.takePayloadSpan(batchSize) events, evMore, evTotal := r.takePayloadEvents(batchSize) - if len(spans) == 0 && len(events) == 0 { - break - } payload := map[string]interface{}{ "metadata": r.metadata, From 8fa7948e6329da5b37c039f9bf01f163e19e9260 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Wed, 11 Mar 2020 12:28:50 +0100 Subject: [PATCH 6/7] changes based in the review --- agent/recorder.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index 9cdee17b..bf631e93 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -91,14 +91,14 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { if !r.t.Alive() { atomic.AddInt64(&r.stats.totalSpans, 1) atomic.AddInt64(&r.stats.spansRejected, 1) - if isTestSpan(span) { + if isTestSpan(span.Tags) { atomic.AddInt64(&r.stats.totalTestSpans, 1) atomic.AddInt64(&r.stats.testSpansRejected, 1) } r.logger.Printf("a span has been received but the recorder is not running") return } - r.addPayloadComponents(span) + r.addSpan(span) } func (r *SpanRecorder) loop() error { @@ -163,7 +163,7 @@ func (r *SpanRecorder) sendSpans() (error, bool) { var testSpans int64 for _, span := range spans { - if isTestPayloadSpan(span) { + if isTestSpan(span) { testSpans++ } } @@ -417,7 +417,7 @@ func (r *SpanRecorder) takePayloadEvents(count int) ([]PayloadEvent, bool, int) } // Adds a span to the buffer -func (r *SpanRecorder) addPayloadComponents(span tracer.RawSpan) { +func (r *SpanRecorder) addSpan(span tracer.RawSpan) { r.Lock() defer r.Unlock() payloadSpan, payloadEvents := r.getPayloadComponents(span) @@ -425,15 +425,11 @@ func (r *SpanRecorder) addPayloadComponents(span tracer.RawSpan) { r.payloadEvents = append(r.payloadEvents, payloadEvents...) atomic.AddInt64(&r.stats.totalSpans, 1) - if isTestSpan(span) { + if isTestSpan(span.Tags) { atomic.AddInt64(&r.stats.totalTestSpans, 1) } } -func isTestSpan(span tracer.RawSpan) bool { - return span.Tags["span.kind"] == "test" -} - -func isTestPayloadSpan(span PayloadSpan) bool { - return span["span.kind"] == "test" +func isTestSpan(tags map[string]interface{}) bool { + return tags["span.kind"] == "test" } From 8c1dcc84ea8c38e309de2b691b290b87125021b6 Mon Sep 17 00:00:00 2001 From: Daniel Redondo Date: Wed, 11 Mar 2020 14:52:12 +0100 Subject: [PATCH 7/7] changes based in the review --- agent/recorder.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/agent/recorder.go b/agent/recorder.go index bf631e93..2cdcbce0 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -145,8 +145,8 @@ func (r *SpanRecorder) sendSpans() (error, bool) { const batchSize = 1000 var lastError error for { - spans, spMore, spTotal := r.takePayloadSpan(batchSize) - events, evMore, evTotal := r.takePayloadEvents(batchSize) + spans, spMore, spTotal := r.popPayloadSpan(batchSize) + events, evMore, evTotal := r.popPayloadEvents(batchSize) payload := map[string]interface{}{ "metadata": r.metadata, @@ -378,8 +378,8 @@ func (r *SpanRecorder) hasPayloadData() bool { return len(r.payloadSpans) > 0 || len(r.payloadEvents) > 0 } -// Take a number of payload spans from buffer -func (r *SpanRecorder) takePayloadSpan(count int) ([]PayloadSpan, bool, int) { +// Gets a number of payload spans from buffer +func (r *SpanRecorder) popPayloadSpan(count int) ([]PayloadSpan, bool, int) { r.Lock() defer r.Unlock() var spans []PayloadSpan @@ -397,8 +397,8 @@ func (r *SpanRecorder) takePayloadSpan(count int) ([]PayloadSpan, bool, int) { return spans, true, length } -// Take a number of payload events from buffer -func (r *SpanRecorder) takePayloadEvents(count int) ([]PayloadEvent, bool, int) { +// Gets a number of payload events from buffer +func (r *SpanRecorder) popPayloadEvents(count int) ([]PayloadEvent, bool, int) { r.Lock() defer r.Unlock() var events []PayloadEvent