Skip to content

Commit

Permalink
try to avoid race in backoff (#584)
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Feb 5, 2024
1 parent 77e9347 commit 5a3d4d1
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 22 deletions.
48 changes: 32 additions & 16 deletions pipeline/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

type RetriableBatcher struct {
outFn RetriableBatcherOutFn
backoff backoff.BackOff
batcher *Batcher
backoffOpts BackoffOpts
onRetryError func(err error)
}

Expand All @@ -19,19 +19,13 @@ type RetriableBatcherOutFn func(*WorkerData, *Batch) error
type BackoffOpts struct {
MinRetention time.Duration
Multiplier float64
AttemptNum uint64
AttemptNum int
}

func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher {
boff := GetBackoff(
opts.MinRetention,
opts.Multiplier,
opts.AttemptNum,
)

batcherBackoff := &RetriableBatcher{
outFn: batcherOutFn,
backoff: boff,
backoffOpts: opts,
onRetryError: onError,
}
batcherBackoff.setBatcher(batcherOpts)
Expand All @@ -44,14 +38,36 @@ func (b *RetriableBatcher) setBatcher(batcherOpts *BatcherOptions) {
}

func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) {
b.backoff.Reset()

err := backoff.Retry(func() error {
return b.outFn(data, batch)
}, b.backoff)
exponentionalBackoff := backoff.ExponentialBackOff{
InitialInterval: b.backoffOpts.MinRetention,
Multiplier: b.backoffOpts.Multiplier,
RandomizationFactor: 0.5,
MaxInterval: backoff.DefaultMaxInterval,
MaxElapsedTime: backoff.DefaultMaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
exponentionalBackoff.Reset()

if err != nil {
b.onRetryError(err)
var timer *time.Timer
numTries := 0
for {
err := b.outFn(data, batch)
if err == nil {
return
}
next := exponentionalBackoff.NextBackOff()
if next == backoff.Stop || (b.backoffOpts.AttemptNum >= 0 && numTries > b.backoffOpts.AttemptNum) {
b.onRetryError(err)
return
}
numTries++
if timer == nil {
timer = time.NewTimer(next)
} else {
timer.Reset(next)
}
<-timer.C
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
backoffOpts := pipeline.BackoffOpts{
MinRetention: p.config.Retention_,
Multiplier: float64(p.config.RetentionExponentMultiplier),
AttemptNum: uint64(p.config.Retry),
AttemptNum: p.config.Retry,
}

onError := func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
backoffOpts := pipeline.BackoffOpts{
MinRetention: p.config.Retention_,
Multiplier: float64(p.config.RetentionExponentMultiplier),
AttemptNum: uint64(p.config.Retry),
AttemptNum: p.config.Retry,
}

onError := func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion plugin/output/gelf/gelf.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
backoffOpts := pipeline.BackoffOpts{
MinRetention: p.config.Retention_,
Multiplier: float64(p.config.RetentionExponentMultiplier),
AttemptNum: uint64(p.config.Retry),
AttemptNum: p.config.Retry,
}

onError := func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion plugin/output/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
backoffOpts := pipeline.BackoffOpts{
MinRetention: p.config.Retention_,
Multiplier: float64(p.config.RetentionExponentMultiplier),
AttemptNum: uint64(p.config.Retry),
AttemptNum: p.config.Retry,
}

onError := func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion plugin/output/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
backoffOpts := pipeline.BackoffOpts{
MinRetention: p.config.Retention_,
Multiplier: float64(p.config.RetentionExponentMultiplier),
AttemptNum: uint64(p.config.Retry),
AttemptNum: p.config.Retry,
}

onError := func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion plugin/output/splunk/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
backoffOpts := pipeline.BackoffOpts{
MinRetention: p.config.Retention_,
Multiplier: float64(p.config.RetentionExponentMultiplier),
AttemptNum: uint64(p.config.Retry),
AttemptNum: p.config.Retry,
}

onError := func(err error) {
Expand Down

0 comments on commit 5a3d4d1

Please sign in to comment.