Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(otelarrowreceiver): asynchronous stream operations #181

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9a73c2a
add semaphore files
moh-osman3 Apr 9, 2024
e30c220
add uncompSz to proto defintion and apply semaphore
moh-osman3 Apr 14, 2024
3ec41e4
add AdmissionLimitMiB separate from MemoryLimitMiB, always record unc…
moh-osman3 Apr 16, 2024
8e16255
closer to working
moh-osman3 Apr 18, 2024
2a3fa37
return error in goroutine
moh-osman3 Apr 23, 2024
45f2f9a
refactor solve race, broken tests still
moh-osman3 Apr 23, 2024
3e68d50
almost
moh-osman3 Apr 24, 2024
212fca9
painful but fixed?
moh-osman3 Apr 25, 2024
0d9f180
rm
moh-osman3 Apr 25, 2024
366936e
rebase and fix tests again
moh-osman3 Apr 25, 2024
4c84297
fix race
moh-osman3 Apr 25, 2024
e1b920a
remove unneded package
moh-osman3 Apr 25, 2024
f93a37c
remove replace
moh-osman3 Apr 25, 2024
77364fe
review feedback
moh-osman3 Apr 26, 2024
a8448e4
remove unneeded arg
moh-osman3 Apr 26, 2024
12f1b8c
rm uncompressed_size from proto
moh-osman3 May 1, 2024
6f8ba19
add header, acquire diff
moh-osman3 May 1, 2024
af29366
fix potential deadlock
moh-osman3 May 2, 2024
3b09b6c
unit tests
moh-osman3 May 3, 2024
1258605
use assertEqualUnsortedSpans
moh-osman3 May 3, 2024
25366e0
remove unneeded helper functions and use otelAssert.Equiv
moh-osman3 May 3, 2024
40f7fa9
update changelog
moh-osman3 May 3, 2024
7db9a40
improve readability with response.bytesToRelease
moh-osman3 May 7, 2024
8d45b90
remove deprecated
moh-osman3 May 7, 2024
8ea4949
gofmt
moh-osman3 May 7, 2024
aacba4e
rm newline
moh-osman3 May 7, 2024
b97b100
go mod tidy
moh-osman3 May 7, 2024
e9e2d31
add client address if available
moh-osman3 May 9, 2024
985ab4b
Merge branch 'main' into mohosman/apply-bounded-semaphore
jmacd May 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

- Added a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174)
- Refactor otelarrowreceiver to do stream.Recv, request processing, and stream.Send in separate goroutines. [#181](https://github.com/open-telemetry/otel-arrow/pull/181)

- Add a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174)

## [0.22.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.22.0) - 2024-04-16

Expand Down
4 changes: 2 additions & 2 deletions api/experimental/arrow/v1/arrow_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/experimental/arrow/v1/arrow_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion collector/examples/bridge/edge-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ service:
address: 127.0.0.1:8888
level: detailed
logs:
level: info
level: info
2 changes: 1 addition & 1 deletion collector/examples/bridge/saas-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ service:
telemetry:
metrics:
address: 127.0.0.1:8889
level: normal
level: normal
2 changes: 1 addition & 1 deletion collector/exporter/otelarrowexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
go.opentelemetry.io/collector/config/configgrpc v0.98.0
go.opentelemetry.io/collector/config/configopaque v1.5.0
go.opentelemetry.io/collector/config/configretry v0.98.0
go.opentelemetry.io/collector/config/configtelemetry v0.98.0
go.opentelemetry.io/collector/config/configtls v0.98.0
go.opentelemetry.io/collector/confmap v0.98.0
go.opentelemetry.io/collector/consumer v0.98.0
Expand Down Expand Up @@ -75,6 +74,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector/config/confignet v0.98.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.98.0 // indirect
go.opentelemetry.io/collector/config/internal v0.98.0 // indirect
go.opentelemetry.io/collector/featuregate v1.5.0 // indirect
go.opentelemetry.io/collector/receiver v0.98.0 // indirect
Expand Down
29 changes: 16 additions & 13 deletions collector/exporter/otelarrowexporter/internal/arrow/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"context"
"errors"
"math/rand"
"strconv"
"sync"
"time"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -285,19 +285,22 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
// exporter, because of the optimization phase performed in the
// conversion to Arrow.
var uncompSize int
if e.telemetry.MetricsLevel > configtelemetry.LevelNormal {
switch data := data.(type) {
case ptrace.Traces:
var sizer ptrace.ProtoMarshaler
uncompSize = sizer.TracesSize(data)
case plog.Logs:
var sizer plog.ProtoMarshaler
uncompSize = sizer.LogsSize(data)
case pmetric.Metrics:
var sizer pmetric.ProtoMarshaler
uncompSize = sizer.MetricsSize(data)
}
switch data := data.(type) {
case ptrace.Traces:
var sizer ptrace.ProtoMarshaler
uncompSize = sizer.TracesSize(data)
case plog.Logs:
var sizer plog.ProtoMarshaler
uncompSize = sizer.LogsSize(data)
case pmetric.Metrics:
var sizer pmetric.ProtoMarshaler
uncompSize = sizer.MetricsSize(data)
}

if md == nil {
md = make(map[string]string)
}
md["otlp-pdata-size"] = strconv.Itoa(uncompSize)
moh-osman3 marked this conversation as resolved.
Show resolved Hide resolved

wri := writeItem{
records: data,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,15 @@ func TestArrowExporterHeaders(t *testing.T) {

if times%2 == 1 {
md := metadata.MD{
"expected1": []string{"metadata1"},
"expected2": []string{fmt.Sprint(times)},
"expected1": []string{"metadata1"},
"expected2": []string{fmt.Sprint(times)},
"otlp-pdata-size": []string{"329"},
}
expectOutput = append(expectOutput, md)
} else {
expectOutput = append(expectOutput, nil)
expectOutput = append(expectOutput, metadata.MD{
"otlp-pdata-size": []string{"329"},
})
}

sent, err := tc.exporter.SendAndWait(ctx, input)
Expand Down Expand Up @@ -677,11 +680,14 @@ func TestArrowExporterIsTraced(t *testing.T) {
propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(expectMap))

md := metadata.MD{
"traceparent": []string{expectMap["traceparent"]},
"traceparent": []string{expectMap["traceparent"]},
"otlp-pdata-size": []string{"329"},
}
expectOutput = append(expectOutput, md)
} else {
expectOutput = append(expectOutput, nil)
expectOutput = append(expectOutput, metadata.MD{
"otlp-pdata-size": []string{"329"},
})
}

sent, err := tc.exporter.SendAndWait(ctx, input)
Expand Down
7 changes: 7 additions & 0 deletions collector/receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type ArrowConfig struct {
// passing through, they will see ResourceExhausted errors.
MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"`

AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"`

// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
// This is a dimension of memory limiting to ensure waiters are not consuming an
// unexpectedly large amount of memory in the arrow receiver.
WaiterLimit int64 `mapstructure:"waiter_limit"`

// Zstd settings apply to OTel-Arrow use of gRPC specifically.
Zstd zstd.DecoderConfig `mapstructure:"zstd"`
}
Expand Down
6 changes: 4 additions & 2 deletions collector/receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func TestUnmarshalConfig(t *testing.T) {
},
},
Arrow: ArrowConfig{
MemoryLimitMiB: 123,
MemoryLimitMiB: 123,
AdmissionLimitMiB: 80,
},
},
}, cfg)
Expand All @@ -101,7 +102,8 @@ func TestUnmarshalConfigUnix(t *testing.T) {
ReadBufferSize: 512 * 1024,
},
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
MemoryLimitMiB: defaultMemoryLimitMiB,
AdmissionLimitMiB: defaultAdmissionLimitMiB,
},
},
}, cfg)
Expand Down
2 changes: 2 additions & 0 deletions collector/receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
defaultGRPCEndpoint = "0.0.0.0:4317"

defaultMemoryLimitMiB = 128
defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2
)

// NewFactory creates a new OTLP receiver factory.
Expand All @@ -46,6 +47,7 @@ func createDefaultConfig() component.Config {
},
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
AdmissionLimitMiB: defaultAdmissionLimitMiB,
},
},
}
Expand Down
6 changes: 5 additions & 1 deletion collector/receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/net v0.24.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
)

require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
Expand All @@ -55,6 +58,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -66,6 +70,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector/config/configcompression v1.5.0 // indirect
Expand All @@ -84,6 +89,5 @@ require (
golang.org/x/tools v0.15.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
11 changes: 9 additions & 2 deletions collector/receiver/otelarrowreceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo=
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4=
github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -49,6 +53,7 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
Expand All @@ -70,6 +75,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
Expand All @@ -84,8 +91,6 @@ github.com/mostynb/go-grpc-compression v1.2.2/go.mod h1:GOCr2KBxXcblCuczg3YdLQlc
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/open-telemetry/otel-arrow v0.22.0 h1:G1jgtqAM2ho5pyKQ4tyrDzk9Y0VcJ+GZQRJgN26vRlI=
github.com/open-telemetry/otel-arrow v0.22.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M=
github.com/open-telemetry/otel-arrow/collector v0.22.0 h1:lHFjzkh5PbsiW8B63SRntnP9W7bLCXV9lslO4zI0s/Y=
github.com/open-telemetry/otel-arrow/collector v0.22.0/go.mod h1:R7hRwuGDxoGLB27dkJUFKDK7mGG7Yb02ODnLHx8Whis=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -105,6 +110,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
Loading
Loading