-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
query.go
181 lines (152 loc) · 5.45 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package srvtopo
import (
"context"
"fmt"
"sync"
"time"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
)
// queryEntry is one cached result in a resilientQuery, keyed by the
// String() form of its key. All mutable fields are guarded by mutex.
type queryEntry struct {
	// immutable values
	key fmt.Stringer

	// the mutex protects any access to this structure (read or write)
	mutex sync.Mutex

	// refreshingChan is used to synchronize requests and avoid hammering
	// the topo server: it is non-nil exactly while a background refresh
	// goroutine is in flight, and is closed (then reset to nil) when that
	// refresh completes, waking any waiters.
	refreshingChan chan struct{}

	// insertionTime is when value was last successfully stored; the zero
	// time means no successful query has completed yet, or the cached
	// value was invalidated after its TTL expired.
	insertionTime time.Time
	// lastQueryTime is when a refresh was last started (success or not).
	lastQueryTime time.Time
	// value is the most recent successful query result, nil if none.
	value any
	// lastError is the error from the most recent refresh attempt;
	// nil after a successful refresh.
	lastError error
}
// resilientQuery caches per-key results of a topo query, serving cached
// (and, optionally, slightly stale) values while refreshing them in the
// background so a slow or unavailable topo server does not block callers.
type resilientQuery struct {
	// query performs the underlying, uncached lookup for one entry.
	query func(ctx context.Context, entry *queryEntry) (any, error)

	// counts tracks query/error/cached-serve occurrences for stats export.
	counts *stats.CountersWithSingleLabel

	// cacheRefreshInterval is the minimum time between refresh attempts
	// for the same entry; cacheTTL is how long a stored value stays valid.
	cacheRefreshInterval time.Duration
	cacheTTL             time.Duration

	// mutex protects the entries map itself; each entry carries its own
	// lock for its contents.
	mutex   sync.Mutex
	entries map[string]*queryEntry
}
// getCurrentValue returns the cached value for wkey, kicking off a
// background refresh when the refresh interval has elapsed. If staleOK is
// true, a value that has outlived cacheTTL may still be served for a
// bounded grace period (2x the refresh interval past the TTL). Callers
// block only when no usable cached value exists, waiting for the in-flight
// refresh to finish or for ctx to be canceled.
//
// Fix over previous revision: the panic-recovery log format string was
// malformed ("cell :%v, err :%v)" — stray trailing parenthesis and
// misplaced colons). No other behavior change.
func (q *resilientQuery) getCurrentValue(ctx context.Context, wkey fmt.Stringer, staleOK bool) (any, error) {
	q.counts.Add(queryCategory, 1)

	// Find the entry in the cache, add it if not there.
	key := wkey.String()
	q.mutex.Lock()
	entry, ok := q.entries[key]
	if !ok {
		entry = &queryEntry{
			key: wkey,
		}
		q.entries[key] = entry
	}
	q.mutex.Unlock()

	// Lock the entry, and do everything holding the lock except
	// querying the underlying topo server.
	//
	// This means that even if the topo server is very slow, two concurrent
	// requests will only issue one underlying query.
	entry.mutex.Lock()
	defer entry.mutex.Unlock()

	cacheValid := entry.value != nil && (time.Since(entry.insertionTime) < q.cacheTTL)
	if !cacheValid && staleOK {
		// Only allow stale results for a bounded period past the TTL.
		cacheValid = entry.value != nil && (time.Since(entry.insertionTime) < (q.cacheTTL + 2*q.cacheRefreshInterval))
	}
	shouldRefresh := time.Since(entry.lastQueryTime) > q.cacheRefreshInterval

	// If it is not time to check again, then return either the cached
	// value or the cached error but don't ask topo again.
	// Here we have to be careful with the part where we haven't gotten even the first result.
	// In that case, a refresh is already in progress, but the cache is empty! So, we can't use the cache.
	// We have to wait for the query's results.
	// We know the query has run at least once if the insertionTime is non-zero, or if we have an error.
	queryRanAtLeastOnce := !entry.insertionTime.IsZero() || entry.lastError != nil
	if !shouldRefresh && queryRanAtLeastOnce {
		if cacheValid {
			return entry.value, nil
		}
		return nil, entry.lastError
	}

	// Refresh the state in a background goroutine if no refresh is already
	// in progress. This way queries are not blocked while the cache is still
	// valid but past the refresh time, and avoids calling out to the topo
	// service while the lock is held.
	if entry.refreshingChan == nil {
		entry.refreshingChan = make(chan struct{})
		entry.lastQueryTime = time.Now()
		go func() {
			defer func() {
				// Guard against a panic in q.query; without this the
				// refreshingChan would never close and waiters would hang.
				if err := recover(); err != nil {
					log.Errorf("ResilientQuery uncaught panic, cell: %v, err: %v", key, err)
				}
			}()
			// Derived from the caller's ctx, so it inherits its
			// cancellation, bounded additionally by srvTopoTimeout.
			newCtx, cancel := context.WithTimeout(ctx, srvTopoTimeout)
			defer cancel()

			result, err := q.query(newCtx, entry)

			entry.mutex.Lock()
			defer func() {
				// Wake all waiters, then clear the channel so a future
				// call can start the next refresh.
				close(entry.refreshingChan)
				entry.refreshingChan = nil
				entry.mutex.Unlock()
			}()

			if err == nil {
				// Save the value we got and the current time in the cache.
				entry.insertionTime = time.Now()
				// Avoid a tiny race if TTL == refresh time (the default).
				entry.lastQueryTime = entry.insertionTime
				entry.value = result
			} else {
				q.counts.Add(errorCategory, 1)
				if entry.insertionTime.IsZero() {
					log.Errorf("ResilientQuery(%v, %v) failed: %v (no cached value, caching and returning error)", ctx, wkey, err)
				} else if newCtx.Err() == context.DeadlineExceeded {
					log.Errorf("ResilientQuery(%v, %v) failed: %v (request timeout), (keeping cached value: %v)", ctx, wkey, err, entry.value)
				} else if entry.value != nil && time.Since(entry.insertionTime) < q.cacheTTL {
					q.counts.Add(cachedCategory, 1)
					log.Warningf("ResilientQuery(%v, %v) failed: %v (keeping cached value: %v)", ctx, wkey, err, entry.value)
				} else {
					log.Errorf("ResilientQuery(%v, %v) failed: %v (cached value expired)", ctx, wkey, err)
					// Invalidate the expired value so future calls block on
					// the next refresh instead of serving it.
					entry.insertionTime = time.Time{}
					entry.value = nil
				}
			}

			entry.lastError = err
		}()
	}

	// If the cached entry is still valid then use it, otherwise wait
	// for the refresh attempt to complete to get a more up to date
	// response.
	//
	// In the event that the topo service is slow or unresponsive either
	// on the initial fetch or if the cache TTL expires, then several
	// requests could be blocked on refreshingChan waiting for the response
	// to come back.
	if cacheValid {
		return entry.value, nil
	}

	// Unlock while blocked on the refresh so the background goroutine can
	// acquire the entry lock and publish its result.
	refreshingChan := entry.refreshingChan
	entry.mutex.Unlock()
	select {
	case <-refreshingChan:
	case <-ctx.Done():
		// Re-lock so the deferred Unlock at the top stays balanced.
		entry.mutex.Lock()
		return nil, ctx.Err()
	}

	entry.mutex.Lock()
	if entry.value != nil {
		return entry.value, nil
	}
	return nil, entry.lastError
}