-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
legacy_tablet_stats_cache.go
313 lines (274 loc) · 10.4 KB
/
legacy_tablet_stats_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"sync"
"context"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
)
// LegacyTabletStatsCache is a LegacyHealthCheckStatsListener that keeps both the
// current list of available LegacyTabletStats, and a serving (healthy) list:
//   - for primary tablets, only the current primary is kept.
//   - for non-primary tablets, we filter the list using
//     FilterLegacyStatsByReplicationLag.
//
// It keeps entries for all tablets in the cell it's configured to serve for
// (and in any cell that resolves to the same cell alias), and for primary
// tablets independently of which cell they are in.
//
// Note the healthy tablet computation is done when we receive a tablet
// update only, not at serving time.
//
// Also note the cache may not hold the most recent update received for a
// tablet. For instance, if a non-primary tablet was serving without error and
// still is, and TrivialStatsUpdate reports the change as trivial, the new
// update is not stored.
type LegacyTabletStatsCache struct {
// cell is the cell we are keeping all tablets for.
// Note we keep track of all primary tablets in all cells.
cell string
// ts is the topo server in use. It is consulted to resolve cell aliases.
ts *topo.Server
// mu protects the following fields (entries and cellAliases). It does not
// protect individual entries in the entries map; each entry has its own lock.
mu sync.RWMutex
// entries maps from keyspace/shard/tabletType to our cache.
// The nesting order is entries[keyspace][shard][tabletType].
entries map[string]map[string]map[topodatapb.TabletType]*legacyTabletStatsCacheEntry
// cellAliases is a cache of cell aliases: it maps a cell name to the value
// returned by topo.GetAliasByCell for that cell.
cellAliases map[string]string
}
// legacyTabletStatsCacheEntry is the per keyspace/shard/tabletType
// entry of the in-memory map for LegacyTabletStatsCache.
type legacyTabletStatsCacheEntry struct {
// mu protects the rest of this structure.
mu sync.RWMutex
// all has the valid (Up) tablets, keyed by LegacyTabletStats.Key.
// Per the original note this key is TabletToMapKey(ts.Tablet), as it is
// the index used by LegacyHealthCheck.
all map[string]*LegacyTabletStats
// healthy only has the healthy ones. For primary entries it is maintained
// by updateHealthyMapForPrimary; for other tablet types it is the result of
// FilterLegacyStatsByReplicationLag applied to the values of all.
healthy []*LegacyTabletStats
}
// updateHealthyMapForPrimary applies a primary tablet update to e.healthy.
//
// A Down update clears the healthy list only when it refers to the tablet
// currently recorded there. An Up update becomes the healthy primary unless
// the one already recorded has a strictly larger
// TabletExternallyReparentedTimestamp, in which case the update is ignored
// and a warning is logged.
//
// The caller is expected to hold e.mu.
func (e *legacyTabletStatsCacheEntry) updateHealthyMapForPrimary(ts *LegacyTabletStats) {
	if !ts.Up {
		// Down primary: drop it only if it is exactly the one we track.
		if len(e.healthy) != 0 && ts.Key == e.healthy[0].Key {
			e.healthy = nil
		}
		return
	}

	// Up primary with nothing recorded yet: just remember it.
	if len(e.healthy) == 0 {
		e.healthy = append(e.healthy, ts)
		return
	}

	// Up primary while another one is recorded: keep whichever was
	// reparented most recently.
	current := e.healthy[0]
	if ts.TabletExternallyReparentedTimestamp < current.TabletExternallyReparentedTimestamp {
		log.Warningf("not marking healthy primary %s as Up for %s because its externally reparented timestamp is smaller than the highest known timestamp from previous MASTERs %s: %d < %d ",
			topoproto.TabletAliasString(ts.Tablet.Alias),
			topoproto.KeyspaceShardString(ts.Target.Keyspace, ts.Target.Shard),
			topoproto.TabletAliasString(current.Tablet.Alias),
			ts.TabletExternallyReparentedTimestamp,
			current.TabletExternallyReparentedTimestamp)
		return
	}
	e.healthy[0] = ts
}
// NewLegacyTabletStatsCache builds a LegacyTabletStatsCache for the given
// cell and installs it on hc as its LegacyHealthCheckStatsListener.
// The registration is done here rather than by the caller so that
// SetListener is always invoked with sendDownEvents=true: the cache relies
// on those down events to stay consistent.
func NewLegacyTabletStatsCache(hc LegacyHealthCheck, ts *topo.Server, cell string) *LegacyTabletStatsCache {
	const setListener = true
	return newLegacyTabletStatsCache(hc, ts, cell, setListener)
}
// NewTabletStatsCacheDoNotSetListener behaves like NewLegacyTabletStatsCache
// except that the returned cache is not registered as a listener on any
// healthcheck. The caller is responsible for forwarding updates to
// LegacyTabletStatsCache.StatsUpdate(), which makes it possible to chain
// several listeners. When the caller installs its own listener on a
// healthcheck it must pass sendDownEvents=true, otherwise this cache will not
// properly remove tablets whose tablet type changes.
func NewTabletStatsCacheDoNotSetListener(ts *topo.Server, cell string) *LegacyTabletStatsCache {
	const setListener = false
	return newLegacyTabletStatsCache(nil, ts, cell, setListener)
}
// newLegacyTabletStatsCache allocates a cache with empty entries and
// cellAliases maps. When setListener is true the cache is also registered on
// hc; hc is not touched otherwise.
func newLegacyTabletStatsCache(hc LegacyHealthCheck, ts *topo.Server, cell string, setListener bool) *LegacyTabletStatsCache {
	cache := new(LegacyTabletStatsCache)
	cache.cell = cell
	cache.ts = ts
	cache.entries = make(map[string]map[string]map[topodatapb.TabletType]*legacyTabletStatsCacheEntry)
	cache.cellAliases = make(map[string]string)

	if !setListener {
		return cache
	}

	// sendDownEvents must be true so that we receive the deletes needed to
	// remove a tablet from the map when its type changes.
	hc.SetListener(cache, true /*sendDownEvents*/)
	return cache
}
// getEntry looks up the legacyTabletStatsCacheEntry stored for
// keyspace/shard/tabletType and returns nil when there is none.
// Only the read lock on mu is taken.
func (tc *LegacyTabletStatsCache) getEntry(keyspace, shard string, tabletType topodatapb.TabletType) *legacyTabletStatsCacheEntry {
	tc.mu.RLock()
	defer tc.mu.RUnlock()

	// Indexing a nil map yields the zero value, so a miss at the keyspace or
	// shard level simply falls through to a nil entry.
	return tc.entries[keyspace][shard][tabletType]
}
// getOrCreateEntry returns the legacyTabletStatsCacheEntry for target,
// creating it (and any missing keyspace/shard level above it) on first use.
func (tc *LegacyTabletStatsCache) getOrCreateEntry(target *querypb.Target) *legacyTabletStatsCacheEntry {
	// Common case: the entry already exists and a read lock is enough.
	if found := tc.getEntry(target.Keyspace, target.Shard, target.TabletType); found != nil {
		return found
	}

	// Otherwise take the write lock and fill in whichever levels are missing.
	// Each level is re-checked because another goroutine may have created it
	// since the read-locked lookup above.
	tc.mu.Lock()
	defer tc.mu.Unlock()

	byShard, present := tc.entries[target.Keyspace]
	if !present {
		byShard = make(map[string]map[topodatapb.TabletType]*legacyTabletStatsCacheEntry)
		tc.entries[target.Keyspace] = byShard
	}

	byType, present := byShard[target.Shard]
	if !present {
		byType = make(map[topodatapb.TabletType]*legacyTabletStatsCacheEntry)
		byShard[target.Shard] = byType
	}

	if entry, present := byType[target.TabletType]; present {
		return entry
	}
	entry := &legacyTabletStatsCacheEntry{
		all: make(map[string]*LegacyTabletStats),
	}
	byType[target.TabletType] = entry
	return entry
}
// getAliasByCell returns the alias for cell as reported by
// topo.GetAliasByCell, memoizing the answer in tc.cellAliases so the topo
// server is asked at most once per cell.
//
// Cache hits only take the read lock on tc.mu, so they do not block (or get
// blocked by) concurrent getEntry readers. On a miss the write lock is taken,
// the cache is checked again, and the topo lookup is performed while still
// holding the lock.
func (tc *LegacyTabletStatsCache) getAliasByCell(cell string) string {
	// Fast path: the alias is already known.
	tc.mu.RLock()
	alias, ok := tc.cellAliases[cell]
	tc.mu.RUnlock()
	if ok {
		return alias
	}

	// Slow path: resolve and remember the alias.
	tc.mu.Lock()
	defer tc.mu.Unlock()
	// Another goroutine may have stored the alias between releasing the read
	// lock and acquiring the write lock.
	if cached, ok := tc.cellAliases[cell]; ok {
		return cached
	}
	alias = topo.GetAliasByCell(context.Background(), tc.ts, cell)
	tc.cellAliases[cell] = alias
	return alias
}
// StatsUpdate is part of the LegacyHealthCheckStatsListener interface.
//
// It records ts in the entry for ts.Target and then refreshes that entry's
// healthy list. Updates for non-primary tablets that live in a different
// cell and resolve to a different cell alias than this cache's cell are
// ignored.
func (tc *LegacyTabletStatsCache) StatsUpdate(ts *LegacyTabletStats) {
	isPrimary := ts.Target.TabletType == topodatapb.TabletType_PRIMARY

	if !isPrimary &&
		ts.Tablet.Alias.Cell != tc.cell &&
		tc.getAliasByCell(ts.Tablet.Alias.Cell) != tc.getAliasByCell(tc.cell) {
		// Non-primary tablet in a foreign cell with a foreign alias: drop it.
		return
	}

	entry := tc.getOrCreateEntry(ts.Target)
	entry.mu.Lock()
	defer entry.mu.Unlock()

	// Step 1: bring the full map up to date.
	trivial := false
	known, tracked := entry.all[ts.Key]
	switch {
	case tracked && ts.Up:
		// Most common case: the tablet was good and still is. Such a
		// trivial non-primary update is not stored at all.
		trivial = known.LastError == nil && known.Serving && ts.LastError == nil &&
			ts.Serving && !isPrimary && known.TrivialStatsUpdate(ts)
		if !trivial {
			// Overwrite in place: 'all' and 'healthy' hold pointers, so
			// both observe the new values.
			*known = *ts
		}
	case tracked:
		// The tablet went down; forget it.
		delete(entry.all, ts.Key)
	case ts.Up:
		// First time we see this tablet.
		entry.all[ts.Key] = ts
	default:
		// Asked to remove a tablet we never had: nothing to do.
		return
	}

	// Step 2: bring the healthy list up to date.
	if isPrimary {
		// For TabletType_PRIMARY only the most recent primary is kept.
		entry.updateHealthyMapForPrimary(ts)
		return
	}
	if trivial {
		// Nothing relevant changed; skip the recomputation entirely.
		return
	}

	candidates := make([]*LegacyTabletStats, 0, len(entry.all))
	for _, stats := range entry.all {
		candidates = append(candidates, stats)
	}
	entry.healthy = FilterLegacyStatsByReplicationLag(candidates)
}
// GetTabletStats returns a copy of every LegacyTabletStats tracked for
// keyspace/shard/tabletType, or nil when no entry exists for that target.
// The returned slice and its elements are copies owned by the caller.
func (tc *LegacyTabletStatsCache) GetTabletStats(keyspace, shard string, tabletType topodatapb.TabletType) []LegacyTabletStats {
	entry := tc.getEntry(keyspace, shard, tabletType)
	if entry == nil {
		return nil
	}

	entry.mu.RLock()
	defer entry.mu.RUnlock()

	snapshot := make([]LegacyTabletStats, len(entry.all))
	next := 0
	for _, stats := range entry.all {
		snapshot[next] = *stats
		next++
	}
	return snapshot
}
// GetHealthyTabletStats returns a copy of the healthy LegacyTabletStats for
// keyspace/shard/tabletType, or nil when no entry exists for that target.
// The returned slice and its elements are copies owned by the caller.
// For TabletType_PRIMARY the result holds at most one element: the most
// recent tablet of type primary.
func (tc *LegacyTabletStatsCache) GetHealthyTabletStats(keyspace, shard string, tabletType topodatapb.TabletType) []LegacyTabletStats {
	entry := tc.getEntry(keyspace, shard, tabletType)
	if entry == nil {
		return nil
	}

	entry.mu.RLock()
	defer entry.mu.RUnlock()

	healthy := make([]LegacyTabletStats, 0, len(entry.healthy))
	for _, stats := range entry.healthy {
		healthy = append(healthy, *stats)
	}
	return healthy
}
// ResetForTesting is for use in tests only. It replaces the entries map with
// an empty one; the cellAliases cache is left as it is.
func (tc *LegacyTabletStatsCache) ResetForTesting() {
	empty := make(map[string]map[string]map[topodatapb.TabletType]*legacyTabletStatsCacheEntry)

	tc.mu.Lock()
	defer tc.mu.Unlock()
	tc.entries = empty
}
// Compile-time interface check: the build fails if *LegacyTabletStatsCache
// stops implementing LegacyHealthCheckStatsListener.
var _ LegacyHealthCheckStatsListener = (*LegacyTabletStatsCache)(nil)