Skip to content

Commit

Permalink
Apply jitter once per lifetime, not once per stream (#199)
Browse files Browse the repository at this point in the history
Fixes #198.
  • Loading branch information
jmacd committed May 22, 2024
1 parent f1d366e commit b3f7682
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

- Jitter is applied to once per process, not once per stream. [#199](https://github.com/open-telemetry/otel-arrow/pull/199)

## [0.23.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.23.0) - 2024-05-09

- Remove the OTel-Arrow exporter FIFO prioritizer. Let "leastloaded" imply least-loaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"runtime"
"sort"
"time"
)

// bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write.
Expand Down Expand Up @@ -42,16 +43,17 @@ type streamSorter struct {

var _ streamPrioritizer = &bestOfNPrioritizer{}

func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFunc) (*bestOfNPrioritizer, []*streamWorkState) {
func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFunc, maxLifetime time.Duration) (*bestOfNPrioritizer, []*streamWorkState) {
var state []*streamWorkState

// Limit numChoices to the number of streams.
numChoices = min(numStreams, numChoices)

for i := 0; i < numStreams; i++ {
ws := &streamWorkState{
waiters: map[int64]chan<- error{},
toWrite: make(chan writeItem, 1),
maxStreamLifetime: addJitter(maxLifetime),
waiters: map[int64]chan<- error{},
toWrite: make(chan writeItem, 1),
}

state = append(state, ws)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ type Exporter struct {
// prioritizerName the name of a balancer policy.
prioritizerName PrioritizerName

// maxStreamLifetime is a limit on duration for streams. A
// slight "jitter" is applied relative to this value on a
// per-stream basis.
// maxStreamLifetime is a limit on duration for streams.
maxStreamLifetime time.Duration

// disableDowngrade prevents downgrade from occurring, supports
Expand Down Expand Up @@ -156,7 +154,7 @@ func (e *Exporter) Start(ctx context.Context) error {
downCtx, downDc := newDoneCancel(ctx)

var sws []*streamWorkState
e.ready, sws = newStreamPrioritizer(downDc, e.prioritizerName, e.numStreams)
e.ready, sws = newStreamPrioritizer(downDc, e.prioritizerName, e.numStreams, e.maxStreamLifetime)

for _, ws := range sws {
e.startArrowStream(downCtx, ws)
Expand Down Expand Up @@ -236,7 +234,6 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str
producer := e.newProducer()

stream := newStream(producer, e.ready, e.telemetry, e.netReporter, state)
stream.maxStreamLifetime = addJitter(e.maxStreamLifetime)

defer func() {
if err := producer.Close(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"go.opentelemetry.io/collector/component"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -50,18 +51,18 @@ type streamWriter interface {
sendAndWait(context.Context, <-chan error, writeItem) error
}

func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int) (streamPrioritizer, []*streamWorkState) {
func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int, maxLifetime time.Duration) (streamPrioritizer, []*streamWorkState) {
if name == unsetPrioritizer {
name = DefaultPrioritizer
}
if strings.HasPrefix(string(name), llPrefix) {
// error was checked and reported in Validate
n, err := strconv.Atoi(string(name[len(llPrefix):]))
if err == nil {
return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests)
return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests, maxLifetime)
}
}
return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests)
return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests, maxLifetime)
}

// pendingRequests is the load function used by leastloadedN.
Expand Down
5 changes: 5 additions & 0 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ type streamWorkState struct {
// prioritizer and a stream.
toWrite chan writeItem

// maxStreamLifetime is a limit on duration for streams. A
// slight "jitter" is applied relative to this value on a
// per-stream basis.
maxStreamLifetime time.Duration

// lock protects waiters
lock sync.Mutex

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newStreamTestCase(t *testing.T, pname PrioritizerName) *streamTestCase {
producer := arrowRecordMock.NewMockProducerAPI(ctrl)

bg, dc := newDoneCancel(context.Background())
prio, state := newStreamPrioritizer(dc, pname, 1)
prio, state := newStreamPrioritizer(dc, pname, 1, 10*time.Second)

ctc := newCommonTestCase(t, NotNoisy)
cts := ctc.newMockStream(bg)
Expand Down
11 changes: 5 additions & 6 deletions collector/exporter/otelarrowexporter/otelarrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,16 +1105,17 @@ func TestSendArrowFailedTraces(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, exp)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
}()

host := componenttest.NewNopHost()
assert.NoError(t, exp.Start(context.Background(), host))

rcv, _ := otelArrowTracesReceiverOnGRPCServer(ln, false)
rcv.startStreamMockArrowTraces(t, failedStatusFor)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
rcv.srv.GracefulStop()
}()

// Delay the server start, slightly.
go func() {
time.Sleep(100 * time.Millisecond)
Expand All @@ -1136,8 +1137,6 @@ func TestSendArrowFailedTraces(t *testing.T) {
assert.EqualValues(t, int32(2), rcv.totalItems.Load())
assert.EqualValues(t, int32(1), rcv.requestCount.Load())
assert.EqualValues(t, td, rcv.getLastRequest())

rcv.srv.GracefulStop()
}

func TestUserDialOptions(t *testing.T) {
Expand Down

0 comments on commit b3f7682

Please sign in to comment.