Add `max_retries` field to the `retry` processor
Also add `retry_count` and `backoff_duration` metadata fields.

Fixes #2544.

Signed-off-by: Mihai Todor <todormihai@gmail.com>
mihaitodor committed May 17, 2024
1 parent 279e25b commit 6f757c9
Showing 4 changed files with 128 additions and 8 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,13 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.28.0 - TBD

### Added

- Field `max_retries` added to the `retry` processor.
- Metadata fields `retry_count` and `backoff_duration` added to the `retry` processor.

## 4.27.0 - 2024-04-23

### Added
52 changes: 44 additions & 8 deletions internal/impl/pure/processor_retry.go
@@ -19,6 +19,7 @@ const (
rpFieldProcessors = "processors"
rpFieldBackoff = "backoff"
rpFieldParallel = "parallel"
rpFieldMaxRetries = "max_retries"
)

func retryProcSpec() *service.ConfigSpec {
@@ -36,6 +37,14 @@ By default the retry backoff has a specified `+"[`max_elapsed_time`](#backoffmax
In order to avoid permanent loops any error associated with messages as they first enter a retry processor will be cleared.
### Metadata
This processor adds the following metadata fields to each message:
`+"```"+`
- retry_count - The number of retry attempts.
- backoff_duration - The total time elapsed while performing retries.
`+"```"+`
:::caution Batching
If you wish to wrap a batch-aware series of processors then take a look at the [batching section](#batching) below.
:::
@@ -97,6 +106,9 @@ output:
service.NewBoolField(rpFieldParallel).
Description("When processing batches of messages these batches are ignored and the processors apply to each message sequentially. However, when this field is set to `true` each message will be processed in parallel. Caution should be made to ensure that batch sizes do not surpass a point where this would cause resource (CPU, memory, API limits) contention.").
Default(false),
service.NewIntField(rpFieldMaxRetries).
Description("The maximum number of retry attempts before the request is aborted. Setting this value to `0` will result in unbounded number of retries.").
Default(0),
)
}

@@ -128,6 +140,10 @@ func init() {
return nil, err
}

if p.maxRetries, err = conf.FieldInt(rpFieldMaxRetries); err != nil {
return nil, err
}

return interop.NewUnwrapInternalBatchProcessor(processor.NewAutoObservedBatchedProcessor("retry", p, mgr)), nil
})
if err != nil {
@@ -136,10 +152,11 @@ func init() {
}

type retryProc struct {
children []processor.V1
boff *backoff.ExponentialBackOff
parallel bool
maxRetries int
log log.Modular
}

func (r *retryProc) ProcessBatch(ctx *processor.BatchProcContext, msgs message.Batch) ([]message.Batch, error) {
@@ -184,16 +201,28 @@ func (r *retryProc) ProcessBatch(ctx *processor.BatchProcContext, msgs message.B
return []message.Batch{resMsg}, nil
}

func (r *retryProc) dispatchMessage(ctx context.Context, p *message.Part) (resBatches []message.Batch, err error) {
// NOTE: We always ensure we start off with a copy of the reference backoff.
boff := *r.boff
boff.Reset()

retries := 0
var backoffDuration time.Duration

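// Attach the retry metadata to every message in the returned batches,
// regardless of which branch exits the retry loop below.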
defer func() {
for _, b := range resBatches {
for _, m := range b {
m.MetaSetMut("retry_count", retries)
m.MetaSetMut("backoff_duration", backoffDuration)
}
}
}()

// Ensure we do not start off with an error.
p.ErrorSet(nil)

for {
resBatches, err = processor.ExecuteAll(ctx, r.children, message.Batch{p.ShallowCopy()})
if err != nil {
return nil, err
}
@@ -214,13 +243,20 @@ func (r *retryProc) dispatchMessage(ctx context.Context, p *message.Part) ([]mes
return resBatches, nil
}

retries++
if retries == r.maxRetries {
r.log.With("error", err).Debug("Error occurred and maximum number of retries was reached.")
return resBatches, nil
}

nextSleep := boff.NextBackOff()
if nextSleep == backoff.Stop {
r.log.With("error", err).Debug("Error occurred and maximum wait period was reached.")
return resBatches, nil
}
// Only count the backoff towards the total once we know we will actually
// sleep, so that backoff.Stop (-1) cannot skew the reported duration.
backoffDuration += nextSleep

r.log.With("error", err, "backoff", nextSleep).Debug("Error occured, sleeping for next backoff period.")
r.log.With("error", err, "backoff", nextSleep).Debug("Error occurred, sleeping for next backoff period.")
select {
case <-time.After(nextSleep):
case <-ctx.Done():
60 changes: 60 additions & 0 deletions internal/impl/pure/processor_retry_test.go
@@ -45,6 +45,14 @@ retry:

var resMsgs []string
for _, m := range resBatches[0] {
retryCount, ok := m.MetaGetMut("retry_count")
require.True(t, ok)
assert.Equal(t, 0, retryCount)

backoffDuration, ok := m.MetaGetMut("backoff_duration")
require.True(t, ok)
assert.Equal(t, time.Duration(0), backoffDuration)

resMsgs = append(resMsgs, string(m.AsBytes()))
}
assert.Equal(t, []string{
@@ -154,6 +162,58 @@ retry:
require.NoError(t, p.Close(context.Background()))
}

func TestRetryMaxRetriesFailure(t *testing.T) {
conf, err := testutil.ProcessorFromYAML(`
retry:
max_retries: 2
processors:
- resource: foo
`)
require.NoError(t, err)

mockMgr := mock.NewManager()

var fooCalls uint32
mockMgr.Processors["foo"] = func(b message.Batch) ([]message.Batch, error) {
b[0].SetBytes([]byte(string(b[0].AsBytes()) + " updated"))
atomic.AddUint32(&fooCalls, 1)
b[0].ErrorSet(errors.New("nope"))
return []message.Batch{
{b[0]},
}, nil
}

p, err := mockMgr.NewProcessor(conf)
require.NoError(t, err)

resBatches, err := p.ProcessBatch(context.Background(), message.Batch{
message.NewPart([]byte("hello world a")),
})
require.NoError(t, err)
require.Len(t, resBatches, 1)
require.Len(t, resBatches[0], 1)

var resMsgs []string
for _, m := range resBatches[0] {
retryCount, ok := m.MetaGetMut("retry_count")
require.True(t, ok)
assert.Equal(t, 2, retryCount)

backoffDuration, ok := m.MetaGetMut("backoff_duration")
require.True(t, ok)
assert.Greater(t, backoffDuration, time.Duration(0))

resMsgs = append(resMsgs, string(m.AsBytes()))
}
assert.Equal(t, []string{
"hello world a updated",
}, resMsgs)

assert.Equal(t, uint32(2), fooCalls)

require.NoError(t, p.Close(context.Background()))
}

func TestRetryParallelErrors(t *testing.T) {
conf, err := testutil.ProcessorFromYAML(`
retry:
17 changes: 17 additions & 0 deletions website/docs/components/processors/retry.md
@@ -32,6 +32,7 @@ retry:
max_elapsed_time: 1m
processors: [] # No default (required)
parallel: false
max_retries: 0
```

Executes child processors and if a resulting message is errored then, after a specified backoff period, the same original message will be attempted again through those same processors. If the child processors result in more than one message then the retry mechanism will kick in if _any_ of the resulting messages are errored.
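As an illustrative sketch (not part of this commit), a typical configuration wraps a flaky child processor so that errored messages are reattempted with backoff; the `http` endpoint below is hypothetical:

```
pipeline:
  processors:
    - retry:
        backoff:
          initial_interval: 500ms
          max_interval: 3s
          max_elapsed_time: 30s
        processors:
          - http:
              url: https://example.com/enrich # hypothetical endpoint
              verb: POST
```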
@@ -42,6 +43,14 @@ By default the retry backoff has a specified [`max_elapsed_time`](#backoffmax_el

In order to avoid permanent loops any error associated with messages as they first enter a retry processor will be cleared.

### Metadata

This processor adds the following metadata fields to each message:
```
- retry_count - The number of retry attempts.
- backoff_duration - The total time elapsed while performing retries.
```
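
As a sketch (not from this commit), these fields can be read downstream of the retry with Bloblang metadata references; `flaky_step` is a hypothetical resource:

```
pipeline:
  processors:
    - retry:
        processors:
          - resource: flaky_step # hypothetical child processor
    - mapping: |
        root = this
        root.attempts = @retry_count
        root.waited = @backoff_duration
```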

:::caution Batching
If you wish to wrap a batch-aware series of processors then take a look at the [batching section](#batching) below.
:::
@@ -158,6 +167,14 @@ When processing batches of messages these batches are ignored and the processors
Type: `bool`
Default: `false`

### `max_retries`

The maximum number of retry attempts before the request is aborted. Setting this value to `0` will result in an unbounded number of retries.


Type: `int`
Default: `0`
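
For illustration only, bounding the loop to a fixed number of attempts (the default backoff limits still apply, whichever triggers first) might look like this, where `flaky_step` is a hypothetical resource:

```
retry:
  max_retries: 3
  processors:
    - resource: flaky_step # hypothetical child processor
```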

## Batching

When messages are batched the child processors of a retry are executed for each individual message in isolation, performed serially by default but in parallel when the field [`parallel`](#parallel) is set to `true`. This is an intentional limitation of the retry processor and is done in order to ensure that errors are correctly associated with a given input message. Otherwise, the archiving, expansion, grouping, filtering and so on of the child processors could obfuscate this relationship.
