-
Notifications
You must be signed in to change notification settings - Fork 14
/
hydrate_cache.go
100 lines (87 loc) · 3.07 KB
/
hydrate_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package plugin
import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/turbot/go-kit/helpers"
)
// Shared coordination state for hydrate functions wrapped by WithCache.
var (
	// cacheableHydrateLock is held by WithCache while it reads or writes
	// cacheableHydrateFunctionsPending.
	cacheableHydrateLock sync.Mutex

	// cacheableHydrateFunctionsPending maps a hydrate cache key to the WaitGroup
	// registered by the first caller to execute the hydrate function for that key.
	cacheableHydrateFunctionsPending = map[string]*sync.WaitGroup{}
)
// WithCache is a chainable function which wraps a hydrate call with caching and checks for
// pending execution of the same function.
//
// args may contain at most one HydrateFunc. If supplied, it is called on every invocation
// to build the cache key and must return a string. With no args, the key is inferred from
// the wrapped hydrate function (see getCacheKeyFunction, which panics for more than one arg).
//
// The returned HydrateFunc behaves as follows:
//   - an error from the cache key function is returned unchanged
//   - a cache key which is not a string yields an error (rather than a panic)
//   - if a call for the same key has already been registered, it waits for that call to
//     finish, returns the cached value if there is one, and otherwise executes the
//     hydrate function itself
//   - otherwise it registers itself as the pending call for the key, executes the hydrate
//     function and caches a successful result
//
// NOTE(review): nothing in this function removes an entry from
// cacheableHydrateFunctionsPending, so once the first call for a key has completed, later
// callers that miss the cache execute the hydrate function without waiting on each other.
func (hydrate HydrateFunc) WithCache(args ...HydrateFunc) HydrateFunc {
	// build a function to return the cache key
	getCacheKey := hydrate.getCacheKeyFunction(args)

	return func(ctx context.Context, d *QueryData, h *HydrateData) (interface{}, error) {
		// build key
		k, err := getCacheKey(ctx, d, h)
		if err != nil {
			return nil, err
		}
		// the key function may be caller-supplied and returns interface{}, so verify the
		// type instead of letting a bad key function panic at query time
		cacheKey, isString := k.(string)
		if !isString {
			return nil, fmt.Errorf("hydrate cache key must be a string, got %T", k)
		}

		// take the global lock to inspect the map of registered calls
		cacheableHydrateLock.Lock()
		functionLock, ok := cacheableHydrateFunctionsPending[cacheKey]
		if ok {
			// a hydrate function is running - or it has completed
			// release the global lock before blocking so other keys are not held up
			cacheableHydrateLock.Unlock()
			functionLock.Wait()

			// the registered call has finished - we hope the data is in the cache
			// (but it may not be - if there was an error)
			if cachedData, ok := d.ConnectionManager.Cache.Get(cacheKey); ok {
				return cachedData, nil
			}
			// there is no cached data - call the hydrate function and cache the result
			return callAndCacheHydrate(ctx, d, h, hydrate, cacheKey)
		}

		log.Printf("[TRACE] WithCache no function lock key %s", cacheKey)
		// there is no lock for this function, which means it has not been run yet
		// create a lock and mark it as held until this call returns
		functionLock = new(sync.WaitGroup)
		functionLock.Add(1)
		defer functionLock.Done()
		// publish the lock while still holding the global lock, so that concurrent
		// callers for the same key find it and wait
		cacheableHydrateFunctionsPending[cacheKey] = functionLock
		cacheableHydrateLock.Unlock()
		log.Printf("[TRACE] WithCache added lock to map key %s", cacheKey)

		// now call the hydrate function and cache the result
		return callAndCacheHydrate(ctx, d, h, hydrate, cacheKey)
	}
}
// getCacheKeyFunction selects the function WithCache uses to build its cache key.
//
// With one element in args, that element is returned as the key function. With no
// elements, a function is returned which ignores its parameters and yields
// helpers.GetFunctionName(hydrate) with a nil error (presumably the name of the wrapped
// hydrate function - verify against go-kit). More than one element is a programming
// error and panics.
func (hydrate HydrateFunc) getCacheKeyFunction(args []HydrateFunc) HydrateFunc {
	// reject misuse first
	if len(args) > 1 {
		panic("WithCache accepts 0 or 1 argument")
	}

	// an explicit key function was supplied - use it as is
	if len(args) == 1 {
		return args[0]
	}

	// no argument was supplied - infer the cache key from the hydrate function
	return func(context.Context, *QueryData, *HydrateData) (interface{}, error) {
		return helpers.GetFunctionName(hydrate), nil
	}
}
// callAndCacheHydrate executes hydrate with the given context, query data and hydrate
// data. On success the result is stored in the connection manager cache under cacheKey
// and returned. On failure nothing is cached and the error is returned with a nil result.
func callAndCacheHydrate(ctx context.Context, d *QueryData, h *HydrateData, hydrate HydrateFunc, cacheKey string) (interface{}, error) {
	result, err := hydrate(ctx, d, h)
	if err == nil {
		// only successful results are cached
		d.ConnectionManager.Cache.Set(cacheKey, result)
		return result, nil
	}

	// the hydrate call failed - pass the error up without touching the cache
	return nil, err
}