-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
client.go
151 lines (133 loc) · 4.28 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package throttle
import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
// throttleCheckDuration is the period of the shared tick counter; it is also
// how long a caller pauses after a rejected throttle check.
const throttleCheckDuration = 250 * time.Millisecond

var (
	// throttleTicks is bumped once per throttleCheckDuration by the ticker
	// goroutine and must only be accessed via sync/atomic.
	throttleTicks int64
	// throttleInit makes sure the ticker goroutine is started at most once.
	throttleInit sync.Once
)
func initThrottleTicker() {
throttleInit.Do(func() {
go func() {
tick := time.NewTicker(throttleCheckDuration)
defer tick.Stop()
for range tick.C {
atomic.AddInt64(&throttleTicks, 1)
}
}()
})
}
// Client is used by apps that wish to consult a throttler. It encapsulates the check/throttling/backoff logic.
type Client struct {
// throttler is the throttler that checks are delegated to; when nil, ThrottleCheckOK reports OK.
throttler *Throttler
// appName is the app name used for checks unless the caller supplies an override.
appName throttlerapp.Name
// checkType is forwarded to the throttler on every check.
checkType ThrottleCheckType
// flags is forwarded (by pointer) to the throttler on every check; the constructors set LowPriority.
flags CheckFlags
// lastSuccessfulThrottleMu guards lastSuccessfulThrottle.
lastSuccessfulThrottleMu sync.Mutex
// lastSuccessfulThrottle is the throttleTicks value recorded at the last successful check.
lastSuccessfulThrottle int64
}
// NewProductionClient returns a Client for foreground/production jobs, which
// run at normal priority (CheckFlags.LowPriority is false). It also makes
// sure the shared throttle ticker is running.
func NewProductionClient(throttler *Throttler, appName throttlerapp.Name, checkType ThrottleCheckType) *Client {
	initThrottleTicker()

	client := &Client{
		throttler: throttler,
		appName:   appName,
		checkType: checkType,
	}
	client.flags.LowPriority = false
	return client
}
// NewBackgroundClient returns a Client for background jobs (e.g. migration,
// table pruning, vreplication), which yield to production traffic
// (CheckFlags.LowPriority is true). It also makes sure the shared throttle
// ticker is running.
func NewBackgroundClient(throttler *Throttler, appName throttlerapp.Name, checkType ThrottleCheckType) *Client {
	initThrottleTicker()

	client := &Client{
		throttler: throttler,
		appName:   appName,
		checkType: checkType,
	}
	client.flags.LowPriority = true
	return client
}
// ThrottleCheckOK consults the throttler and returns true when it is satisfied.
// It never sleeps.
//
// A successful result is cached until the next throttle tick, so the function
// is cheap to call very frequently. A nil client, or a client without a
// throttler, always reports true. A non-empty overrideAppName replaces the
// client's own app name for this check.
//
// The cached tick is read and written under lastSuccessfulThrottleMu, which is
// held for the duration of the throttler check.
func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName throttlerapp.Name) (throttleCheckOK bool) {
	if c == nil || c.throttler == nil {
		// No client or no throttler: nothing to consult.
		return true
	}

	c.lastSuccessfulThrottleMu.Lock()
	defer c.lastSuccessfulThrottleMu.Unlock()

	if atomic.LoadInt64(&throttleTicks) <= c.lastSuccessfulThrottle {
		// The last successful check happened within the current tick; reuse it.
		return true
	}

	// The cached result has expired: run a real throttler check.
	app := overrideAppName
	if app == "" {
		app = c.appName
	}
	if result := c.throttler.CheckByType(ctx, app.String(), "", &c.flags, c.checkType); result.StatusCode != http.StatusOK {
		return false
	}

	// Remember the tick at which the check succeeded.
	c.lastSuccessfulThrottle = atomic.LoadInt64(&throttleTicks)
	return true
}
// ThrottleCheckOKOrWaitAppName checks the throttler; if the throttler is
// satisfied it returns true immediately, otherwise it sleeps for
// throttleCheckDuration and returns false.
// A non-empty appName overrides the client's default app name.
// The pause is a plain time.Sleep and does not observe ctx cancellation.
func (c *Client) ThrottleCheckOKOrWaitAppName(ctx context.Context, appName throttlerapp.Name) bool {
	if c.ThrottleCheckOK(ctx, appName) {
		return true
	}
	// Rejected: back off before handing control back to the caller.
	time.Sleep(throttleCheckDuration)
	return false
}
// ThrottleCheckOKOrWait checks the throttler using the client's own app name;
// if the throttler is satisfied it returns true immediately, otherwise it
// briefly sleeps and returns false.
func (c *Client) ThrottleCheckOKOrWait(ctx context.Context) bool {
	// An empty name means "no override": the client's app name is used.
	const noOverride = ""
	return c.ThrottleCheckOKOrWaitAppName(ctx, noOverride)
}
// Throttle blocks until the throttler is satisfied or ctx is cancelled,
// whichever comes first. Checks use the client's own app name.
//
// Between rejected checks it waits throttleCheckDuration. The wait is
// interrupted as soon as ctx is done, so a cancelled caller is released
// promptly instead of sitting out the remainder of the pause.
func (c *Client) Throttle(ctx context.Context) {
	for ctx.Err() == nil {
		if c.ThrottleCheckOK(ctx, "") {
			return
		}
		// Back off before retrying, but wake up immediately on cancellation.
		timer := time.NewTimer(throttleCheckDuration)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}