Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 102 additions & 77 deletions agent/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type (
debugMode bool
metadata map[string]interface{}

spans []tracer.RawSpan
payloadSpans []PayloadSpan
payloadEvents []PayloadEvent

flushFrequency time.Duration
url string
Expand All @@ -62,6 +63,9 @@ type (
testSpansNotSent int64
testSpansRejected int64
}

PayloadSpan map[string]interface{}
PayloadEvent map[string]interface{}
)

func NewSpanRecorder(agent *Agent) *SpanRecorder {
Expand All @@ -87,7 +91,7 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) {
if !r.t.Alive() {
atomic.AddInt64(&r.stats.totalSpans, 1)
atomic.AddInt64(&r.stats.spansRejected, 1)
if isTestSpan(span) {
if isTestSpan(span.Tags) {
atomic.AddInt64(&r.stats.totalTestSpans, 1)
atomic.AddInt64(&r.stats.testSpansRejected, 1)
}
Expand All @@ -98,15 +102,18 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) {
}

func (r *SpanRecorder) loop() error {
defer func() {
r.logger.Println("recorder has been stopped.")
}()
ticker := time.NewTicker(1 * time.Second)
cTime := time.Now()
for {
select {
case <-ticker.C:
hasSpans := r.hasSpans()
if hasSpans || time.Now().Sub(cTime) >= r.getFlushFrequency() {
hasPayloadData := r.hasPayloadData()
if hasPayloadData || time.Now().Sub(cTime) >= r.getFlushFrequency() {
if r.debugMode {
if hasSpans {
if hasPayloadData {
r.logger.Println("Ticker: Sending by buffer")
} else {
r.logger.Println("Ticker: Sending by time")
Expand Down Expand Up @@ -135,26 +142,18 @@ func (r *SpanRecorder) loop() error {
// Sends the spans in the buffer to Scope
func (r *SpanRecorder) sendSpans() (error, bool) {
atomic.AddInt64(&r.stats.sendSpansCalls, 1)
spans := r.popSpans()

const batchSize = 1000
batchLength := len(spans) / batchSize

r.logger.Printf("sending %d spans in %d batches", len(spans), batchLength+1)

var lastError error
for b := 0; b <= batchLength; b++ {
var batch []tracer.RawSpan
// We extract the batch of spans to be send
if b == batchLength {
// If we are in the last batch, we select the remaining spans
batch = spans[b*batchSize:]
} else {
batch = spans[b*batchSize : ((b + 1) * batchSize)]
for {
spans, spMore, spTotal := r.popPayloadSpan(batchSize)
events, evMore, evTotal := r.popPayloadEvents(batchSize)

payload := map[string]interface{}{
"metadata": r.metadata,
"spans": spans,
"events": events,
tags.AgentID: r.agentId,
}

payload := r.getPayload(batch)

buf, err := encodePayload(payload)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
Expand All @@ -163,15 +162,13 @@ func (r *SpanRecorder) sendSpans() (error, bool) {
}

var testSpans int64
for _, span := range batch {
for _, span := range spans {
if isTestSpan(span) {
testSpans++
}
}

if batchLength > 0 {
r.logger.Printf("sending batch %d with %d spans", b+1, len(batch))
}
r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal)
statusCode, err := r.callIngest(buf)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
Expand All @@ -186,6 +183,10 @@ func (r *SpanRecorder) sendSpans() (error, bool) {
return err, true
}
lastError = err

if !spMore && !evMore {
break
}
}
return lastError, false
}
Expand Down Expand Up @@ -302,53 +303,45 @@ func (r *SpanRecorder) callIngest(payload *bytes.Buffer) (statusCode int, err er
return statusCode, lastError
}

// Combines `rawSpans` and `metadata` into a payload that the Scope backend can process
func (r *SpanRecorder) getPayload(rawSpans []tracer.RawSpan) map[string]interface{} {
spans := []map[string]interface{}{}
events := []map[string]interface{}{}
for _, span := range rawSpans {
var parentSpanID string
if span.ParentSpanID != 0 {
parentSpanID = fmt.Sprintf("%x", span.ParentSpanID)
// Get payload components
func (r *SpanRecorder) getPayloadComponents(span tracer.RawSpan) (PayloadSpan, []PayloadEvent) {
events := make([]PayloadEvent, 0)
var parentSpanID string
if span.ParentSpanID != 0 {
parentSpanID = fmt.Sprintf("%x", span.ParentSpanID)
}
payloadSpan := PayloadSpan{
"context": map[string]interface{}{
"trace_id": fmt.Sprintf("%x", span.Context.TraceID),
"span_id": fmt.Sprintf("%x", span.Context.SpanID),
"baggage": span.Context.Baggage,
},
"parent_span_id": parentSpanID,
"operation": span.Operation,
"start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano),
"duration": span.Duration.Nanoseconds(),
"tags": span.Tags,
}
for _, event := range span.Logs {
var fields = make(map[string]interface{})
for _, field := range event.Fields {
fields[field.Key()] = field.Value()
}
eventId, err := uuid.NewRandom()
if err != nil {
panic(err)
}
spans = append(spans, map[string]interface{}{
events = append(events, PayloadEvent{
"context": map[string]interface{}{
"trace_id": fmt.Sprintf("%x", span.Context.TraceID),
"span_id": fmt.Sprintf("%x", span.Context.SpanID),
"baggage": span.Context.Baggage,
"event_id": eventId.String(),
},
"parent_span_id": parentSpanID,
"operation": span.Operation,
"start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano),
"duration": span.Duration.Nanoseconds(),
"tags": span.Tags,
"timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano),
"fields": fields,
})
for _, event := range span.Logs {
var fields = make(map[string]interface{})
for _, field := range event.Fields {
fields[field.Key()] = field.Value()
}
eventId, err := uuid.NewRandom()
if err != nil {
panic(err)
}
events = append(events, map[string]interface{}{
"context": map[string]interface{}{
"trace_id": fmt.Sprintf("%x", span.Context.TraceID),
"span_id": fmt.Sprintf("%x", span.Context.SpanID),
"event_id": eventId.String(),
},
"timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano),
"fields": fields,
})
}
}
return map[string]interface{}{
"metadata": r.metadata,
"spans": spans,
"events": events,
tags.AgentID: r.agentId,
}
return payloadSpan, events
}

// Encodes `payload` using msgpack and compress it with gzip
Expand Down Expand Up @@ -379,32 +372,64 @@ func (r *SpanRecorder) getFlushFrequency() time.Duration {
}

// Gets if there any span available to be send
func (r *SpanRecorder) hasSpans() bool {
func (r *SpanRecorder) hasPayloadData() bool {
r.RLock()
defer r.RUnlock()
return len(r.spans) > 0
return len(r.payloadSpans) > 0 || len(r.payloadEvents) > 0
}

// Gets a number of payload spans from buffer
func (r *SpanRecorder) popPayloadSpan(count int) ([]PayloadSpan, bool, int) {
r.Lock()
defer r.Unlock()
var spans []PayloadSpan
length := len(r.payloadSpans)
if length <= count || count == -1 {
spans = r.payloadSpans
if spans == nil {
spans = make([]PayloadSpan, 0)
}
r.payloadSpans = nil
return spans, false, length
}
spans = r.payloadSpans[:count]
r.payloadSpans = r.payloadSpans[count:]
return spans, true, length
}

// Gets the spans to be send and clears the buffer
func (r *SpanRecorder) popSpans() []tracer.RawSpan {
// Gets a number of payload events from buffer
func (r *SpanRecorder) popPayloadEvents(count int) ([]PayloadEvent, bool, int) {
r.Lock()
defer r.Unlock()
spans := r.spans
r.spans = nil
return spans
var events []PayloadEvent
length := len(r.payloadEvents)
if length <= count || count == -1 {
events = r.payloadEvents
if events == nil {
events = make([]PayloadEvent, 0)
}
r.payloadEvents = nil
return events, false, length
}
events = r.payloadEvents[:count]
r.payloadEvents = r.payloadEvents[count:]
return events, true, length
}

// Adds a span to the buffer
func (r *SpanRecorder) addSpan(span tracer.RawSpan) {
r.Lock()
defer r.Unlock()
r.spans = append(r.spans, span)
payloadSpan, payloadEvents := r.getPayloadComponents(span)
r.payloadSpans = append(r.payloadSpans, payloadSpan)
r.payloadEvents = append(r.payloadEvents, payloadEvents...)

atomic.AddInt64(&r.stats.totalSpans, 1)
if isTestSpan(span) {
if isTestSpan(span.Tags) {
atomic.AddInt64(&r.stats.totalTestSpans, 1)
}
}

func isTestSpan(span tracer.RawSpan) bool {
return span.Tags["span.kind"] == "test"
func isTestSpan(tags map[string]interface{}) bool {
return tags["span.kind"] == "test"
}