-
Notifications
You must be signed in to change notification settings - Fork 7
/
besteffort.go
63 lines (51 loc) · 1.19 KB
/
besteffort.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package rpatterns
import (
"context"
"github.com/luno/jettison/errors"
"github.com/luno/jettison/j"
"github.com/luno/jettison/log"
"github.com/luno/reflex"
)
// NewBestEffortConsumer builds a reflex consumer named name around fn that
// gives up on a failing event instead of blocking on it forever: the error
// returned by fn is passed through for the first retries failures of an
// event, and the failure after that is ignored so that consumption
// eventually continues to the next event.
//
// Any opts are forwarded unchanged to reflex.NewConsumer.
func NewBestEffortConsumer(name string, retries int, fn reflex.ConsumerFunc,
	opts ...reflex.ConsumerOption,
) reflex.Consumer {
	state := new(bestEffort)
	state.name = name
	state.inner = fn
	state.retries = retries

	// The bound method carries the retry bookkeeping between invocations.
	return reflex.NewConsumer(name, state.consume, opts...)
}
// bestEffort holds the configuration of a best effort consumer together
// with the retry bookkeeping that its consume method updates on every call.
// The bookkeeping fields are plain values with no locking around them.
type bestEffort struct {
	// Configuration, set once by NewBestEffortConsumer.

	// inner is the wrapped consumer function that does the actual work.
	inner reflex.ConsumerFunc
	// name is the consumer name attached to the log entry of an ignored error.
	name string
	// retries is how many failures of a single event are still returned to
	// the caller before the next failure is ignored.
	retries int

	// Retry state, updated by consume.

	// retryCount is the number of failures recorded so far for retryID.
	retryCount int
	// retryID is the ID of the event that failed most recently; it is reset
	// to "" after a success or after an error has been ignored.
	retryID string
}
// consume runs the wrapped consumer function for e and applies the best
// effort policy to its result:
//
//   - on success the retry state is cleared and nil is returned;
//   - on failure the failure count for e.ID is incremented (starting from
//     zero when the previous failure belonged to a different event ID) and
//     the error is returned as long as the count does not exceed b.retries;
//   - once the count exceeds b.retries the retry state is cleared, the error
//     is logged unless reflex.IsExpected reports true for it, and nil is
//     returned so the error is ignored.
func (b *bestEffort) consume(ctx context.Context, e *reflex.Event) error {
	err := b.inner(ctx, e)
	if err == nil {
		b.retryID, b.retryCount = "", 0
		return nil
	}

	// Only failures of the same event accumulate; a different event ID
	// starts counting again from the first failure.
	failures := 1
	if b.retryID == e.ID {
		failures += b.retryCount
	}

	if failures <= b.retries {
		// Still within budget: remember the failure and surface the error.
		b.retryID, b.retryCount = e.ID, failures
		return err
	}

	// Budget exhausted: forget this event and swallow the error.
	b.retryID, b.retryCount = "", 0
	if !reflex.IsExpected(err) {
		log.Error(ctx, errors.Wrap(err, "best effort consumer ignoring error"),
			j.KS("consumer", b.name))
	}
	return nil
}