/
query_data_rate_limiters.go
181 lines (156 loc) · 6 KB
/
query_data_rate_limiters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package plugin
import (
"context"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc"
"github.com/turbot/steampipe-plugin-sdk/v5/plugin/quals"
"github.com/turbot/steampipe-plugin-sdk/v5/rate_limiter"
"log"
"time"
)
// WaitForListRateLimit passes ctx to the wait method of the fetch-call rate
// limiters stored on this QueryData.
//
// NOTE(review): presumably this blocks until the list call is permitted to
// proceed - confirm against fetchCallRateLimiters.wait.
func (d *QueryData) WaitForListRateLimit(ctx context.Context) {
	fetchLimiters := d.fetchLimiters
	fetchLimiters.wait(ctx)
}
// initialiseRateLimiters sets up all rate limiter state for this query data.
// It runs three steps, in order:
//   - populate the base scope values (populateRateLimitScopeValues)
//   - resolve the limiters for the fetch call(s) (get/list/parent-list)
//   - resolve the limiters for the hydrate calls
//
// This function has no error return, so a failure in either resolve step is
// written to the log rather than being silently dropped. Hydrate limiters are
// still resolved when the fetch step fails.
func (d *QueryData) initialiseRateLimiters() {
	log.Printf("[INFO] initialiseRateLimiters for query data %p (%s)", d, d.connectionCallId)

	// build the base set of scope values used to resolve a rate limiter
	d.populateRateLimitScopeValues()

	// populate the rate limiters for the fetch call(s) (get/list/parent-list)
	if err := d.resolveFetchRateLimiters(); err != nil {
		log.Printf("[WARN] initialiseRateLimiters: resolveFetchRateLimiters failed: %s (%s)", err.Error(), d.connectionCallId)
	}

	// populate the rate limiters for the hydrate calls
	if err := d.resolveHydrateRateLimiters(); err != nil {
		log.Printf("[WARN] initialiseRateLimiters: resolveHydrateRateLimiters failed: %s (%s)", err.Error(), d.connectionCallId)
	}
}
// resolveRateLimiterScopeValues builds the scope values for a single hydrate
// call by merging three sources via rate_limiter.MergeScopeValues:
//  1. hydrateCallScopeValues - static values from the hydrate config
//  2. d.Table.Tags - static values from the table config
//  3. d.rateLimiterScopeValues - values for this scan (static and column values)
//
// The sources are passed highest-precedence first. Each input and the merged
// result are logged at INFO level.
func (d *QueryData) resolveRateLimiterScopeValues(hydrateCallScopeValues map[string]string) map[string]string {
	log.Printf("[INFO] resolveRateLimiterScopeValues (%s)", d.connectionCallId)
	log.Printf("[INFO] HydrateCall tags %v", hydrateCallScopeValues)

	tableTags := d.Table.Tags
	log.Printf("[INFO] Table tags %v", tableTags)

	scanScopeValues := d.rateLimiterScopeValues
	log.Printf("[INFO] QueryData rateLimiterScopeValues %v", scanScopeValues)

	// sources listed in order of DECREASING precedence, i.e. highest first
	merged := rate_limiter.MergeScopeValues([]map[string]string{
		hydrateCallScopeValues,
		tableTags,
		scanScopeValues,
	})

	log.Printf("[INFO] merged scope values %v", merged)
	return merged
}
/*
populateRateLimitScopeValues resets d.rateLimiterScopeValues and fills it with
the base scope values used to resolve a rate limiter:
  - the connection name, keyed by rate_limiter.RateLimiterScopeConnection
  - for every column present in d.matrixColLookup, the string value of its
    '=' quals, keyed by column name (if a column has several '=' quals the
    last one in the list wins)
*/
func (d *QueryData) populateRateLimitScopeValues() {
	scopeValues := map[string]string{}
	d.rateLimiterScopeValues = scopeValues

	// add the connection
	scopeValues[rate_limiter.RateLimiterScopeConnection] = d.Connection.Name

	// add matrix quals
	for column, columnQuals := range d.Quals {
		if _, isMatrixColumn := d.matrixColLookup[column]; !isMatrixColumn {
			continue
		}
		for _, q := range columnQuals.Quals {
			// only equality quals contribute a scope value
			if q.Operator != quals.QualOperatorEqual {
				continue
			}
			scopeValues[column] = grpc.GetQualValueString(q.Value)
		}
	}
}
// resolveFetchRateLimiters replaces d.fetchLimiters with a fresh
// fetchCallRateLimiters and then delegates to the resolver matching the fetch
// call shape:
//   - get fetch -> resolveGetRateLimiters
//   - list with a parent hydrate -> resolveParentChildRateLimiters
//   - plain single level list -> resolveListRateLimiters
//
// The delegate's error is returned unchanged.
func (d *QueryData) resolveFetchRateLimiters() error {
	d.fetchLimiters = &fetchCallRateLimiters{}

	switch {
	case d.FetchType == fetchTypeGet:
		// it is a get
		return d.resolveGetRateLimiters()
	case d.Table.List.ParentHydrate != nil:
		// it is a list with a parent-child hydrate
		return d.resolveParentChildRateLimiters()
	default:
		// it is just a single level list hydrate
		return d.resolveListRateLimiters()
	}
}
// resolveGetRateLimiters asks the plugin for the rate limiter matching the
// get config tags (d.Table.Get.Tags) and stores it as the fetch rate limiter.
// On failure a warning is logged, nothing is stored, and the error is returned.
func (d *QueryData) resolveGetRateLimiters() error {
	// NOTE(review): the previous comment here said the rate limit config
	// "cannot be nil as it is initialized to an empty struct if needed" -
	// that initialisation is not visible in this file; confirm.
	limiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.Get.Tags, d)
	if err != nil {
		getName := d.Table.Get.namedHydrate.Name
		log.Printf("[WARN] get call %s getHydrateCallRateLimiter failed: %s (%s)", getName, err.Error(), d.connectionCallId)
		return err
	}

	d.fetchLimiters.rateLimiter = limiter
	return nil
}
// resolveParentChildRateLimiters resolves the limiters for a parent-child list:
// the parent hydrate limiter (from d.Table.List.ParentTags) is stored in
// d.fetchLimiters.rateLimiter and the child list limiter (from
// d.Table.List.Tags) in d.fetchLimiters.childListRateLimiter.
//
// The parent limiter is stored before the child is resolved, so it stays set
// if the child resolution fails. Either failure is logged as a warning and
// returned.
func (d *QueryData) resolveParentChildRateLimiters() error {
	// NOTE(review): the previous comment here said the rate limit and parent
	// rate limit config "cannot be nil as they are initialized to an empty
	// struct if needed" - that initialisation is not visible in this file; confirm.
	listConfig := d.Table.List

	// parent hydrate first
	parentLimiter, err := d.plugin.getHydrateCallRateLimiter(listConfig.ParentTags, d)
	if err != nil {
		parentName := listConfig.namedParentHydrate.Name
		log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", parentName, err.Error(), d.connectionCallId)
		return err
	}
	d.fetchLimiters.rateLimiter = parentLimiter

	// then the child hydrate
	childLimiter, err := d.plugin.getHydrateCallRateLimiter(listConfig.Tags, d)
	if err != nil {
		childName := listConfig.namedHydrate.Name
		log.Printf("[WARN] resolveParentChildRateLimiters: %s: getHydrateCallRateLimiter failed: %s (%s)", childName, err.Error(), d.connectionCallId)
		return err
	}
	d.fetchLimiters.childListRateLimiter = childLimiter

	return nil
}
// resolveListRateLimiters asks the plugin for the rate limiter matching the
// list config tags (d.Table.List.Tags) and stores it as the fetch rate limiter.
// On failure a warning is logged, nothing is stored, and the error is returned.
func (d *QueryData) resolveListRateLimiters() error {
	listLimiter, err := d.plugin.getHydrateCallRateLimiter(d.Table.List.Tags, d)
	if err != nil {
		// report the LIST hydrate here: this path is taken for list fetches, so
		// reading the name from d.Table.Get would mislabel the failure and would
		// dereference a nil Get config on tables that define no get call
		log.Printf("[WARN] list call %s getHydrateCallRateLimiter failed: %s (%s)", d.Table.List.namedHydrate.Name, err.Error(), d.connectionCallId)
		return err
	}

	d.fetchLimiters.rateLimiter = listLimiter
	return nil
}
// setListMetadata records hydrate metadata for a list fetch.
//
// Metadata for d.listHydrate is always built from the main fetch rate limiter
// and carries fetchDelay (in milliseconds).
//   - With no child hydrate, it is typed as a list and stored in d.fetchMetadata.
//   - With a child hydrate, d.fetchMetadata instead describes the child
//     hydrate using the child list rate limiter (no delay is set on it), and
//     the list hydrate metadata is typed "parentHydrate" and stored in
//     d.parentHydrateMetadata.
func (d *QueryData) setListMetadata(fetchDelay time.Duration) {
	listMetadata := &hydrateMetadata{
		FuncName:     d.listHydrate.Name,
		RateLimiters: d.fetchLimiters.rateLimiter.LimiterNames(),
		ScopeValues:  d.fetchLimiters.rateLimiter.ScopeValues,
		DelayMs:      fetchDelay.Milliseconds(),
	}

	// simple case: a single level list
	if d.childHydrate.empty() {
		listMetadata.Type = string(fetchTypeList)
		d.fetchMetadata = listMetadata
		return
	}

	// parent-child list: the child hydrate is the fetch call...
	childLimiter := d.fetchLimiters.childListRateLimiter
	d.fetchMetadata = &hydrateMetadata{
		Type:         string(fetchTypeList),
		FuncName:     d.childHydrate.Name,
		RateLimiters: childLimiter.LimiterNames(),
		ScopeValues:  childLimiter.ScopeValues,
	}

	// ...and the list hydrate is reported as the parent
	listMetadata.Type = "parentHydrate"
	d.parentHydrateMetadata = listMetadata
}
// setGetLimiterMetadata stores hydrate metadata for a get fetch in
// d.fetchMetadata: the get hydrate name, the names and scope values of the
// fetch rate limiter, and fetchDelay expressed in milliseconds.
func (d *QueryData) setGetLimiterMetadata(fetchDelay time.Duration) {
	getMetadata := &hydrateMetadata{
		Type:     string(fetchTypeGet),
		FuncName: d.Table.Get.namedHydrate.Name,
		DelayMs:  fetchDelay.Milliseconds(),
	}

	// limiter details come from the main fetch rate limiter
	getLimiter := d.fetchLimiters.rateLimiter
	getMetadata.RateLimiters = getLimiter.LimiterNames()
	getMetadata.ScopeValues = getLimiter.ScopeValues

	d.fetchMetadata = getMetadata
}
// resolveHydrateRateLimiters calls initialiseRateLimiter on each entry of
// d.hydrateCalls in turn. It stops at, and returns, the first error;
// it returns nil when every call succeeds.
func (d *QueryData) resolveHydrateRateLimiters() error {
	for _, hydrateCall := range d.hydrateCalls {
		err := hydrateCall.initialiseRateLimiter()
		if err != nil {
			return err
		}
	}
	return nil
}