generated from overmindtech/source-template
/
describe_source.go
424 lines (347 loc) · 12.9 KB
/
describe_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
package sources
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/overmindtech/sdp-go"
"github.com/overmindtech/sdpcache"
)
// DefaultCacheDuration is the cache lifetime applied to items when a source
// does not set its own `CacheDuration`.
const DefaultCacheDuration = time.Hour
// DescribeOnlySource is a generic source for AWS APIs that expose a single
// `Describe` function serving both List and Get operations. EC2 is a good
// example: running Describe with no params returns everything, while params
// can be supplied to reduce the number of results.
type DescribeOnlySource[Input InputType, Output OutputType, ClientStruct ClientStructType, Options OptionsType] struct {
	MaxResultsPerPage int32           // Max results per page when making API queries
	ItemType          string          // The type of items that will be returned
	CacheDuration     time.Duration   // How long to cache items for; zero means DefaultCacheDuration
	cache             *sdpcache.Cache // The sdpcache of this source, created lazily by ensureCache
	cacheInitMu       sync.Mutex      // Held while the cache is being initialised

	// DescribeFunc is the function used to describe the resources that this
	// source is related to
	DescribeFunc func(ctx context.Context, client ClientStruct, input Input) (Output, error)

	// InputMapperGet returns the input object that will be passed to
	// DescribeFunc for a GET request
	InputMapperGet func(scope, query string) (Input, error)

	// InputMapperList returns the input object that will be passed to
	// DescribeFunc for a LIST request. When nil, List reports that listing
	// is not supported
	InputMapperList func(scope string) (Input, error)

	// InputMapperSearch maps a search query to the required input. If this
	// is unset then a search request will default to searching by ARN
	InputMapperSearch func(ctx context.Context, client ClientStruct, scope string, query string) (Input, error)

	// PaginatorBuilder returns a paginator for this API. If this is nil, we
	// will assume that the API is not paginated e.g.
	// https://aws.github.io/aws-sdk-go-v2/docs/making-requests/#using-paginators
	PaginatorBuilder func(client ClientStruct, params Input) Paginator[Output, Options]

	// OutputMapper returns a slice of items for a given output. The scope
	// and input are passed in order to assist in creating the items if
	// needed, but primarily this function should iterate over the output and
	// create new items for each result
	OutputMapper func(ctx context.Context, client ClientStruct, scope string, input Input, output Output) ([]*sdp.Item, error)

	// Region is the region that this source is configured in. Each source
	// can only be configured for one region; getting data from many regions
	// requires a source per region. This is used in the scope of returned
	// resources
	Region string

	// AccountID is the id of the account that is being used. This is used by
	// sources as the first element in the scope
	AccountID string

	// Client is the AWS client to use when making requests
	Client ClientStruct

	// UseListForGet makes Get filter the mapped results itself, keeping only
	// the item whose unique attribute value matches the query. This option
	// should be used when the Describe function does not support getting a
	// single item by ID.
	// InputMapperGet should still be defined: it is used to create the input
	// for the describe call whose output is then filtered.
	// See the directconnect-virtual-gateway source for an example of this.
	UseListForGet bool
}
// cacheDuration reports how long items from this source should be cached: the
// source's own `CacheDuration` when it is non-zero, otherwise
// `DefaultCacheDuration`.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) cacheDuration() time.Duration {
	if configured := s.CacheDuration; configured != 0 {
		return configured
	}

	return DefaultCacheDuration
}
// ensureCache creates the source's cache if it does not exist yet. The check
// and the assignment both happen while cacheInitMu is held.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) ensureCache() {
	s.cacheInitMu.Lock()
	defer s.cacheInitMu.Unlock()

	if s.cache != nil {
		return
	}

	s.cache = sdpcache.NewCache()
}
// Cache returns the sdpcache used by this source, initialising it first if
// that has not happened yet.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Cache() *sdpcache.Cache {
	s.ensureCache()

	return s.cache
}
// Validate checks that the source is correctly set up and returns an error
// describing the first missing piece, or nil when everything required is
// present. As a side effect it fills in `MaxResultsPerPage` with
// `DefaultMaxResultsPerPage` when no page size was configured.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Validate() error {
	if s.DescribeFunc == nil {
		return errors.New("source describe func is nil")
	}

	// Apply the default page size before checking the remaining mappers
	if s.MaxResultsPerPage == 0 {
		s.MaxResultsPerPage = DefaultMaxResultsPerPage
	}

	switch {
	case s.InputMapperGet == nil:
		return errors.New("source get input mapper is nil")
	case s.OutputMapper == nil:
		return errors.New("source output mapper is nil")
	}

	return nil
}
// Paginated reports whether this source uses a paginated API, which is the
// case exactly when a `PaginatorBuilder` has been supplied.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Paginated() bool {
	hasPaginator := s.PaginatorBuilder != nil

	return hasPaginator
}
// Type returns the type of items that this source returns, as configured in
// `ItemType`.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Type() string {
	itemType := s.ItemType

	return itemType
}
// Name returns the name of this source: the item type followed by "-source".
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Name() string {
	itemType := s.ItemType

	return fmt.Sprintf("%v-source", itemType)
}
// Scopes returns the scopes that this source is capable of finding items
// for. There is always exactly one, built from the account ID and region in
// the format {accountID}.{region}
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Scopes() []string {
	scope := FormatScope(s.AccountID, s.Region)

	return []string{scope}
}
// Get fetches a single item with a given scope and query. The item returned
// should have a UniqueAttributeValue that matches the `query` parameter. The
// ctx parameter is passed through to the cache lookup, the describe call and
// the output mapper so that potentially long-running actions can time out or
// be cancelled.
//
// The flow is: reject foreign scopes, validate the source, consult the cache
// (a hit with no items yields a nil item and a nil error), then map the query
// to an input, describe, and map the output to items. With `UseListForGet` the
// mapped items are reduced to the first one whose unique attribute value equals
// the query. Zero results produce a NOTFOUND error and more than one an OTHER
// error; both are cached. A single result is cached and returned.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Get(ctx context.Context, scope string, query string, ignoreCache bool) (*sdp.Item, error) {
	if sourceScope := s.Scopes()[0]; scope != sourceScope {
		return nil, &sdp.QueryError{
			ErrorType:   sdp.QueryError_NOSCOPE,
			ErrorString: fmt.Sprintf("requested scope %v does not match source scope %v", scope, sourceScope),
		}
	}

	if err := s.Validate(); err != nil {
		return nil, WrapAWSError(err)
	}

	s.ensureCache()
	hit, ck, cached, lookupErr := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_GET, scope, s.ItemType, query, ignoreCache)
	if lookupErr != nil {
		return nil, lookupErr
	}
	if hit {
		if len(cached) == 0 {
			return nil, nil
		}
		return cached[0], nil
	}

	// Build the input object for this query
	input, err := s.InputMapperGet(scope, query)
	if err != nil {
		return nil, s.processError(err, ck)
	}

	// Call the API using that input
	output, err := s.DescribeFunc(ctx, s.Client, input)
	if err != nil {
		return nil, s.processError(err, ck)
	}

	items, err := s.OutputMapper(ctx, s.Client, scope, input, output)
	if err != nil {
		return nil, s.processError(err, ck)
	}

	if s.UseListForGet {
		// The describe call returned more than we asked for, so keep only
		// the first item that matches the query
		var matched []*sdp.Item
		for _, candidate := range items {
			if candidate.UniqueAttributeValue() != query {
				continue
			}
			matched = append(matched, candidate)
			break
		}
		items = matched
	}

	if len(items) == 0 {
		notFound := &sdp.QueryError{
			ErrorType:   sdp.QueryError_NOTFOUND,
			ErrorString: fmt.Sprintf("%v %v not found", s.Type(), query),
		}
		s.cache.StoreError(notFound, s.cacheDuration(), ck)
		return nil, notFound
	}

	if len(items) > 1 {
		// Collect the names so the error shows what was returned
		names := make([]string, 0, len(items))
		for _, item := range items {
			names = append(names, item.GloballyUniqueName())
		}
		tooMany := &sdp.QueryError{
			ErrorType:   sdp.QueryError_OTHER,
			ErrorString: fmt.Sprintf("Request returned > 1 item for a GET request. Items: %v", strings.Join(names, ", ")),
		}
		s.cache.StoreError(tooMany, s.cacheDuration(), ck)
		return nil, tooMany
	}

	found := items[0]
	s.cache.StoreItem(found, s.cacheDuration(), ck)
	return found, nil
}
// List returns all items in a given scope. Requests for a scope other than the
// source's own are rejected with a NOSCOPE error, and sources without an
// `InputMapperList` report NOTFOUND since listing is not supported for them.
// Results are served from the cache when possible; otherwise the list input is
// built, described (paginated or not), and every resulting item is stored in
// the cache before being returned.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) List(ctx context.Context, scope string, ignoreCache bool) ([]*sdp.Item, error) {
	if sourceScope := s.Scopes()[0]; scope != sourceScope {
		return nil, &sdp.QueryError{
			ErrorType:   sdp.QueryError_NOSCOPE,
			ErrorString: fmt.Sprintf("requested scope %v does not match source scope %v", scope, sourceScope),
		}
	}

	if s.InputMapperList == nil {
		return nil, &sdp.QueryError{
			ErrorType:   sdp.QueryError_NOTFOUND,
			ErrorString: fmt.Sprintf("list is not supported for %v resources", s.ItemType),
		}
	}

	if err := s.Validate(); err != nil {
		return nil, WrapAWSError(err)
	}

	s.ensureCache()
	hit, ck, cached, lookupErr := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_LIST, scope, s.ItemType, "", ignoreCache)
	if lookupErr != nil {
		return nil, lookupErr
	}
	if hit {
		return cached, nil
	}

	input, err := s.InputMapperList(scope)
	if err != nil {
		return nil, s.processError(err, ck)
	}

	items, err := s.describe(ctx, input, scope)
	if err != nil {
		return nil, s.processError(err, ck)
	}

	ttl := s.cacheDuration()
	for i := range items {
		s.cache.StoreItem(items[i], ttl, ck)
	}

	return items, nil
}
// Search searches for AWS resources in the given scope. Requests for a scope
// other than the source's own are rejected with a NOSCOPE error.
//
// When no `InputMapperSearch` is configured the query is treated as an ARN and
// resolved through searchARN, which relies on Get for caching. When a custom
// mapper is configured, the cache is initialised and consulted here first
// (honouring `ignoreCache`): a cached error or a cached set of items is
// returned directly, and only on a miss is the custom search executed with the
// cache key produced by the lookup.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Search(ctx context.Context, scope string, query string, ignoreCache bool) ([]*sdp.Item, error) {
	if sourceScope := s.Scopes()[0]; scope != sourceScope {
		return nil, &sdp.QueryError{
			ErrorType:   sdp.QueryError_NOSCOPE,
			ErrorString: fmt.Sprintf("requested scope %v does not match source scope %v", scope, sourceScope),
		}
	}

	if s.InputMapperSearch == nil {
		return s.searchARN(ctx, scope, query, ignoreCache)
	}

	// searchCustom stores its results (and processError stores non-transient
	// errors) in the cache, so the cache must exist before it runs, and the
	// stored entries are only useful if they are looked up here
	s.ensureCache()
	cacheHit, ck, cachedItems, qErr := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_SEARCH, scope, s.ItemType, query, ignoreCache)
	if qErr != nil {
		return nil, qErr
	}
	if cacheHit {
		return cachedItems, nil
	}

	return s.searchCustom(ctx, scope, query, ck)
}
// searchARN resolves a search query that is expected to be an ARN. The ARN is
// parsed, its account and region must form the requested scope (otherwise a
// NOSCOPE error is returned), and its resource ID is then fetched via Get.
// The result is a slice holding that single item, or an empty slice when Get
// produced no item and no error.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) searchARN(ctx context.Context, scope string, query string, ignoreCache bool) ([]*sdp.Item, error) {
	// Parse the ARN
	a, err := ParseARN(query)
	if err != nil {
		return nil, WrapAWSError(err)
	}

	if arnScope := FormatScope(a.AccountID, a.Region); arnScope != scope {
		return nil, &sdp.QueryError{
			ErrorType:   sdp.QueryError_NOSCOPE,
			ErrorString: fmt.Sprintf("ARN scope %v does not match request scope %v", arnScope, scope),
			Scope:       scope,
		}
	}

	// Get already uses the cache, so no extra cache handling is needed here
	item, err := s.Get(ctx, scope, a.ResourceID(), ignoreCache)
	if err != nil {
		return nil, WrapAWSError(err)
	}

	if item == nil {
		// Get returns (nil, nil) on a cache hit that holds no items. Never
		// hand callers a slice containing a nil item
		return []*sdp.Item{}, nil
	}

	return []*sdp.Item{item}, nil
}
// searchCustom runs custom search logic using the `InputMapperSearch`
// function. The mapper turns the query into a describe input, the input is
// described (paginated or not), and every resulting item is stored in the
// cache under `ck` before being returned. Errors from the mapper are wrapped
// and returned as-is; errors from describing go through processError, which
// may also cache them.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) searchCustom(ctx context.Context, scope string, query string, ck sdpcache.CacheKey) ([]*sdp.Item, error) {
	// This function stores into the cache, so make sure the cache exists
	// even if no Get or List call has initialised it yet
	s.ensureCache()

	input, err := s.InputMapperSearch(ctx, s.Client, scope, query)
	if err != nil {
		return nil, WrapAWSError(err)
	}

	items, err := s.describe(ctx, input, scope)
	if err != nil {
		return nil, s.processError(err, ck)
	}

	for _, item := range items {
		s.cache.StoreItem(item, s.cacheDuration(), ck)
	}

	return items, nil
}
// processError converts an error returned by the AWS API (or by one of the
// mapper functions) into an SDP error so that it can be handled by Overmind.
// The error is wrapped with WrapAWSError, and cached under `cacheKey` if it is
// something that won't be fixed by retrying (NOTFOUND or NOSCOPE).
//
// A nil `err` yields a literal nil so that callers comparing the result with
// nil see success.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) processError(err error, cacheKey sdpcache.CacheKey) error {
	if err == nil {
		// Returning a nil *sdp.QueryError through the `error` interface
		// would compare as non-nil at the call site
		return nil
	}

	sdpErr := WrapAWSError(err)

	// Only cache the error if it is something that won't be fixed by retrying
	switch sdpErr.GetErrorType() {
	case sdp.QueryError_NOTFOUND, sdp.QueryError_NOSCOPE:
		// The custom search path can reach this point before any other
		// call has initialised the cache
		s.ensureCache()
		s.cache.StoreError(sdpErr, s.cacheDuration(), cacheKey)
	}

	return sdpErr
}
// describe runs the describe operation for the given input and maps the
// output to items, choosing between the paginated and unpaginated query based
// on whether a paginator builder is configured. In the paginated case every
// page is mapped and the items are accumulated; the first error from fetching
// or mapping a page aborts the whole call. Errors are returned unprocessed.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) describe(ctx context.Context, input Input, scope string) ([]*sdp.Item, error) {
	if !s.Paginated() {
		output, err := s.DescribeFunc(ctx, s.Client, input)
		if err != nil {
			return nil, err
		}

		mapped, err := s.OutputMapper(ctx, s.Client, scope, input, output)
		if err != nil {
			return nil, err
		}

		return mapped, nil
	}

	collected := make([]*sdp.Item, 0)

	for pager := s.PaginatorBuilder(s.Client, input); pager.HasMorePages(); {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, err
		}

		pageItems, err := s.OutputMapper(ctx, s.Client, scope, input, page)
		if err != nil {
			return nil, err
		}

		collected = append(collected, pageItems...)
	}

	return collected, nil
}
// Weight returns the priority weighting of items returned by this source.
// This is used to resolve conflicts where two sources of the same type
// return an item for a GET request. In that situation only one item can be
// kept, so the one with the higher weight value will win. This source always
// reports a weight of 100.
func (s *DescribeOnlySource[Input, Output, ClientStruct, Options]) Weight() int {
	const weight = 100

	return weight
}