Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed May 7, 2024
1 parent f925e98 commit 25dd516
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 36 deletions.
16 changes: 1 addition & 15 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -201,15 +200,9 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func

// newShard gets or creates a batcher corresponding with attrs.
func (bp *batchProcessor) newShard(md map[string][]string) *shard {
// client.NewContext adds the metadata to client.Info object. In some cases the md
// keys were found in the incoming context and this incoming context may be used
// by downstream collector components that don't know about or use the client.Info object.
incoming := metadata.NewIncomingContext(context.Background(), md)

exportCtx := client.NewContext(incoming, client.Info{
exportCtx := client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(md),
})

b := &shard{
processor: bp,
newItem: make(chan dataItem, runtime.NumCPU()),
Expand Down Expand Up @@ -561,25 +554,18 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
info := client.FromContext(ctx)
md := map[string][]string{}
var attrs []attribute.KeyValue

incomingHeaders, headersFound := metadata.FromIncomingContext(ctx)
for _, k := range mb.metadataKeys {
// Lookup the value in the incoming metadata, copy it
// into the outgoing metadata, and create a unique
// value for the attributeSet.
vs := info.Metadata.Get(k)
if len(vs) == 0 && headersFound {
// not found in client.Info so try the metadata directly from incoming context.
vs = incomingHeaders[strings.ToLower(k)]
}
md[k] = vs
if len(vs) == 1 {
attrs = append(attrs, attribute.String(k, vs[0]))
} else {
attrs = append(attrs, attribute.StringSlice(k, vs))
}
}

aset := attribute.NewSet(attrs...)

b, ok := mb.batchers.Load(aset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/metadata"

"github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata"
"go.opentelemetry.io/collector/client"
Expand Down Expand Up @@ -1408,16 +1407,8 @@ func formatTwo(first, second []string) string {

func (mts *metadataTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
info := client.FromContext(ctx)
incomingHeaders, headersFound := metadata.FromIncomingContext(ctx)
token1 := info.Metadata.Get("token1")
if len(token1) == 0 && headersFound {
token1 = incomingHeaders["token1"]
}
token2 := info.Metadata.Get("token2")
if len(token2) == 0 && headersFound {
token2 = incomingHeaders["token2"]
}

mts.lock.Lock()
defer mts.lock.Unlock()

Expand All @@ -1443,10 +1434,6 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

incoming := metadata.NewIncomingContext(context.Background(), map[string][]string{
"token1": {"incoming1"},
"token2": {"incoming2"},
})
bg := context.Background()
callCtxs := []context.Context{
client.NewContext(bg, client.Info{
Expand Down Expand Up @@ -1476,14 +1463,6 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) {
"token4": {"n/a", "d/c"},
}),
}),
// empty client.Info with empty metadata.FromIncomingContext.
client.NewContext(bg, client.Info{
Metadata: client.NewMetadata(map[string][]string{}),
}),
// empty client.Info with existing metadata.FromIncomingContext.
client.NewContext(incoming, client.Info{
Metadata: client.NewMetadata(map[string][]string{}),
}),
}
expectByContext := make([]int, len(callCtxs))

Expand Down

0 comments on commit 25dd516

Please sign in to comment.