-
Notifications
You must be signed in to change notification settings - Fork 592
/
backoff_strategy_konnect.go
167 lines (138 loc) · 5.34 KB
/
backoff_strategy_konnect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package adminapi
import (
"bytes"
"encoding/hex"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/jpillora/backoff"
"github.com/kong/go-kong/kong"
"github.com/samber/lo"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckerrors"
)
// Konnect backoff tuning: waits start at 3 seconds, double on each consecutive
// failure (multiplier 2), and are capped at 15 minutes.
const (
	KonnectBackoffInitialInterval = time.Second * 3
	KonnectBackoffMaxInterval     = time.Minute * 15
	KonnectBackoffMultiplier      = 2
)
// Clock abstracts obtaining the current time, allowing callers to inject a
// deterministic implementation (presumably for testing — the production code
// would pass a real-time clock).
type Clock interface {
	Now() time.Time
}
// KonnectBackoffStrategy keeps track of Konnect config push backoffs.
//
// It takes into account:
// - a regular exponential backoff that is incremented on every Update failure,
// - a last failed configuration hash (where we skip Update until a config changes).
//
// It's important to note that KonnectBackoffStrategy can use the latter (config hash)
// because of the nature of the one-directional integration where KIC is the only
// component responsible for populating configuration of Konnect's Control Plane.
// In case that changes in the future (e.g. manual modifications to parts of the
// configuration are allowed on Konnect side for some reason), we might have to
// drop this part of the backoff strategy.
type KonnectBackoffStrategy struct {
	// b computes the exponential backoff durations.
	b *backoff.Backoff
	// nextAttempt is the earliest instant at which the next update attempt is allowed.
	nextAttempt time.Time
	// clock supplies the current time (injectable via NewKonnectBackoffStrategy).
	clock Clock
	// lastFailedConfigHash is the hash of the last config rejected with a client
	// error; while set, an identical config is refused by CanUpdate.
	lastFailedConfigHash []byte
	// lock guards all of the fields above.
	lock sync.RWMutex
}
// NewKonnectBackoffStrategy constructs a KonnectBackoffStrategy using the
// package-level backoff defaults. The given clock drives all time
// calculations, so callers may supply a deterministic implementation.
func NewKonnectBackoffStrategy(clock Clock) *KonnectBackoffStrategy {
	b := &backoff.Backoff{
		Min:    KonnectBackoffInitialInterval,
		Max:    KonnectBackoffMaxInterval,
		Factor: KonnectBackoffMultiplier,
	}
	// Start counting attempts from zero.
	b.Reset()

	return &KonnectBackoffStrategy{
		b:     b,
		clock: clock,
	}
}
// CanUpdate reports whether a config push may be attempted right now.
// It returns true with an empty string when an attempt is allowed; otherwise
// it returns false together with a human-readable explanation of the refusal.
func (s *KonnectBackoffStrategy) CanUpdate(configHash []byte) (bool, string) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	// Remaining wait until the exponential backoff deadline. On the very first
	// attempt s.nextAttempt holds the zero time, so the remaining duration is
	// negative and the backoff counts as satisfied.
	remaining := s.nextAttempt.Sub(s.clock.Now())
	backoffElapsed := remaining <= 0

	// Refuse to push the exact same config that already failed with a client error.
	sameFaultyConfig := s.lastFailedConfigHash != nil && bytes.Equal(s.lastFailedConfigHash, configHash)

	if !backoffElapsed || sameFaultyConfig {
		// Build a human-readable explanation of why the update cannot happen now.
		return false, s.whyCannotUpdate(remaining, sameFaultyConfig)
	}
	return true, ""
}
// RegisterUpdateFailure records a failed config push and adjusts the backoff
// state according to the kind of API errors extracted from err.
func (s *KonnectBackoffStrategy) RegisterUpdateFailure(err error, configHash []byte) {
	s.lock.Lock()
	defer s.lock.Unlock()

	apiErrs := deckerrors.ExtractAPIErrors(err)

	// 429 responses may carry a Retry-After hint, so they get dedicated handling.
	if rateLimitErr, found := lo.Find(apiErrs, func(e *kong.APIError) bool {
		return e.Code() == http.StatusTooManyRequests
	}); found {
		s.handleTooManyRequests(rateLimitErr)
		return
	}

	// Any other 4xx means the configuration itself was rejected.
	if lo.ContainsBy(apiErrs, func(e *kong.APIError) bool {
		code := e.Code()
		return code >= 400 && code < 500
	}) {
		s.handleGenericClientError(configHash)
		return
	}

	// Everything else just bumps the standard exponential backoff.
	s.incrementExponentialBackoff()
}
// RegisterUpdateSuccess resets the strategy after a successful push: the
// exponential backoff starts over and the remembered faulty config hash is dropped.
func (s *KonnectBackoffStrategy) RegisterUpdateSuccess() {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.b.Reset()
	var noDeadline time.Time
	s.nextAttempt = noDeadline
	s.lastFailedConfigHash = nil
}
// handleTooManyRequests reacts to a 429 response from Konnect.
func (s *KonnectBackoffStrategy) handleTooManyRequests(tooManyRequestsErr *kong.APIError) {
	details, hasDetails := tooManyRequestsErr.Details().(kong.ErrTooManyRequestsDetails)
	switch {
	case hasDetails && details.RetryAfter != 0:
		// 429 with embedded details: honor the suggested Retry-After period.
		s.nextAttempt = s.clock.Now().Add(details.RetryAfter)
	default:
		// 429 without usable details: fall back to the exponential backoff.
		s.incrementExponentialBackoff()
	}
	// In both cases drop the last failed config hash so it doesn't block an
	// update once the waiting period established above has passed.
	s.lastFailedConfigHash = nil
}
// handleGenericClientError reacts to a non-429 client error (4xx): the pushed
// configuration itself is considered faulty, so besides bumping the
// exponential backoff we remember the config's hash to make CanUpdate refuse
// the very same config until it changes.
func (s *KonnectBackoffStrategy) handleGenericClientError(configHash []byte) {
	s.incrementExponentialBackoff()
	// Store a private copy of the hash so that a later mutation of the
	// caller's slice cannot silently change what CanUpdate compares against.
	// Nil-ness is preserved because CanUpdate treats a nil stored hash as
	// "no failed config recorded".
	if configHash == nil {
		s.lastFailedConfigHash = nil
		return
	}
	s.lastFailedConfigHash = append([]byte{}, configHash...)
}
// incrementExponentialBackoff advances the backoff and records the deadline
// before which no further update attempt is allowed.
func (s *KonnectBackoffStrategy) incrementExponentialBackoff() {
	// Backoff.Duration() yields the wait until the next attempt and bumps the
	// internal attempt counter, so every subsequent call returns a duration
	// multiplied accordingly.
	wait := s.b.Duration()
	// Persist the exact instant after which the next attempt becomes allowed.
	s.nextAttempt = s.clock.Now().Add(wait)
}
// whyCannotUpdate assembles a comma-separated, human-readable list of the
// reasons an update attempt is currently refused.
func (s *KonnectBackoffStrategy) whyCannotUpdate(
	timeLeft time.Duration,
	isTheSameFaultyConfig bool,
) string {
	// At most two reasons can apply at once.
	reasons := make([]string, 0, 2)

	if isTheSameFaultyConfig {
		encodedHash := hex.EncodeToString(s.lastFailedConfigHash)
		reasons = append(reasons, fmt.Sprintf(
			"Config has to be changed: %q hash has already failed to be pushed with a client error",
			encodedHash,
		))
	}

	if timeLeft > 0 {
		reasons = append(reasons, fmt.Sprintf("next attempt allowed in %s", timeLeft))
	}

	return strings.Join(reasons, ", ")
}