Review - second round
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
jpkrohling committed Jul 28, 2020
1 parent bbed79a commit 74f8846
Showing 6 changed files with 25 additions and 85 deletions.
2 changes: 1 addition & 1 deletion processor/groupbytraceprocessor/config.go
@@ -25,7 +25,7 @@ type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// NumTraces is the max number of traces to keep in memory waiting for the duration.
-// Default: 1.000.000.
+// Default: 1_000_000.
NumTraces int `mapstructure:"num_traces"`

// WaitDuration tells the processor to wait for the specified duration for the trace to be complete.
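For quick reference, a minimal, self-contained sketch of how these two settings look when populated in Go. Only `num_traces` and its 1_000_000 default appear in this hunk; the `wait_duration` tag and the `WaitDuration` value below are illustrative assumptions, and the type is a stand-in rather than the real `Config`.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative mirror of the fields documented above; not the real Config type.
type groupByTraceConfig struct {
	NumTraces    int           `mapstructure:"num_traces"`
	WaitDuration time.Duration `mapstructure:"wait_duration"` // tag assumed, not shown in this hunk
}

func main() {
	cfg := groupByTraceConfig{
		NumTraces:    1_000_000,        // documented default above
		WaitDuration: 10 * time.Second, // placeholder value, not a documented default
	}
	fmt.Printf("%+v\n", cfg)
}
```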
9 changes: 1 addition & 8 deletions processor/groupbytraceprocessor/factory.go
@@ -19,8 +19,6 @@ import (
"fmt"
"time"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
@@ -86,15 +84,10 @@ func (f *Factory) CreateTraceProcessor(
return nil, errDiscardOrphansNotSupported
}

-logger := params.Logger
-if logger == nil {
-logger = zap.NewNop()
-}
-
// the only supported storage for now
st = newMemoryStorage()

-return newGroupByTraceProcessor(logger, st, nextConsumer, *oCfg)
+return newGroupByTraceProcessor(params.Logger, st, nextConsumer, *oCfg)
}

// CreateMetricsProcessor creates a metric processor based on this config.
10 changes: 9 additions & 1 deletion processor/groupbytraceprocessor/factory_test.go
@@ -19,6 +19,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
@@ -42,7 +44,13 @@ func TestCreateTestProcessor(t *testing.T) {
// prepare
f := &Factory{}
c := f.CreateDefaultConfig().(*Config)
-params := component.ProcessorCreateParams{}
+
+logger, err := zap.NewDevelopment()
+require.NoError(t, err)
+
+params := component.ProcessorCreateParams{
+Logger: logger,
+}
next := &mockProcessor{}

// test
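The development logger above can fail to build, hence the `require.NoError` check. A test-scoped alternative, sketched here only under the assumption that the zaptest helper is acceptable in this repo (the `exampleParams` helper name is hypothetical), avoids that error path entirely:

```go
import (
	"testing"

	"go.uber.org/zap/zaptest"

	"go.opentelemetry.io/collector/component"
)

// zaptest.NewLogger routes output through t.Log and never returns an error.
func exampleParams(t *testing.T) component.ProcessorCreateParams {
	return component.ProcessorCreateParams{
		Logger: zaptest.NewLogger(t),
	}
}
```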
27 changes: 0 additions & 27 deletions processor/groupbytraceprocessor/hash.go

This file was deleted.

33 changes: 0 additions & 33 deletions processor/groupbytraceprocessor/hash_test.go

This file was deleted.

29 changes: 14 additions & 15 deletions processor/groupbytraceprocessor/processor.go
@@ -55,7 +55,7 @@ type groupByTraceProcessor struct {
// used for the ring buffer
index int
ids []pdata.TraceID
-idToIndex map[string]int // key is hash(traceID), value is the index on the 'ids' slice
+idToIndex map[string]int // key is traceID as string, value is the index on the 'ids' slice

// the trace storage
st storage
@@ -181,9 +181,8 @@ func (sp *groupByTraceProcessor) processTrace(trace pdata.ResourceSpans) error {
return errNilTraceID
}

-hashedTraceID := traceID.String()
-
-if _, ok := sp.idToIndex[hashedTraceID]; ok {
+sTraceID := traceID.String()
+if _, ok := sp.idToIndex[sTraceID]; ok {
// it exists in memory already, just append the spans to the trace in the storage
if err := sp.addSpans(traceID, trace); err != nil {
return fmt.Errorf("couldn't add spans to existing trace: %w", err)
@@ -210,12 +209,12 @@ func (sp *groupByTraceProcessor) processTrace(trace pdata.ResourceSpans) error {
sp.removed <- idAtRingItem

// TODO: do we want another channel that receives evicted items? record a metric perhaps?
sp.logger.Debug("trace evicted", zap.ByteString("traceID", idAtRingItem))
sp.logger.Debug("trace evicted", zap.Stringer("traceID", idAtRingItem))
}

// place the traceID in memory
sp.ids[sp.index] = traceID
-sp.idToIndex[hashedTraceID] = sp.index
+sp.idToIndex[sTraceID] = sp.index

// we have the traceID in the memory, place the spans in the storage too
if err := sp.addSpans(traceID, trace); err != nil {
@@ -227,11 +226,11 @@ func (sp *groupByTraceProcessor) processTrace(trace pdata.ResourceSpans) error {
time.AfterFunc(sp.config.WaitDuration, func() {
sp.cancelLock.RLock()
if sp.cancelReleases {
sp.logger.Debug("processor is stopping, canceling the release of the trace", zap.ByteString("traceID", traceID))
sp.logger.Debug("processor is stopping, canceling the release of the trace", zap.Stringer("traceID", traceID))
return
}

sp.logger.Debug("releasing trace", zap.ByteString("traceID", traceID))
sp.logger.Debug("releasing trace", zap.Stringer("traceID", traceID))
sp.expired <- traceID
sp.cancelLock.RUnlock()
})
@@ -247,7 +246,7 @@ func (sp *groupByTraceProcessor) processExpired(traceID pdata.TraceID) error {
if !found {
// we likely received multiple batches with spans for the same trace
// and released this trace already
sp.logger.Debug("skipping the processing of expired trace", zap.ByteString("traceID", traceID))
sp.logger.Debug("skipping the processing of expired trace", zap.Stringer("traceID", traceID))
return nil
}

@@ -256,13 +255,13 @@ func (sp *groupByTraceProcessor) processExpired(traceID pdata.TraceID) error {
sp.ids[indexToRemove] = nil

// this might block, but we don't need to wait
sp.logger.Debug("marking the trace as released", zap.ByteString("traceID", traceID))
sp.logger.Debug("marking the trace as released", zap.Stringer("traceID", traceID))
go sp.markAsReleased(traceID)

return nil
}

-func (sp *groupByTraceProcessor) markAsReleased(traceID []byte) error {
+func (sp *groupByTraceProcessor) markAsReleased(traceID pdata.TraceID) error {
// #get is a potentially blocking operation
trace, err := sp.st.get(traceID)
if err != nil {
@@ -274,7 +273,7 @@ func (sp *groupByTraceProcessor) markAsReleased(traceID []byte) error {
}

// signal that the trace is ready to be released
sp.logger.Debug("trace marked as released", zap.ByteString("traceID", traceID))
sp.logger.Debug("trace marked as released", zap.Stringer("traceID", traceID))
sp.released <- trace
return nil
}
@@ -295,14 +294,14 @@ func (sp *groupByTraceProcessor) onRemoved() {
}
}

-func (sp *groupByTraceProcessor) processRemoved(traceID []byte) error {
+func (sp *groupByTraceProcessor) processRemoved(traceID pdata.TraceID) error {
trace, err := sp.st.delete(traceID)
if err != nil {
return fmt.Errorf("couldn't delete trace %q from the storage: %w", string(traceID), err)
return fmt.Errorf("couldn't delete trace %q from the storage: %w", traceID.String(), err)
}

if trace.IsNil() {
return fmt.Errorf("trace %q not found at the storage", traceID)
return fmt.Errorf("trace %q not found at the storage", traceID.String())
}

return nil
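The bulk of this file's change replaces the deleted hash helper with the trace ID's string form as the lookup key for the ring buffer. A minimal, self-contained sketch of that bookkeeping, using a plain []byte ID in place of pdata.TraceID purely for illustration (the eviction and lookup flow mirrors the idToIndex handling above, but this is not the processor's actual implementation):

```go
package main

import "fmt"

// ring keeps the last N trace IDs; the map is keyed by the ID's string form,
// so no custom hash is needed for the lookup.
type ring struct {
	index     int
	ids       [][]byte
	idToIndex map[string]int
}

func newRing(numTraces int) *ring {
	return &ring{
		index:     -1,
		ids:       make([][]byte, numTraces),
		idToIndex: make(map[string]int, numTraces),
	}
}

// put stores a new ID and returns the ID it evicted once the buffer wraps.
func (r *ring) put(id []byte) (evicted []byte) {
	r.index = (r.index + 1) % len(r.ids)
	if prev := r.ids[r.index]; prev != nil {
		delete(r.idToIndex, string(prev))
		evicted = prev
	}
	r.ids[r.index] = id
	r.idToIndex[string(id)] = r.index
	return evicted
}

func (r *ring) contains(id []byte) bool {
	_, ok := r.idToIndex[string(id)]
	return ok
}

func main() {
	r := newRing(2)
	r.put([]byte{0x01})
	r.put([]byte{0x02})
	if evicted := r.put([]byte{0x03}); evicted != nil { // buffer of 2 wraps here
		fmt.Printf("evicted %x\n", evicted)
	}
	fmt.Println(r.contains([]byte{0x01})) // false: it was evicted
}
```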
