-
Notifications
You must be signed in to change notification settings - Fork 14
/
retry_config.go
149 lines (126 loc) · 4.99 KB
/
retry_config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package plugin
import (
"context"
"fmt"
"log"
"strings"
"github.com/turbot/go-kit/helpers"
)
/*
RetryConfig retries [HydrateFunc] errors.
If a HydrateFunc returns an error in the first attempt but resolves itself in a future attempt, for instance API rate limit or throttling errors, set [plugin.GetConfig.RetryConfig], [plugin.ListConfig.RetryConfig] or [plugin.HydrateConfig.RetryConfig].
For errors common to many HydrateFuncs, you can define a default RetryConfig by setting [plugin.DefaultGetConfig].
Retry errors from a HydrateFunc that has a GetConfig:
Get: &plugin.GetConfig{
RetryConfig: &plugin.RetryConfig{
ShouldRetryError: shouldRetryError,
},
...
},
Retry errors from a HydrateFunc that has a ListConfig:
List: &plugin.ListConfig{
RetryConfig: &plugin.RetryConfig{
ShouldRetryError: shouldRetryError,
},
...
},
Retry errors from a HydrateFunc that has a HydrateConfig:
HydrateConfig: []plugin.HydrateConfig{
RetryConfig: &plugin.RetryConfig{
ShouldRetryError: shouldRetryError,
},
...
},
Retry errors that may occur in many HydrateFuncs:
DefaultIgnoreConfig: &plugin.DefaultIgnoreConfig{
RetryConfig: &plugin.RetryConfig{
ShouldRetryError: shouldRetryError,
},
...
},
*/
type RetryConfig struct {
ShouldRetryErrorFunc ErrorPredicateWithContext
// deprecated use ShouldRetryErrorFunc
ShouldRetryError ErrorPredicate
// Maximum number of retry operation to be performed. Default set to 10.
MaxAttempts int64
// Algorithm for the backoff. Supported values: Fibonacci, Exponential, and Constant. Default set to Fibonacci.
BackoffAlgorithm string
// Starting interval. Default set to 100ms.
RetryInterval int64
// Set a maximum on the duration (in ms) returned from the next backoff.
CappedDuration int64
// Sets a maximum on the total amount of time (in ms) a backoff should execute.
MaxDuration int64
}
func (c *RetryConfig) String() interface{} {
if c.ShouldRetryError != nil {
return fmt.Sprintf("ShouldRetryError: %s", helpers.GetFunctionName(c.ShouldRetryError))
}
if c.ShouldRetryErrorFunc != nil {
return fmt.Sprintf("ShouldRetryErrorFunc: %s", helpers.GetFunctionName(c.ShouldRetryErrorFunc))
}
return ""
}
func (c *RetryConfig) validate(table *Table) []string {
var res []string
validBackoffAlgorithm := []string{"Constant", "Exponential", "Fibonacci"}
var tablePrefix string
if table != nil {
tablePrefix = fmt.Sprintf("table '%s' ", table.Name)
}
if c.ShouldRetryError != nil && c.ShouldRetryErrorFunc != nil {
log.Printf("[TRACE] RetryConfig validate failed - both ShouldRetryError and ShouldRetryErrorFunc are defined")
res = append(res, fmt.Sprintf("%sboth ShouldRetryError and ShouldRetryErrorFunc are defined", tablePrefix))
}
if c.BackoffAlgorithm != "" && !helpers.StringSliceContains(validBackoffAlgorithm, c.BackoffAlgorithm) {
res = append(res, fmt.Sprintf("%sBackoffAlgorithm value '%s' is not valid, it must be one of: %s", tablePrefix, c.BackoffAlgorithm, strings.Join(validBackoffAlgorithm, ",")))
}
return res
}
func (c *RetryConfig) DefaultTo(other *RetryConfig) {
// if not other provided, nothing to do
if other == nil {
return
}
// if either ShouldIgnoreError or ShouldRetryErrorFunc are set, do not default to other
if c.ShouldRetryError != nil || c.ShouldRetryErrorFunc != nil {
log.Printf("[TRACE] RetryConfig DefaultTo: config defines a should retry function so not defaulting to base")
return
}
// legacy func
if c.ShouldRetryError == nil && other.ShouldRetryError != nil {
log.Printf("[TRACE] RetryConfig DefaultTo: using base ShouldRetryError: %s", helpers.GetFunctionName(other.ShouldRetryError))
c.ShouldRetryError = other.ShouldRetryError
}
if c.ShouldRetryErrorFunc == nil && other.ShouldRetryErrorFunc != nil {
log.Printf("[TRACE] RetryConfig DefaultTo: using base ShouldRetryErrorFunc: %s", helpers.GetFunctionName(other.ShouldRetryErrorFunc))
c.ShouldRetryErrorFunc = other.ShouldRetryErrorFunc
}
}
// GetListRetryConfig wraps the ShouldRetry function with an additional check of the rows streamed
// (as we cannot retry errors in the list hydrate function after streaming has started)
func (c *RetryConfig) GetListRetryConfig() *RetryConfig {
listRetryConfig := &RetryConfig{}
if c.ShouldRetryErrorFunc != nil {
listRetryConfig.ShouldRetryErrorFunc = func(ctx context.Context, d *QueryData, h *HydrateData, err error) bool {
if d.queryStatus.rowsStreamed != 0 {
log.Printf("[TRACE] shouldRetryError we have started streaming rows (%d) - return false", d.queryStatus.rowsStreamed)
return false
}
res := c.ShouldRetryErrorFunc(ctx, d, h, err)
return res
}
} else if c.ShouldRetryError != nil {
listRetryConfig.ShouldRetryErrorFunc = func(ctx context.Context, d *QueryData, h *HydrateData, err error) bool {
if d.queryStatus.rowsStreamed != 0 {
log.Printf("[TRACE] shouldRetryError we have started streaming rows (%d) - return false", d.queryStatus.rowsStreamed)
return false
}
// call the legacy function
return c.ShouldRetryError(err)
}
}
return listRetryConfig
}