forked from turbot/steampipe-plugin-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hydrate_cache.go
197 lines (159 loc) · 6.46 KB
/
hydrate_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package plugin
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"time"
"github.com/turbot/go-kit/helpers"
)
// package-level state shared by every memoized hydrate function
var (
	// code pointer common to (all) memoized functions
	// populated lazily by Memoize and consulted by isMemoized
	memoizedFuncPtr uintptr

	// wait groups of memoized hydrate calls, keyed by cache key plus connection name
	memoizedHydrateFunctionsPending = make(map[string]*sync.WaitGroup)

	// guards access to memoizedHydrateFunctionsPending
	memoizedHydrateLock sync.RWMutex
)
/*
HydrateFunc is a function that gathers data to build table rows.
Typically this would make an API call and return the raw API output.

List and Get are special hydrate functions.

  - List returns data for all rows. Almost all tables will have a List function.
  - Get returns data for a single row. In order to filter as cheaply as possible, a Get function should be implemented if the API supports fetching single items by key.

A column may require data not returned by the List or Get calls, in which case an additional API
call will be required. A HydrateFunc that wraps this API call can be specified in the [Column] definition.

You could do this the hard way by looping through the List API results and enriching each item
by making an additional API call. However, the SDK does all this for you.

A HydrateFunc receives the context, the [QueryData] and the [HydrateData] for the call,
and returns the gathered data together with any error.
*/
type HydrateFunc func(context.Context, *QueryData, *HydrateData) (interface{}, error)
/*
WithCache returns a memoized version of the hydrate function.

It accepts at most one argument: a function which builds the cache key.
If no argument is passed, the cache key is derived from the name of the hydrate function.
Passing more than one argument causes a panic.

Deprecated: use Memoize.
*/
func (f HydrateFunc) WithCache(args ...HydrateFunc) HydrateFunc {
	// resolve the cache key function first, so invalid arguments are rejected before memoizing
	keyFunc := f.getCacheKeyFunction(args)

	// option which installs the resolved key function into the memoize configuration
	useKeyFunc := func(cfg *MemoizeConfiguration) {
		cfg.GetCacheKeyFunc = keyFunc
	}

	return f.Memoize(useKeyFunc)
}
/*
Memoize ensures the [HydrateFunc] results are saved in the [connection.ConnectionCache].

Use it to reduce the number of API calls if the HydrateFunc is used by multiple tables.

Options may be passed to customise the memoize configuration, which supplies the function
used to build the cache key and the TTL of the cached result.

For a given cache key and connection, the first caller of the returned function executes the
hydrate function and caches the result. Other callers wait for that call to finish and then
read the cache, calling the hydrate function themselves if nothing was cached.
The returned function fails with an error if the cache key function returns an error or a
key which is not a string.

NOTE: this should only be used to memoize a function which will be manually invoked and requires caching.
It should NOT be used to memoize a hydrate function being passed to a table definition.
*/
func (f HydrateFunc) Memoize(opts ...MemoizeOption) HydrateFunc {
	if isMemoized(f) {
		// NOTE: this only warns - the function is still wrapped again
		log.Printf("[WARN] Memoize %s - already memoized", helpers.GetFunctionName(f))
	}

	// build the configuration and apply any options
	config := newMemoizeConfiguration(f)
	for _, o := range opts {
		o(config)
	}

	// capture the function which returns the cache key, and the ttl
	buildCacheKey := config.GetCacheKeyFunc
	ttl := config.Ttl

	memoizedFunc := func(ctx context.Context, d *QueryData, h *HydrateData) (interface{}, error) {
		// build key
		k, err := buildCacheKey(ctx, d, h)
		if err != nil {
			return nil, err
		}
		// the key is used as a string - return an error rather than panicking if it is anything else
		cacheKey, ok := k.(string)
		if !ok {
			return nil, fmt.Errorf("memoize %s: cache key must be a string, got %T", helpers.GetFunctionName(f), k)
		}

		// build a key to access the memoizedHydrateFunctionsPending map, which includes the connection
		// NOTE: when caching the actual hydrate data, the connection name will also be added to the cache key
		// but this happens lower down
		// here, we need to add it
		executeLockKey := fmt.Sprintf("%s-%s", cacheKey, d.Connection.Name)

		// wait until there is no instance of the hydrate function running

		// acquire a Read lock on the pending call map
		memoizedHydrateLock.RLock()
		functionLock, ok := memoizedHydrateFunctionsPending[executeLockKey]
		memoizedHydrateLock.RUnlock()

		if ok {
			// a hydrate function is running - or it has completed
			// wait for the function lock
			return f.waitForHydrate(ctx, d, h, functionLock, cacheKey, ttl)
		}

		// so there was no function lock - no pending hydrate so we must execute

		// acquire a Write lock
		memoizedHydrateLock.Lock()

		// check again for pending call (in case another thread got the Write lock first)
		functionLock, ok = memoizedHydrateFunctionsPending[executeLockKey]
		if ok {
			// release Write lock
			memoizedHydrateLock.Unlock()
			// a hydrate function is running - or it has completed
			return f.waitForHydrate(ctx, d, h, functionLock, cacheKey, ttl)
		}

		// there is no lock for this function, which means it has not been run yet
		// create a lock
		functionLock = new(sync.WaitGroup)
		// lock it
		functionLock.Add(1)
		// ensure we unlock before return (deferred, so waiters are also released on error)
		defer functionLock.Done()

		// add to map - the entry is not removed here, so later callers go straight to waitForHydrate
		memoizedHydrateFunctionsPending[executeLockKey] = functionLock
		// and release Write lock
		memoizedHydrateLock.Unlock()

		log.Printf("[TRACE] Memoize (connection %s, cache key %s) - no pending call found so calling and caching hydrate", d.Connection.Name, cacheKey)
		// now call the hydrate function and cache the result
		return callAndCacheHydrate(ctx, d, h, f, cacheKey, ttl)
	}

	log.Printf("[INFO] Memoize %p %s", f, helpers.GetFunctionName(f))

	// lazily record the code pointer of the memoized function, for use by isMemoized
	// NOTE(review): this write is not synchronised - presumably Memoize is only called
	// during plugin initialisation; confirm before calling it concurrently
	if memoizedFuncPtr == 0 {
		memoizedFuncPtr = reflect.ValueOf(memoizedFunc).Pointer()
	}

	return memoizedFunc
}
// waitForHydrate blocks until the given function lock is released, then returns the data
// stored in the connection cache under cacheKey.
// If the cache holds no data for the key, the hydrate function is called directly and its
// result is cached with the given ttl.
func (f HydrateFunc) waitForHydrate(ctx context.Context, d *QueryData, h *HydrateData, functionLock *sync.WaitGroup, cacheKey string, ttl time.Duration) (interface{}, error) {
	// block until the in-flight hydrate call (if there is one) has finished
	functionLock.Wait()

	// no hydrate call is running now, so the result should be in the cache
	// (it will be missing if the call returned an error)
	if cached, found := d.ConnectionCache.Get(ctx, cacheKey); found {
		return cached, nil
	}

	// cache miss - run the hydrate function ourselves and cache the result
	return callAndCacheHydrate(ctx, d, h, f, cacheKey, ttl)
}
// getCacheKeyFunction resolves the cache key function for WithCache from its optional arguments.
// It panics if more than one argument is supplied.
//
// Deprecated: only used by WithCache, which is itself deprecated.
func (f HydrateFunc) getCacheKeyFunction(args []HydrateFunc) HydrateFunc {
	// more than one argument is a programming error
	if len(args) > 1 {
		panic("WithCache accepts 0 or 1 argument")
	}

	// a cache key function was supplied - use it as is
	if len(args) == 1 {
		return args[0]
	}

	// nothing was supplied - use the name of the hydrate function as the cache key
	return func(context.Context, *QueryData, *HydrateData) (interface{}, error) {
		return helpers.GetFunctionName(f), nil
	}
}
// callAndCacheHydrate invokes the hydrate function and, if it succeeds, stores the result
// in the connection cache under cacheKey with the given ttl.
// If the hydrate function returns an error, nothing is cached and the error is returned.
func callAndCacheHydrate(ctx context.Context, d *QueryData, h *HydrateData, hydrate HydrateFunc, cacheKey string, ttl time.Duration) (interface{}, error) {
	log.Printf("[TRACE] callAndCacheHydrate (connection %s, cache key %s) ", d.Connection.Name, cacheKey)

	// run the hydrate function
	result, hydrateErr := hydrate(ctx, d, h)
	if hydrateErr != nil {
		// failed - leave the cache untouched
		return nil, hydrateErr
	}

	// success - cache the result, then hand it back to the caller
	d.ConnectionCache.SetWithTTL(ctx, cacheKey, result, ttl)
	return result, nil
}
// isMemoized reports whether hydrateFunc is a memoized function.
//
// all memoized functions have the same code pointer, which Memoize records in memoizedFuncPtr
// - to determine if a function is memoized, compare its code pointer to the recorded one
//
// A nil function, or any function checked before anything has been memoized, is never memoized.
// Without these guards a nil function (code pointer 0) would match the unset memoizedFuncPtr (also 0).
//
// NOTE(review): per the reflect documentation a Func code pointer is not necessarily enough to
// identify a single function uniquely - confirm the comparison holds for the toolchain in use.
func isMemoized(hydrateFunc HydrateFunc) bool {
	if hydrateFunc == nil || memoizedFuncPtr == 0 {
		return false
	}
	return reflect.ValueOf(hydrateFunc).Pointer() == memoizedFuncPtr
}