Skip to content

Commit

Permalink
Use only internal format, not OTLP
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
jpkrohling committed Jul 27, 2020
1 parent 1ffac6f commit f81445e
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 201 deletions.
74 changes: 37 additions & 37 deletions processor/groupbytraceprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1"
)

var (
Expand Down Expand Up @@ -55,8 +54,8 @@ type groupByTraceProcessor struct {

// used for the ring buffer
index int
ids [][]byte
idToIndex map[uint64]int // key is hash(traceID), value is the index on the 'ids' slice
ids []pdata.TraceID
idToIndex map[string]int // key is hash(traceID), value is the index on the 'ids' slice

// the trace storage
st storage
Expand All @@ -65,13 +64,13 @@ type groupByTraceProcessor struct {
received chan pdata.Traces

// traceID to be released
expired chan []byte
expired chan pdata.TraceID

// released traces
released chan *v1.ResourceSpans
released chan pdata.ResourceSpans

// traceID to be removed
removed chan []byte
removed chan pdata.TraceID

// shutdown synchronization
stopped chan context.Context // used in the main loop
Expand All @@ -92,15 +91,15 @@ func newGroupByTraceProcessor(logger *zap.Logger, st storage, nextConsumer consu
nextConsumer: nextConsumer,
config: config,
st: st,
ids: make([][]byte, config.NumTraces),
idToIndex: make(map[uint64]int),
ids: make([]pdata.TraceID, config.NumTraces),
idToIndex: make(map[string]int),
index: -1, // the first span to be received will be placed at position '0'

// open the channels
removed: make(chan []byte, 50), // TODO: perf tests to decide what's a good buffer size
expired: make(chan []byte, 50),
removed: make(chan pdata.TraceID, 50), // TODO: perf tests to decide what's a good buffer size
expired: make(chan pdata.TraceID, 50),
received: make(chan pdata.Traces, 50),
released: make(chan *v1.ResourceSpans, 50),
released: make(chan pdata.ResourceSpans, 50),

// shutdown synchronization
stopped: make(chan context.Context),
Expand Down Expand Up @@ -152,24 +151,23 @@ func (sp *groupByTraceProcessor) onTrace() {
sp.processExpired(traceID)
case item := <-sp.received:
// in this implementation, this won't return errors
batch := pdata.TracesToOtlp(item)
sp.processBatch(batch)
sp.processBatch(item)
}
}
}

func (sp *groupByTraceProcessor) processBatch(batch []*v1.ResourceSpans) error {
for _, trace := range batch {
if err := sp.processTrace(trace); err != nil {
func (sp *groupByTraceProcessor) processBatch(batch pdata.Traces) error {
for i := 0; i < batch.ResourceSpans().Len(); i++ {
if err := sp.processTrace(batch.ResourceSpans().At(i)); err != nil {
sp.logger.Info("failed to process trace", zap.Error(err))
}
}

return nil
}

func (sp *groupByTraceProcessor) processTrace(trace *v1.ResourceSpans) error {
if trace == nil {
func (sp *groupByTraceProcessor) processTrace(trace pdata.ResourceSpans) error {
if trace.IsNil() {
// should not happen with the current code
return errNilTrace
}
Expand All @@ -183,7 +181,7 @@ func (sp *groupByTraceProcessor) processTrace(trace *v1.ResourceSpans) error {
return errNilTraceID
}

hashedTraceID := hash(traceID)
hashedTraceID := traceID.String()

if _, ok := sp.idToIndex[hashedTraceID]; ok {
// it exists in memory already, just append the spans to the trace in the storage
Expand All @@ -205,7 +203,7 @@ func (sp *groupByTraceProcessor) processTrace(trace *v1.ResourceSpans) error {
idAtRingItem := sp.ids[sp.index]
if idAtRingItem != nil {
// delete from the map and erase its memory entry
delete(sp.idToIndex, hash(idAtRingItem))
delete(sp.idToIndex, idAtRingItem.String())
sp.ids[sp.index] = nil

// delete from the storage
Expand Down Expand Up @@ -241,10 +239,10 @@ func (sp *groupByTraceProcessor) processTrace(trace *v1.ResourceSpans) error {
return nil
}

func (sp *groupByTraceProcessor) processExpired(traceID []byte) error {
sp.logger.Debug("processing expired", zap.ByteString("traceID", traceID))
hashed := hash(traceID)
indexToRemove, found := sp.idToIndex[hashed]
func (sp *groupByTraceProcessor) processExpired(traceID pdata.TraceID) error {
sTraceID := traceID.String()
sp.logger.Debug("processing expired", zap.String("traceID", sTraceID))
indexToRemove, found := sp.idToIndex[sTraceID]

if !found {
// we likely received multiple batches with spans for the same trace
Expand All @@ -254,7 +252,7 @@ func (sp *groupByTraceProcessor) processExpired(traceID []byte) error {
}

// delete from the map and erase its memory entry
delete(sp.idToIndex, hashed)
delete(sp.idToIndex, sTraceID)
sp.ids[indexToRemove] = nil

// this might block, but we don't need to wait
Expand All @@ -271,7 +269,7 @@ func (sp *groupByTraceProcessor) markAsReleased(traceID []byte) error {
return fmt.Errorf("couldn't retrieve trace %q from the storage: %w", traceID, err)
}

if trace == nil {
if trace.IsNil() {
return fmt.Errorf("the trace %q couldn't be found at the storage", traceID)
}

Expand All @@ -283,7 +281,9 @@ func (sp *groupByTraceProcessor) markAsReleased(traceID []byte) error {

func (sp *groupByTraceProcessor) onReleased() {
for item := range sp.released {
sp.nextConsumer.ConsumeTraces(context.Background(), pdata.TracesFromOtlp([]*v1.ResourceSpans{item}))
trace := pdata.NewTraces()
trace.ResourceSpans().Append(&item)
sp.nextConsumer.ConsumeTraces(context.Background(), trace)
}
}

Expand All @@ -300,15 +300,15 @@ func (sp *groupByTraceProcessor) processRemoved(traceID []byte) error {
return fmt.Errorf("couldn't delete trace %q from the storage: %w", string(traceID), err)
}

if trace == nil {
return fmt.Errorf("trace %q not found at the storage", string(traceID))
if trace.IsNil() {
return fmt.Errorf("trace %q not found at the storage", traceID)
}

return nil
}

func (sp *groupByTraceProcessor) addSpans(traceID []byte, trace *v1.ResourceSpans) error {
sp.logger.Debug("creating trace at the storage", zap.ByteString("traceID", traceID))
func (sp *groupByTraceProcessor) addSpans(traceID pdata.TraceID, trace pdata.ResourceSpans) error {
sp.logger.Debug("creating trace at the storage", zap.Stringer("traceID", traceID))
return sp.st.createOrAppend(traceID, trace)
}

Expand All @@ -328,16 +328,16 @@ func (sp *groupByTraceProcessor) processShutdown(ctx context.Context) {
sp.cancelLock.Unlock()
}

func extractTraceID(trace *v1.ResourceSpans) ([]byte, error) {
ilSpans := trace.InstrumentationLibrarySpans
if len(ilSpans) <= 0 {
func extractTraceID(trace pdata.ResourceSpans) (pdata.TraceID, error) {
ilSpans := trace.InstrumentationLibrarySpans()
if ilSpans.Len() <= 0 {
return nil, errNoInstrumentationLibrarySpans
}

spans := ilSpans[0].Spans
if len(spans) <= 0 {
spans := ilSpans.At(0).Spans()
if spans.Len() <= 0 {
return nil, errNoSpans
}

return spans[0].TraceId, nil
return spans.At(0).TraceID(), nil
}
Loading

0 comments on commit f81445e

Please sign in to comment.