diff --git a/agent/recorder.go b/agent/recorder.go index 5d529375..2cdcbce0 100644 --- a/agent/recorder.go +++ b/agent/recorder.go @@ -38,7 +38,8 @@ type ( debugMode bool metadata map[string]interface{} - spans []tracer.RawSpan + payloadSpans []PayloadSpan + payloadEvents []PayloadEvent flushFrequency time.Duration url string @@ -62,6 +63,9 @@ type ( testSpansNotSent int64 testSpansRejected int64 } + + PayloadSpan map[string]interface{} + PayloadEvent map[string]interface{} ) func NewSpanRecorder(agent *Agent) *SpanRecorder { @@ -87,7 +91,7 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { if !r.t.Alive() { atomic.AddInt64(&r.stats.totalSpans, 1) atomic.AddInt64(&r.stats.spansRejected, 1) - if isTestSpan(span) { + if isTestSpan(span.Tags) { atomic.AddInt64(&r.stats.totalTestSpans, 1) atomic.AddInt64(&r.stats.testSpansRejected, 1) } @@ -98,15 +102,18 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) { } func (r *SpanRecorder) loop() error { + defer func() { + r.logger.Println("recorder has been stopped.") + }() ticker := time.NewTicker(1 * time.Second) cTime := time.Now() for { select { case <-ticker.C: - hasSpans := r.hasSpans() - if hasSpans || time.Now().Sub(cTime) >= r.getFlushFrequency() { + hasPayloadData := r.hasPayloadData() + if hasPayloadData || time.Now().Sub(cTime) >= r.getFlushFrequency() { if r.debugMode { - if hasSpans { + if hasPayloadData { r.logger.Println("Ticker: Sending by buffer") } else { r.logger.Println("Ticker: Sending by time") @@ -135,26 +142,18 @@ func (r *SpanRecorder) loop() error { // Sends the spans in the buffer to Scope func (r *SpanRecorder) sendSpans() (error, bool) { atomic.AddInt64(&r.stats.sendSpansCalls, 1) - spans := r.popSpans() - const batchSize = 1000 - batchLength := len(spans) / batchSize - - r.logger.Printf("sending %d spans in %d batches", len(spans), batchLength+1) - var lastError error - for b := 0; b <= batchLength; b++ { - var batch []tracer.RawSpan - // We extract the batch of spans to be send - if b == batchLength { - // If we are in the last batch, we select the remaining spans - batch = spans[b*batchSize:] - } else { - batch = spans[b*batchSize : ((b + 1) * batchSize)] + for { + spans, spMore, spTotal := r.popPayloadSpan(batchSize) + events, evMore, evTotal := r.popPayloadEvents(batchSize) + + payload := map[string]interface{}{ + "metadata": r.metadata, + "spans": spans, + "events": events, + tags.AgentID: r.agentId, } - - payload := r.getPayload(batch) - buf, err := encodePayload(payload) if err != nil { atomic.AddInt64(&r.stats.sendSpansKo, 1) @@ -163,15 +162,13 @@ func (r *SpanRecorder) sendSpans() (error, bool) { } var testSpans int64 - for _, span := range batch { + for _, span := range spans { if isTestSpan(span) { testSpans++ } } - if batchLength > 0 { - r.logger.Printf("sending batch %d with %d spans", b+1, len(batch)) - } + r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal) statusCode, err := r.callIngest(buf) if err != nil { atomic.AddInt64(&r.stats.sendSpansKo, 1) @@ -186,6 +183,10 @@ func (r *SpanRecorder) sendSpans() (error, bool) { return err, true } lastError = err + + if !spMore && !evMore { + break + } } return lastError, false } @@ -302,53 +303,45 @@ func (r *SpanRecorder) callIngest(payload *bytes.Buffer) (statusCode int, err er return statusCode, lastError } -// Combines `rawSpans` and `metadata` into a payload that the Scope backend can process -func (r *SpanRecorder) getPayload(rawSpans []tracer.RawSpan) map[string]interface{} { - spans := []map[string]interface{}{} - events := []map[string]interface{}{} - for _, span := range rawSpans { - var parentSpanID string - if span.ParentSpanID != 0 { - parentSpanID = fmt.Sprintf("%x", span.ParentSpanID) +// Get payload components +func (r *SpanRecorder) getPayloadComponents(span tracer.RawSpan) (PayloadSpan, []PayloadEvent) { + events := make([]PayloadEvent, 0) + var parentSpanID string + if span.ParentSpanID != 0 { + parentSpanID = fmt.Sprintf("%x", span.ParentSpanID) + } + payloadSpan := PayloadSpan{ + "context": map[string]interface{}{ + "trace_id": fmt.Sprintf("%x", span.Context.TraceID), + "span_id": fmt.Sprintf("%x", span.Context.SpanID), + "baggage": span.Context.Baggage, + }, + "parent_span_id": parentSpanID, + "operation": span.Operation, + "start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano), + "duration": span.Duration.Nanoseconds(), + "tags": span.Tags, + } + for _, event := range span.Logs { + var fields = make(map[string]interface{}) + for _, field := range event.Fields { + fields[field.Key()] = field.Value() + } + eventId, err := uuid.NewRandom() + if err != nil { + panic(err) } - spans = append(spans, map[string]interface{}{ + events = append(events, PayloadEvent{ "context": map[string]interface{}{ "trace_id": fmt.Sprintf("%x", span.Context.TraceID), "span_id": fmt.Sprintf("%x", span.Context.SpanID), - "baggage": span.Context.Baggage, + "event_id": eventId.String(), }, - "parent_span_id": parentSpanID, - "operation": span.Operation, - "start": r.applyNTPOffset(span.Start).Format(time.RFC3339Nano), - "duration": span.Duration.Nanoseconds(), - "tags": span.Tags, + "timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano), + "fields": fields, }) - for _, event := range span.Logs { - var fields = make(map[string]interface{}) - for _, field := range event.Fields { - fields[field.Key()] = field.Value() - } - eventId, err := uuid.NewRandom() - if err != nil { - panic(err) - } - events = append(events, map[string]interface{}{ - "context": map[string]interface{}{ - "trace_id": fmt.Sprintf("%x", span.Context.TraceID), - "span_id": fmt.Sprintf("%x", span.Context.SpanID), - "event_id": eventId.String(), - }, - "timestamp": r.applyNTPOffset(event.Timestamp).Format(time.RFC3339Nano), - "fields": fields, - }) - } - } - return map[string]interface{}{ - "metadata": r.metadata, - "spans": spans, - "events": events, - tags.AgentID: r.agentId, } + return payloadSpan, events } // Encodes `payload` using msgpack and compress it with gzip @@ -379,32 +372,64 @@ func (r *SpanRecorder) getFlushFrequency() time.Duration { } // Gets if there any span available to be send -func (r *SpanRecorder) hasSpans() bool { +func (r *SpanRecorder) hasPayloadData() bool { r.RLock() defer r.RUnlock() - return len(r.spans) > 0 + return len(r.payloadSpans) > 0 || len(r.payloadEvents) > 0 +} + +// Gets a number of payload spans from buffer +func (r *SpanRecorder) popPayloadSpan(count int) ([]PayloadSpan, bool, int) { + r.Lock() + defer r.Unlock() + var spans []PayloadSpan + length := len(r.payloadSpans) + if length <= count || count == -1 { + spans = r.payloadSpans + if spans == nil { + spans = make([]PayloadSpan, 0) + } + r.payloadSpans = nil + return spans, false, length + } + spans = r.payloadSpans[:count] + r.payloadSpans = r.payloadSpans[count:] + return spans, true, length } -// Gets the spans to be send and clears the buffer -func (r *SpanRecorder) popSpans() []tracer.RawSpan { +// Gets a number of payload events from buffer +func (r *SpanRecorder) popPayloadEvents(count int) ([]PayloadEvent, bool, int) { r.Lock() defer r.Unlock() - spans := r.spans - r.spans = nil - return spans + var events []PayloadEvent + length := len(r.payloadEvents) + if length <= count || count == -1 { + events = r.payloadEvents + if events == nil { + events = make([]PayloadEvent, 0) + } + r.payloadEvents = nil + return events, false, length + } + events = r.payloadEvents[:count] + r.payloadEvents = r.payloadEvents[count:] + return events, true, length } // Adds a span to the buffer func (r *SpanRecorder) addSpan(span tracer.RawSpan) { r.Lock() defer r.Unlock() - r.spans = append(r.spans, span) + payloadSpan, payloadEvents := r.getPayloadComponents(span) + r.payloadSpans = append(r.payloadSpans, payloadSpan) + r.payloadEvents = append(r.payloadEvents, payloadEvents...) + atomic.AddInt64(&r.stats.totalSpans, 1) - if isTestSpan(span) { + if isTestSpan(span.Tags) { atomic.AddInt64(&r.stats.totalTestSpans, 1) } } -func isTestSpan(span tracer.RawSpan) bool { - return span.Tags["span.kind"] == "test" +func isTestSpan(tags map[string]interface{}) bool { + return tags["span.kind"] == "test" }