-
Notifications
You must be signed in to change notification settings - Fork 14
/
cacher_result_subscriber.go
125 lines (111 loc) · 3.47 KB
/
cacher_result_subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package query_cache
import (
"context"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v5/error_helpers"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"golang.org/x/sync/semaphore"
"log"
"sync"
)
type cacheResultSubscriber struct {
indexItem *IndexItem
req *CacheRequest
streamRowFunc func(row *sdkproto.Row)
queryCache *QueryCache
}
func newCacheResultSubscriber(c *QueryCache, indexItem *IndexItem, req *CacheRequest, streamRowFunc func(row *sdkproto.Row)) *cacheResultSubscriber {
return &cacheResultSubscriber{
indexItem: indexItem,
req: req,
streamRowFunc: streamRowFunc,
queryCache: c,
}
}
func (s *cacheResultSubscriber) waitUntilDone(ctx context.Context) error {
// so we have a cache index, retrieve the item
log.Printf("[INFO] got an index item - try to retrieve rows from cache (%s)", s.req.CallId)
cacheHit := true
var errors []error
errorChan := make(chan (error), s.indexItem.PageCount)
var wg sync.WaitGroup
const maxReadThreads = 5
var maxReadSem = semaphore.NewWeighted(maxReadThreads)
// define streaming function
streamRows := func(cacheResult *sdkproto.QueryResult) {
for _, r := range cacheResult.Rows {
// check for context cancellation
if error_helpers.IsContextCancelledError(ctx.Err()) {
log.Printf("[INFO] getCachedQueryResult context cancelled - returning (%s)", s.req.CallId)
return
}
s.streamRowFunc(r)
}
}
// ok so we have an index item - we now stream
// ensure the first page exists (evictions start with oldest item so if first page exists, they all exist)
pageIdx := 0
pageKey := getPageKey(s.indexItem.Key, pageIdx)
var cacheResult = &sdkproto.QueryResult{}
if err := doGet[*sdkproto.QueryResult](ctx, pageKey, s.queryCache.cache, cacheResult); err != nil {
return err
}
// ok it's there, stream rows
streamRows(cacheResult)
// update page index
pageIdx++
// now fetch the rest (if any), in parallel maxReadThreads at a time
for ; pageIdx < int(s.indexItem.PageCount); pageIdx++ {
maxReadSem.Acquire(ctx, 1)
wg.Add(1)
// construct the page key, _using the index item key as the root_
p := getPageKey(s.indexItem.Key, pageIdx)
go func(pageKey string) {
defer wg.Done()
defer maxReadSem.Release(1)
log.Printf("[TRACE] fetching key: %s", pageKey)
var cacheResult = &sdkproto.QueryResult{}
if err := doGet[*sdkproto.QueryResult](ctx, pageKey, s.queryCache.cache, cacheResult); err != nil {
if IsCacheMiss(err) {
// This is not expected
log.Printf("[WARN] getCachedQueryResult - no item retrieved for cache key %s (%s)", pageKey, s.req.CallId)
} else {
log.Printf("[WARN] cacheGetResult Get failed %v (%s)", err, s.req.CallId)
}
errorChan <- err
return
}
log.Printf("[TRACE] got result: %d rows", len(cacheResult.Rows))
streamRows(cacheResult)
}(p)
}
doneChan := make(chan bool)
go func() {
wg.Wait()
close(doneChan)
}()
for {
select {
case err := <-errorChan:
log.Printf("[WARN] cacheResultSubscriber waitUntilDone received error: %s (%s)", err.Error(), s.req.CallId)
if IsCacheMiss(err) {
cacheHit = false
} else {
errors = append(errors, err)
}
case <-doneChan:
// any real errors return them
if len(errors) > 0 {
return helpers.CombineErrors(errors...)
}
if cacheHit {
// this was a hit - return
s.queryCache.Stats.Hits++
return nil
} else {
s.queryCache.Stats.Misses++
return CacheMissError{}
}
}
}
}