diff --git a/CHANGELOG.md b/CHANGELOG.md index e3a63ba2..49cb0600 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased -- Added a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174) +- Refactor otelarrowreceiver to do stream.Recv, request processing, and stream.Send in separate goroutines. [#181](https://github.com/open-telemetry/otel-arrow/pull/181) + +- Add a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174) ## [0.22.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.22.0) - 2024-04-16 diff --git a/api/experimental/arrow/v1/arrow_service.pb.go b/api/experimental/arrow/v1/arrow_service.pb.go index 8dd1045b..693de228 100644 --- a/api/experimental/arrow/v1/arrow_service.pb.go +++ b/api/experimental/arrow/v1/arrow_service.pb.go @@ -22,8 +22,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v3.19.4 +// protoc-gen-go v1.33.0 +// protoc v4.25.3 // source: opentelemetry/proto/experimental/arrow/v1/arrow_service.proto package v1 diff --git a/api/experimental/arrow/v1/arrow_service_grpc.pb.go b/api/experimental/arrow/v1/arrow_service_grpc.pb.go index 1d0a1b3b..fbc32a19 100644 --- a/api/experimental/arrow/v1/arrow_service_grpc.pb.go +++ b/api/experimental/arrow/v1/arrow_service_grpc.pb.go @@ -23,7 +23,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v3.19.4 +// - protoc v4.25.3 // source: opentelemetry/proto/experimental/arrow/v1/arrow_service.proto package v1 diff --git a/collector/examples/bridge/edge-collector.yaml b/collector/examples/bridge/edge-collector.yaml index db9922d3..38b23240 100644 --- a/collector/examples/bridge/edge-collector.yaml +++ b/collector/examples/bridge/edge-collector.yaml @@ -49,4 +49,4 @@ service: address: 127.0.0.1:8888 level: detailed logs: - level: info + level: info \ No newline at end of file diff --git a/collector/examples/bridge/saas-collector.yaml b/collector/examples/bridge/saas-collector.yaml index f1c91760..1bcbdd7c 100644 --- a/collector/examples/bridge/saas-collector.yaml +++ b/collector/examples/bridge/saas-collector.yaml @@ -57,4 +57,4 @@ service: telemetry: metrics: address: 127.0.0.1:8889 - level: normal + level: normal \ No newline at end of file diff --git a/collector/exporter/otelarrowexporter/go.mod b/collector/exporter/otelarrowexporter/go.mod index 1095729f..4a99cda6 100644 --- a/collector/exporter/otelarrowexporter/go.mod +++ b/collector/exporter/otelarrowexporter/go.mod @@ -16,7 +16,6 @@ require ( go.opentelemetry.io/collector/config/configgrpc v0.98.0 go.opentelemetry.io/collector/config/configopaque v1.5.0 go.opentelemetry.io/collector/config/configretry v0.98.0 - go.opentelemetry.io/collector/config/configtelemetry v0.98.0 go.opentelemetry.io/collector/config/configtls v0.98.0 go.opentelemetry.io/collector/confmap v0.98.0 go.opentelemetry.io/collector/consumer v0.98.0 @@ -75,6 +74,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/collector/config/confignet v0.98.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.98.0 // indirect go.opentelemetry.io/collector/config/internal v0.98.0 // indirect go.opentelemetry.io/collector/featuregate v1.5.0 // indirect go.opentelemetry.io/collector/receiver v0.98.0 // indirect 
diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go index 0d80fb02..eb384bb2 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "math/rand" + "strconv" "sync" "time" @@ -14,7 +15,6 @@ import ( "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -285,19 +285,22 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { // exporter, because of the optimization phase performed in the // conversion to Arrow. var uncompSize int - if e.telemetry.MetricsLevel > configtelemetry.LevelNormal { - switch data := data.(type) { - case ptrace.Traces: - var sizer ptrace.ProtoMarshaler - uncompSize = sizer.TracesSize(data) - case plog.Logs: - var sizer plog.ProtoMarshaler - uncompSize = sizer.LogsSize(data) - case pmetric.Metrics: - var sizer pmetric.ProtoMarshaler - uncompSize = sizer.MetricsSize(data) - } + switch data := data.(type) { + case ptrace.Traces: + var sizer ptrace.ProtoMarshaler + uncompSize = sizer.TracesSize(data) + case plog.Logs: + var sizer plog.ProtoMarshaler + uncompSize = sizer.LogsSize(data) + case pmetric.Metrics: + var sizer pmetric.ProtoMarshaler + uncompSize = sizer.MetricsSize(data) + } + + if md == nil { + md = make(map[string]string) } + md["otlp-pdata-size"] = strconv.Itoa(uncompSize) wri := writeItem{ records: data, diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 2a0feaff..2740cdb1 
100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -602,12 +602,15 @@ func TestArrowExporterHeaders(t *testing.T) { if times%2 == 1 { md := metadata.MD{ - "expected1": []string{"metadata1"}, - "expected2": []string{fmt.Sprint(times)}, + "expected1": []string{"metadata1"}, + "expected2": []string{fmt.Sprint(times)}, + "otlp-pdata-size": []string{"329"}, } expectOutput = append(expectOutput, md) } else { - expectOutput = append(expectOutput, nil) + expectOutput = append(expectOutput, metadata.MD{ + "otlp-pdata-size": []string{"329"}, + }) } sent, err := tc.exporter.SendAndWait(ctx, input) @@ -677,11 +680,14 @@ func TestArrowExporterIsTraced(t *testing.T) { propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(expectMap)) md := metadata.MD{ - "traceparent": []string{expectMap["traceparent"]}, + "traceparent": []string{expectMap["traceparent"]}, + "otlp-pdata-size": []string{"329"}, } expectOutput = append(expectOutput, md) } else { - expectOutput = append(expectOutput, nil) + expectOutput = append(expectOutput, metadata.MD{ + "otlp-pdata-size": []string{"329"}, + }) } sent, err := tc.exporter.SendAndWait(ctx, input) diff --git a/collector/receiver/otelarrowreceiver/config.go b/collector/receiver/otelarrowreceiver/config.go index 78f9b53e..6e064cc2 100644 --- a/collector/receiver/otelarrowreceiver/config.go +++ b/collector/receiver/otelarrowreceiver/config.go @@ -24,6 +24,13 @@ type ArrowConfig struct { // passing through, they will see ResourceExhausted errors. MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` + AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` + + // WaiterLimit is the limit on the number of waiters waiting to be processed and consumed. + // This is a dimension of memory limiting to ensure waiters are not consuming an + // unexpectedly large amount of memory in the arrow receiver. 
+ WaiterLimit int64 `mapstructure:"waiter_limit"` + // Zstd settings apply to OTel-Arrow use of gRPC specifically. Zstd zstd.DecoderConfig `mapstructure:"zstd"` } diff --git a/collector/receiver/otelarrowreceiver/config_test.go b/collector/receiver/otelarrowreceiver/config_test.go index 315f2968..7efbe884 100644 --- a/collector/receiver/otelarrowreceiver/config_test.go +++ b/collector/receiver/otelarrowreceiver/config_test.go @@ -77,7 +77,8 @@ func TestUnmarshalConfig(t *testing.T) { }, }, Arrow: ArrowConfig{ - MemoryLimitMiB: 123, + MemoryLimitMiB: 123, + AdmissionLimitMiB: 80, }, }, }, cfg) @@ -101,7 +102,8 @@ func TestUnmarshalConfigUnix(t *testing.T) { ReadBufferSize: 512 * 1024, }, Arrow: ArrowConfig{ - MemoryLimitMiB: defaultMemoryLimitMiB, + MemoryLimitMiB: defaultMemoryLimitMiB, + AdmissionLimitMiB: defaultAdmissionLimitMiB, }, }, }, cfg) diff --git a/collector/receiver/otelarrowreceiver/factory.go b/collector/receiver/otelarrowreceiver/factory.go index 4b190684..7269084f 100644 --- a/collector/receiver/otelarrowreceiver/factory.go +++ b/collector/receiver/otelarrowreceiver/factory.go @@ -20,6 +20,7 @@ const ( defaultGRPCEndpoint = "0.0.0.0:4317" defaultMemoryLimitMiB = 128 + defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2 ) // NewFactory creates a new OTLP receiver factory. 
@@ -46,6 +47,7 @@ func createDefaultConfig() component.Config { }, Arrow: ArrowConfig{ MemoryLimitMiB: defaultMemoryLimitMiB, + AdmissionLimitMiB: defaultAdmissionLimitMiB, }, }, } diff --git a/collector/receiver/otelarrowreceiver/go.mod b/collector/receiver/otelarrowreceiver/go.mod index 0fb7e0fa..bceb4b3e 100644 --- a/collector/receiver/otelarrowreceiver/go.mod +++ b/collector/receiver/otelarrowreceiver/go.mod @@ -28,13 +28,16 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/net v0.24.0 google.golang.org/grpc v1.63.2 + google.golang.org/protobuf v1.33.0 ) require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/apache/arrow/go/v14 v14.0.2 // indirect github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect @@ -55,6 +58,7 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -66,6 +70,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/collector/config/configcompression v1.5.0 // indirect @@ -84,6 +89,5 @@ require ( golang.org/x/tools v0.15.0 // indirect golang.org/x/xerrors 
v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/collector/receiver/otelarrowreceiver/go.sum b/collector/receiver/otelarrowreceiver/go.sum index b337bb8e..a0285322 100644 --- a/collector/receiver/otelarrowreceiver/go.sum +++ b/collector/receiver/otelarrowreceiver/go.sum @@ -7,10 +7,14 @@ github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -49,6 +53,7 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -70,6 +75,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -84,8 +91,6 @@ github.com/mostynb/go-grpc-compression v1.2.2/go.mod h1:GOCr2KBxXcblCuczg3YdLQlc github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/open-telemetry/otel-arrow v0.22.0 h1:G1jgtqAM2ho5pyKQ4tyrDzk9Y0VcJ+GZQRJgN26vRlI= github.com/open-telemetry/otel-arrow v0.22.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M= -github.com/open-telemetry/otel-arrow/collector v0.22.0 h1:lHFjzkh5PbsiW8B63SRntnP9W7bLCXV9lslO4zI0s/Y= -github.com/open-telemetry/otel-arrow/collector v0.22.0/go.mod 
h1:R7hRwuGDxoGLB27dkJUFKDK7mGG7Yb02ODnLHx8Whis= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -105,6 +110,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index a058ec82..6fbafb57 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -8,7 +8,11 @@ import ( "errors" "fmt" "io" + "net" + "runtime" + "strconv" "strings" + "sync" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" "github.com/open-telemetry/otel-arrow/collector/netstats" @@ -37,6 +41,9 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + + "github.com/open-telemetry/otel-arrow/collector/admission" ) const ( @@ -75,6 +82,9 @@ type Receiver struct { recvInFlightBytes metric.Int64UpDownCounter recvInFlightItems metric.Int64UpDownCounter 
recvInFlightRequests metric.Int64UpDownCounter + boundedQueue *admission.BoundedQueue + inFlightWG sync.WaitGroup + pendingCh chan batchResp } // New creates a new Receiver reference. @@ -85,19 +95,21 @@ func New( gsettings configgrpc.ServerConfig, authServer auth.Server, newConsumer func() arrowRecord.ConsumerAPI, + bq *admission.BoundedQueue, netReporter netstats.Interface, ) (*Receiver, error) { tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver") var errors, err error recv := &Receiver{ - Consumers: cs, - obsrecv: obsrecv, - telemetry: set.TelemetrySettings, - tracer: tracer, - authServer: authServer, - newConsumer: newConsumer, - gsettings: gsettings, - netReporter: netReporter, + Consumers: cs, + obsrecv: obsrecv, + telemetry: set.TelemetrySettings, + tracer: tracer, + authServer: authServer, + newConsumer: newConsumer, + gsettings: gsettings, + netReporter: netReporter, + boundedQueue: bq, } meter := recv.telemetry.MeterProvider.Meter(scopeName) @@ -329,10 +341,16 @@ type anyStreamServer interface { grpc.ServerStream } +type batchResp struct { + addr net.Addr + id int64 + err error + bytesToRelease int64 +} + func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retErr error) { streamCtx := serverStream.Context() ac := r.newConsumer() - hrcv := newHeaderReceiver(serverStream.Context(), r.authServer, r.gsettings.IncludeMetadata) defer func() { if err := recover(); err != nil { @@ -350,48 +368,193 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr } }() - for { - // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, - // or plog.Logs item. 
- req, err := serverStream.Recv() + doneCtx, doneCancel := context.WithCancel(streamCtx) + streamErrCh := make(chan error, 2) + pendingCh := make(chan batchResp, runtime.NumCPU()) + + go func() { + err := r.srvReceiveLoop(doneCtx, serverStream, pendingCh, method, ac) + streamErrCh <- err + }() + + // WG is used to ensure main thread only returns once sender is finished sending responses for all requests. + var senderWG sync.WaitGroup + senderWG.Add(1) + go func() { + err := r.srvSendLoop(doneCtx, serverStream, pendingCh) + streamErrCh <- err + senderWG.Done() + }() + + select { + case <-doneCtx.Done(): + senderWG.Wait() + return status.Error(codes.Canceled, "server stream shutdown") + case retErr = <-streamErrCh: + doneCancel() + senderWG.Wait() + return + } +} + +func (r *Receiver) recvOne(ctx context.Context, serverStream anyStreamServer, hrcv *headerReceiver, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { + r.inFlightWG.Add(1) + defer func() { + if retErr != nil { + r.inFlightWG.Done() // not processed + r.logStreamError(retErr) + } + }() + + // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, + // or plog.Logs item. + req, err := serverStream.Recv() + + if err != nil { + if errors.Is(err, io.EOF) { + return status.Error(codes.Canceled, "client stream shutdown") + } else if errors.Is(err, context.Canceled) { + return status.Error(codes.Canceled, "server stream shutdown") + } + return err + } + + // Check for optional headers and set the incoming context. + thisCtx, authHdrs, err := hrcv.combineHeaders(ctx, req.GetHeaders()) + if err != nil { + // Failing to parse the incoming headers breaks the stream. + return fmt.Errorf("arrow metadata error: %v", err) + } + var prevAcquiredBytes int + uncompSizeStr, sizeHeaderFound := authHdrs["otlp-pdata-size"] + if !sizeHeaderFound || len(uncompSizeStr) == 0 { + // This is a compressed size so make sure to acquire the difference when request is decompressed. 
+ prevAcquiredBytes = proto.Size(req) + } else { + prevAcquiredBytes, err = strconv.Atoi(uncompSizeStr[0]) if err != nil { - // This includes the case where a client called CloseSend(), in - // which case we see an EOF error here. - r.logStreamError(err) - - if errors.Is(err, io.EOF) { - return status.Error(codes.Canceled, "client stream shutdown") - } else if errors.Is(err, context.Canceled) { - return status.Error(codes.Canceled, "server stream shutdown") + return fmt.Errorf("failed to convert string to request size: %v", err) + } + } + // bounded queue to memory limit based on incoming uncompressed request size and waiters. + // Acquire will fail immediately if there are too many waiters, + // or will otherwise block until timeout or enough memory becomes available. + err = r.boundedQueue.Acquire(ctx, int64(prevAcquiredBytes)) + if err != nil { + return fmt.Errorf("breaking stream: %v", err) + } + + resp := batchResp{ + addr: hrcv.connInfo.Addr, + id: req.GetBatchId(), + bytesToRelease: int64(prevAcquiredBytes), + } + + var authErr error + if r.authServer != nil { + var newCtx context.Context + if newCtx, err = r.authServer.Authenticate(thisCtx, authHdrs); err != nil { + authErr = err + } else { + thisCtx = newCtx + } + } + + // processAndConsume will process and send an error to the sender loop + go func() { + defer r.inFlightWG.Done() // done processing + err = r.processAndConsume(thisCtx, method, ac, req, serverStream, authErr, resp, sizeHeaderFound) + resp.err = err + pendingCh <- resp + }() + + return nil +} + +func (r *Receiver) srvReceiveLoop(ctx context.Context, serverStream anyStreamServer, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { + hrcv := newHeaderReceiver(ctx, r.authServer, r.gsettings.IncludeMetadata) + for { + select { + case <-ctx.Done(): + return status.Error(codes.Canceled, "server stream shutdown") + default: + err := r.recvOne(ctx, serverStream, hrcv, pendingCh, method, ac) + if err != nil { + 
return err } - return err } + } +} - // Check for optional headers and set the incoming context. - thisCtx, authHdrs, err := hrcv.combineHeaders(streamCtx, req.GetHeaders()) - if err != nil { - // Failing to parse the incoming headers breaks the stream. - r.telemetry.Logger.Error("arrow metadata error", zap.Error(err)) - return err +func (r *Receiver) sendOne(serverStream anyStreamServer, resp batchResp) error { + // Note: Statuses can be batched, but we do not take + // advantage of this feature. + status := &arrowpb.BatchStatus{ + BatchId: resp.id, + } + if resp.err == nil { + status.StatusCode = arrowpb.StatusCode_OK + } else { + status.StatusMessage = resp.err.Error() + switch { + case errors.Is(resp.err, arrowRecord.ErrConsumerMemoryLimit): + r.telemetry.Logger.Error("arrow resource exhausted", zap.Error(resp.err)) + status.StatusCode = arrowpb.StatusCode_RESOURCE_EXHAUSTED + case consumererror.IsPermanent(resp.err): + r.telemetry.Logger.Error("arrow data error", zap.Error(resp.err)) + status.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT + default: + r.telemetry.Logger.Debug("arrow consumer error", zap.Error(resp.err)) + status.StatusCode = arrowpb.StatusCode_UNAVAILABLE } + } - var authErr error - if r.authServer != nil { - var newCtx context.Context - if newCtx, err = r.authServer.Authenticate(thisCtx, authHdrs); err != nil { - authErr = err - } else { - thisCtx = newCtx + err := serverStream.Send(status) + if err != nil { + r.logStreamError(err) + return err + } + r.boundedQueue.Release(resp.bytesToRelease) + + return nil +} + +func (r *Receiver) flushSender(serverStream anyStreamServer, pendingCh <-chan batchResp) error { + var err error + // wait for all in flight requests to successfully be processed or fail. + r.inFlightWG.Wait() + for { + select { + case resp := <-pendingCh: + err = r.sendOne(serverStream, resp) + if err != nil { + return err } + default: + // Currently nothing left in pendingCh. 
+ return nil } + } +} - if err := r.processAndConsume(thisCtx, method, ac, req, serverStream, authErr); err != nil { +func (r *Receiver) srvSendLoop(ctx context.Context, serverStream anyStreamServer, pendingCh <-chan batchResp) error { + var err error + for { + select { + case <-ctx.Done(): + err = r.flushSender(serverStream, pendingCh) + // err := multierr.Append(err, ctx.Err()) return err + case resp := <-pendingCh: + err = r.sendOne(serverStream, resp) + if err != nil { + return err + } } } } -func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) (retErr error) { +func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error, response batchResp, sizeHeaderFound bool) error { var err error ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv") @@ -401,9 +564,9 @@ func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowCo defer func() { r.recvInFlightRequests.Add(ctx, -1) // Set span status if an error is returned. - if retErr != nil { + if err != nil { span := trace.SpanFromContext(ctx) - span.SetStatus(otelcodes.Error, retErr.Error()) + span.SetStatus(otelcodes.Error, err.Error()) } }() @@ -412,44 +575,17 @@ func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowCo if authErr != nil { err = authErr } else { - err = r.processRecords(ctx, method, arrowConsumer, req) + err = r.processRecords(ctx, method, arrowConsumer, req, response, sizeHeaderFound) } - // Note: Statuses can be batched, but we do not take - // advantage of this feature. 
- status := &arrowpb.BatchStatus{ - BatchId: req.GetBatchId(), - } - if err == nil { - status.StatusCode = arrowpb.StatusCode_OK - } else { - status.StatusMessage = err.Error() - switch { - case errors.Is(err, arrowRecord.ErrConsumerMemoryLimit): - r.telemetry.Logger.Error("arrow resource exhausted", zap.Error(err)) - status.StatusCode = arrowpb.StatusCode_RESOURCE_EXHAUSTED - case consumererror.IsPermanent(err): - r.telemetry.Logger.Error("arrow data error", zap.Error(err)) - status.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT - default: - r.telemetry.Logger.Debug("arrow consumer error", zap.Error(err)) - status.StatusCode = arrowpb.StatusCode_UNAVAILABLE - } - } - - err = serverStream.Send(status) - if err != nil { - r.logStreamError(err) - return err - } - return nil + return err } // processRecords returns an error and a boolean indicating whether // the error (true) was from processing the data (i.e., invalid // argument) or (false) from the consuming pipeline. The boolean is // not used when success (nil error) is returned. -func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) error { +func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords, response batchResp, sizeHeaderFound bool) error { payloads := records.GetArrowPayloads() if len(payloads) == 0 { return nil @@ -498,6 +634,15 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu r.Metrics().ConsumeMetrics(ctx, metrics), ) } + + acquireErr := r.acquireAdditionalBytes(ctx, uncompSize, response, sizeHeaderFound) + + if acquireErr != nil { + err = multierr.Append(err, acquireErr) + // if acquireAdditionalBytes() failed then the previously acquired bytes were already released i.e. nothing to release. + return err + } + // entire request has been processed, decrement counter. 
r.recvInFlightBytes.Add(ctx, -uncompSize) r.recvInFlightItems.Add(ctx, int64(-numPts)) @@ -529,6 +674,15 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu r.Logs().ConsumeLogs(ctx, logs), ) } + + acquireErr := r.acquireAdditionalBytes(ctx, uncompSize, response, sizeHeaderFound) + + if acquireErr != nil { + err = multierr.Append(err, acquireErr) + // if acquireAdditionalBytes() failed then the previously acquired bytes were already released i.e. nothing to release. + return err + } + // entire request has been processed, decrement counter. r.recvInFlightBytes.Add(ctx, -uncompSize) r.recvInFlightItems.Add(ctx, int64(-numLogs)) @@ -562,6 +716,14 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu ) } + acquireErr := r.acquireAdditionalBytes(ctx, uncompSize, response, sizeHeaderFound) + + if acquireErr != nil { + err = multierr.Append(err, acquireErr) + // if acquireAdditionalBytes() failed then the previously acquired bytes were already released i.e. nothing to release. + return err + } + // entire request has been processed, decrement counter. r.recvInFlightBytes.Add(ctx, -uncompSize) r.recvInFlightItems.Add(ctx, int64(-numSpans)) @@ -573,3 +735,39 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu return ErrUnrecognizedPayload } } + +func (r *Receiver) acquireAdditionalBytes(ctx context.Context, uncompSize int64, response batchResp, sizeHeaderFound bool) error { + diff := uncompSize - response.bytesToRelease + + var err error + if diff != 0 { + if sizeHeaderFound { + var clientAddr string + if response.addr != nil { + clientAddr = response.addr.String() + } + // a mismatch between header set by exporter and the uncompSize just calculated. 
+ r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header", zap.String("client-address", clientAddr), zap.Int("uncompsize", int(uncompSize)), zap.Int("otlp-pdata-size", int(response.bytesToRelease))) + } else if diff < 0 { + // proto.Size() on compressed request was greater than pdata uncompressed size. + r.telemetry.Logger.Debug("uncompressed size is less than compressed size", zap.Int("uncompressed", int(uncompSize)), zap.Int("compressed", int(response.bytesToRelease))) + } + + if diff > 0 { + // diff > 0 means we previously acquired too few bytes initially and we need to correct this. Release previously + // acquired bytes to prevent deadlock and reacquire the uncompressed size we just calculated. + // Note: No need to release and reacquire bytes if diff < 0 because this has less impact and no reason to potentially block + // a request that is in flight by reacquiring the correct size. + r.boundedQueue.Release(response.bytesToRelease) + err = r.boundedQueue.Acquire(ctx, uncompSize) + if err != nil { + response.bytesToRelease = int64(0) + } else { + response.bytesToRelease = uncompSize + } + } + + } + + return err +} diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 76b1792c..530bb35a 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" "testing" @@ -43,9 +44,12 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "github.com/open-telemetry/otel-arrow/collector/admission" "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver/internal/arrow/mock" ) +var defaultBQ = admission.NewBoundedQueue(int64(100000), int64(10)) + type compareJSONTraces struct{ ptrace.Traces } type compareJSONMetrics struct{ 
pmetric.Metrics } type compareJSONLogs struct{ plog.Logs } @@ -320,7 +324,7 @@ func (ctc *commonTestCase) newOOMConsumer() arrowRecord.ConsumerAPI { return mock } -func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { +func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq *admission.BoundedQueue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { var authServer auth.Server var gsettings configgrpc.ServerConfig for _, gf := range opts { @@ -344,6 +348,7 @@ func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, opt gsettings, authServer, newConsumer, + bq, netstats.Noop{}, ) require.NoError(ctc.T, err) @@ -359,7 +364,113 @@ func requireCanceledStatus(t *testing.T, err error) { require.Equal(t, codes.Canceled, status.Code()) } +func TestBoundedQueueWithPdataHeaders(t *testing.T) { + var sizer ptrace.ProtoMarshaler + stdTesting := otelAssert.NewStdUnitTest(t) + pdataSizeTenTraces := sizer.TracesSize(testdata.GenerateTraces(10)) + defaultBoundedQueueLimit := int64(100000) + tests := []struct { + name string + numTraces int + includePdataHeader bool + pdataSize string + rejected bool + }{ + { + name: "no header compressed greater than uncompressed", + numTraces: 10, + }, + { + name: "no header compressed less than uncompressed", + numTraces: 100, + }, + { + name: "pdata header less than uncompressedSize", + numTraces: 10, + pdataSize: strconv.Itoa(pdataSizeTenTraces / 2), + includePdataHeader: true, + }, + { + name: "pdata header equal uncompressedSize", + numTraces: 10, + pdataSize: strconv.Itoa(pdataSizeTenTraces), + includePdataHeader: true, + }, + { + name: "pdata header greater than uncompressedSize", + numTraces: 10, + pdataSize: strconv.Itoa(pdataSizeTenTraces * 2), + includePdataHeader: true, + }, + { + name: "no header compressed accepted uncompressed rejected", + numTraces: 100, + rejected: true, + }, + { + name: "pdata header 
accepted uncompressed rejected", + numTraces: 100, + rejected: true, + pdataSize: strconv.Itoa(pdataSizeTenTraces), + includePdataHeader: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + td := testdata.GenerateTraces(tt.numTraces) + batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + if tt.includePdataHeader { + var hpb bytes.Buffer + hpe := hpack.NewEncoder(&hpb) + err := hpe.WriteField(hpack.HeaderField{ + Name: "otlp-pdata-size", + Value: tt.pdataSize, + }) + assert.NoError(t, err) + batch.Headers = make([]byte, hpb.Len()) + copy(batch.Headers, hpb.Bytes()) + } + + var bq *admission.BoundedQueue + if tt.rejected { + ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "rejecting request, request size larger than configured limit")).Times(1).Return(fmt.Errorf("rejecting request, request size larger than configured limit")) + // make the boundedqueue limit be slightly less than the uncompressed size + bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), int64(10)) + } else { + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) + bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, int64(10)) + } + + ctc.start(ctc.newRealConsumer, bq) + ctc.putBatch(batch, nil) + + if tt.rejected { + ctc.cancel() + } + + select { + case data := <-ctc.consume: + actualTD := data.Data.(ptrace.Traces) + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONTraces{td}, + }, []json.Marshaler{ + compareJSONTraces{actualTD}, + }) + err = ctc.cancelAndWait() + requireCanceledStatus(t, err) + case err = <-ctc.streamErr: + requireCanceledStatus(t, err) + } + }) + } +} + func TestReceiverTraces(t *testing.T) { + stdTesting := otelAssert.NewStdUnitTest(t) tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) @@ -369,10 +480,14 @@ func TestReceiverTraces(t *testing.T) { 
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) ctc.putBatch(batch, nil) - assert.EqualValues(t, td, (<-ctc.consume).Data) + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONTraces{td}, + }, []json.Marshaler{ + compareJSONTraces{(<-ctc.consume).Data.(ptrace.Traces)}, + }) err = ctc.cancelAndWait() requireCanceledStatus(t, err) @@ -388,7 +503,7 @@ func TestReceiverLogs(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) ctc.putBatch(batch, nil) assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}}) @@ -408,7 +523,7 @@ func TestReceiverMetrics(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) ctc.putBatch(batch, nil) otelAssert.Equiv(stdTesting, []json.Marshaler{ @@ -425,7 +540,7 @@ func TestReceiverRecvError(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) ctc.putBatch(nil, fmt.Errorf("test recv error")) @@ -444,7 +559,7 @@ func TestReceiverSendError(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(fmt.Errorf("test send error")) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) ctc.putBatch(batch, nil) assert.EqualValues(t, ld, (<-ctc.consume).Data) @@ -485,7 +600,7 @@ func TestReceiverConsumeError(t *testing.T) { ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) ctc.putBatch(batch, nil) @@ -544,7 +659,7 @@ func TestReceiverInvalidData(t *testing.T) { 
ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil) - ctc.start(ctc.newErrorConsumer) + ctc.start(ctc.newErrorConsumer, defaultBQ) ctc.putBatch(batch, nil) err = ctc.cancelAndWait() @@ -581,7 +696,7 @@ func TestReceiverMemoryLimit(t *testing.T) { ctc.stream.EXPECT().Send(statusExhaustedFor(batch.BatchId, "Permanent error: test oom error "+arrowRecord.ErrConsumerMemoryLimit.Error())).Times(1).Return(nil) - ctc.start(ctc.newOOMConsumer) + ctc.start(ctc.newOOMConsumer, defaultBQ) ctc.putBatch(batch, nil) err = ctc.cancelAndWait() @@ -618,6 +733,7 @@ func copyBatch(in *arrowpb.BatchArrowRecords) *arrowpb.BatchArrowRecords { func TestReceiverEOF(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) + stdTesting := otelAssert.NewStdUnitTest(t) // send a sequence of data then simulate closing the connection. const times = 10 @@ -627,7 +743,7 @@ func TestReceiverEOF(t *testing.T) { ctc.stream.EXPECT().Send(gomock.Any()).Times(times).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) go func() { for i := 0; i < times; i++ { @@ -658,7 +774,15 @@ func TestReceiverEOF(t *testing.T) { actualData = append(actualData, (<-ctc.consume).Data.(ptrace.Traces)) } - assert.EqualValues(t, expectData, actualData) + assert.Equal(t, len(expectData), len(actualData)) + + for i := 0; i < len(expectData); i++ { + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONTraces{expectData[i]}, + }, []json.Marshaler{ + compareJSONTraces{actualData[i]}, + }) + } wg.Wait() } @@ -684,7 +808,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) { ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).Return(nil) - ctc.start(ctc.newRealConsumer, func(gsettings *configgrpc.ServerConfig, _ *auth.Server) { + ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, _ *auth.Server) { gsettings.IncludeMetadata = includeMeta }) @@ -756,7 
+880,7 @@ func TestReceiverCancel(t *testing.T) { ctc := newCommonTestCase(t, tc) ctc.cancel() - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ) err := ctc.wait() requireCanceledStatus(t, err) @@ -1046,7 +1170,7 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) { }) var authCall *gomock.Call - ctc.start(ctc.newRealConsumer, func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) { + ctc.start(ctc.newRealConsumer, defaultBQ, func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) { gsettings.IncludeMetadata = includeMeta as := mock.NewMockServer(ctc.ctrl) diff --git a/collector/receiver/otelarrowreceiver/otelarrow.go b/collector/receiver/otelarrowreceiver/otelarrow.go index 273f74de..0b81dc30 100644 --- a/collector/receiver/otelarrowreceiver/otelarrow.go +++ b/collector/receiver/otelarrowreceiver/otelarrow.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/open-telemetry/otel-arrow/collector/admission" "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver/internal/arrow" "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver/internal/logs" "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver/internal/metrics" @@ -113,6 +114,7 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error { return err } } + bq := admission.NewBoundedQueue(int64(r.cfg.Arrow.AdmissionLimitMiB<<20), r.cfg.Arrow.WaiterLimit) r.arrowReceiver, err = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI { var opts []arrowRecord.Option @@ -124,7 +126,7 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error { opts = append(opts, arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider, r.settings.TelemetrySettings.MetricsLevel)) } return arrowRecord.NewConsumer(opts...) 
- }, r.netReporter) + }, bq, r.netReporter) if err != nil { return err diff --git a/collector/receiver/otelarrowreceiver/testdata/config.yaml b/collector/receiver/otelarrowreceiver/testdata/config.yaml index 0db44373..77a849af 100644 --- a/collector/receiver/otelarrowreceiver/testdata/config.yaml +++ b/collector/receiver/otelarrowreceiver/testdata/config.yaml @@ -27,3 +27,4 @@ protocols: permit_without_stream: true arrow: memory_limit_mib: 123 + admission_limit_mib: 80