forked from cockroachdb/cockroach
/
store_pool.go
327 lines (287 loc) · 9.26 KB
/
store_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Bram Gruneir (bram+code@cockroachlabs.com)
package storage
import (
"container/heap"
"sort"
"sync"
"time"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/stop"
)
// Store-death timeouts intended for tests.
const (
	// TestTimeUntilStoreDead is a TimeUntilStoreDead value for tests that
	// want stores to be marked dead almost immediately.
	TestTimeUntilStoreDead = time.Millisecond * 5

	// TestTimeUntilStoreDeadOff is a TimeUntilStoreDead value long enough
	// that, in practice, the store pool never marks stores as dead.
	TestTimeUntilStoreDeadOff = time.Hour * 24
)
// storeDetail is everything the StorePool records about a single store. Each
// detail is shared (by pointer) between StorePool.stores and StorePool.queue.
type storeDetail struct {
	// desc is the most recently recorded descriptor for the store.
	desc roachpb.StoreDescriptor
	// dead is set by markDead and cleared by markAlive.
	dead bool
	// gossiped reports whether desc was delivered via gossip rather than
	// being a placeholder created locally.
	gossiped bool
	// timesDied counts how often the store has been marked dead.
	timesDied int
	// foundDeadOn is when the store was last marked dead.
	foundDeadOn roachpb.Timestamp
	// lastUpdatedTime is when the store was last marked alive. It doubles as
	// the priority in storePoolPQ: older timestamps sort first.
	lastUpdatedTime roachpb.Timestamp
	// index is the detail's position in the storePoolPQ heap, as required by
	// heap.Interface; a negative value means it is not currently queued.
	index int
}
// markDead flags the store as dead (inactive), records when that was
// detected, increments its death counter, and logs a warning.
func (sd *storeDetail) markDead(foundDeadOn roachpb.Timestamp) {
	sd.timesDied++
	sd.foundDeadOn, sd.dead = foundDeadOn, true
	log.Warningf("store %s on node %s is now considered offline", sd.desc.StoreID, sd.desc.Node.NodeID)
}
// markAlive flags the store as alive (active) and overwrites its descriptor,
// gossiped flag, and last-updated time. Since lastUpdatedTime is the queue
// priority, the detail's position in storePoolPQ must be fixed up afterwards.
func (sd *storeDetail) markAlive(foundAliveOn roachpb.Timestamp, storeDesc roachpb.StoreDescriptor, gossiped bool) {
	sd.lastUpdatedTime = foundAliveOn
	sd.gossiped = gossiped
	sd.dead = false
	sd.desc = storeDesc
}
// storePoolPQ is a priority queue of storeDetails ordered by lastUpdatedTime,
// so the store heard from least recently is at the front. It implements
// heap.Interface (which embeds sort.Interface). It is not safe for concurrent
// use on its own; StorePool guards it with its mutex.
type storePoolPQ []*storeDetail

// Len implements the sort.Interface.
func (pq storePoolPQ) Len() int { return len(pq) }

// Less implements the sort.Interface; details with an older lastUpdatedTime
// come first.
func (pq storePoolPQ) Less(i, j int) bool {
	a, b := pq[i], pq[j]
	return a.lastUpdatedTime.Less(b.lastUpdatedTime)
}

// Swap implements the sort.Interface, keeping each detail's index field in
// step with its new slot.
func (pq storePoolPQ) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}
// Push implements the heap.Interface. x must be a *storeDetail; it is
// appended to the end of the queue and told its position via index.
func (pq *storePoolPQ) Push(x interface{}) {
	detail := x.(*storeDetail)
	detail.index = len(*pq)
	*pq = append(*pq, detail)
}
// Pop implements the heap.Interface. heap.Pop moves the element being removed
// to the end of the slice before calling this, so Pop removes and returns the
// last element. The returned detail's index is set to -1, which enqueue treats
// as "not in the queue".
func (pq *storePoolPQ) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot so the backing array does not keep a stale
	// reference to the popped detail.
	old[n-1] = nil
	item.index = -1 // for safety
	*pq = old[:n-1]
	return item
}
// peek returns the detail at the front of the queue (the least recently
// updated one) without removing it, or nil when the queue is empty.
func (pq storePoolPQ) peek() *storeDetail {
	if len(pq) > 0 {
		return pq[0]
	}
	return nil
}
// enqueue inserts detail if it is not in the queue (negative index), or else
// restores heap order around its current slot after its priority changed.
func (pq *storePoolPQ) enqueue(detail *storeDetail) {
	if detail.index >= 0 {
		heap.Fix(pq, detail.index)
		return
	}
	heap.Push(pq, detail)
}
// dequeue removes and returns the detail at the front of the queue, or nil
// when the queue is empty.
func (pq *storePoolPQ) dequeue() *storeDetail {
	if len(*pq) == 0 {
		return nil
	}
	next := heap.Pop(pq).(*storeDetail)
	return next
}
// StorePool keeps track of every store known in the cluster together with
// information about its health.
type StorePool struct {
	clock *hlc.Clock
	// timeUntilStoreDead is how long a store may go without being marked alive
	// before the background worker marks it dead.
	timeUntilStoreDead time.Duration

	// mu guards stores and queue. Every storeDetail is referenced from both
	// the map and the priority queue; sharing pointers keeps the two views
	// consistent.
	mu     sync.RWMutex
	stores map[roachpb.StoreID]*storeDetail
	queue  storePoolPQ
}
// NewStorePool builds a StorePool and subscribes it to gossip updates for keys
// matching gossip.KeyStorePrefix. It also starts a worker, tied to stopper,
// that marks stores dead once they have gone timeUntilStoreDead without an
// update.
func NewStorePool(g *gossip.Gossip, clock *hlc.Clock, timeUntilStoreDead time.Duration, stopper *stop.Stopper) *StorePool {
	sp := &StorePool{
		clock:              clock,
		timeUntilStoreDead: timeUntilStoreDead,
		stores:             make(map[roachpb.StoreID]*storeDetail),
	}
	heap.Init(&sp.queue)

	g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStorePrefix), sp.storeGossipUpdate)
	sp.start(stopper)
	return sp
}
// storeGossipUpdate is the gossip callback that keeps the StorePool current.
// It decodes a StoreDescriptor from content and logs and ignores values that
// fail to decode. It creates a detail for stores not seen before, marks the
// store alive as of now with gossiped set, and updates its queue position.
func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) {
	var storeDesc roachpb.StoreDescriptor
	if err := content.GetProto(&storeDesc); err != nil {
		log.Error(err)
		return
	}

	sp.mu.Lock()
	defer sp.mu.Unlock()

	detail, known := sp.stores[storeDesc.StoreID]
	if !known {
		// A negative index means "not queued", so enqueue will push it.
		detail = &storeDetail{index: -1}
		sp.stores[storeDesc.StoreID] = detail
	}
	detail.markAlive(sp.clock.Now(), storeDesc, true)
	sp.queue.enqueue(detail)
}
// start runs a worker on stopper that repeatedly inspects the least recently
// updated store. Once more than timeUntilStoreDead has passed since that
// store's last update, the worker removes it from the queue and marks it dead.
// Between checks the worker sleeps until the front store could expire, or for
// the full timeUntilStoreDead when no stores are queued. It exits when
// stopper signals ShouldStop.
func (sp *StorePool) start(stopper *stop.Stopper) {
	// checkOldest handles the front of the queue while holding the lock and
	// reports how long to wait before the next check.
	checkOldest := func() time.Duration {
		sp.mu.Lock()
		defer sp.mu.Unlock()

		oldest := sp.queue.peek()
		if oldest == nil {
			// No stores yet, wait the full timeout.
			return sp.timeUntilStoreDead
		}
		deadline := oldest.lastUpdatedTime.GoTime().Add(sp.timeUntilStoreDead)
		now := sp.clock.Now()
		if !now.GoTime().After(deadline) {
			// Still alive: come back exactly when it would expire.
			return deadline.Sub(now.GoTime())
		}
		sp.queue.dequeue().markDead(now)
		// The next store may be overdue as well, so check again right away.
		return 0
	}

	stopper.RunWorker(func() {
		for {
			wait := checkOldest()
			select {
			case <-time.After(wait):
			case <-stopper.ShouldStop():
				return
			}
		}
	})
}
// getStoreDetail returns a copy of the storeDetail for storeID.
//
// If the pool has never seen the store, it creates a placeholder detail.
// That detail's descriptor carries only the StoreID and its gossiped flag is
// false. It is marked alive as of now and added to the queue, which gives the
// store the full timeUntilStoreDead before it can be considered dead. Because
// of this insertion the method takes the write lock rather than the read lock.
func (sp *StorePool) getStoreDetail(storeID roachpb.StoreID) storeDetail {
	sp.mu.Lock()
	defer sp.mu.Unlock()

	if detail, ok := sp.stores[storeID]; ok {
		return *detail
	}

	// index -1 marks the detail as not yet queued, so enqueue pushes it.
	detail := &storeDetail{index: -1}
	sp.stores[storeID] = detail
	detail.markAlive(sp.clock.Now(), roachpb.StoreDescriptor{StoreID: storeID}, false)
	sp.queue.enqueue(detail)
	return *detail
}
// getStoreDescriptor returns a copy of the latest descriptor for storeID. It
// returns nil if the store is unknown or its descriptor has not been received
// via gossip.
func (sp *StorePool) getStoreDescriptor(storeID roachpb.StoreID) *roachpb.StoreDescriptor {
	sp.mu.RLock()
	defer sp.mu.RUnlock()

	detail, ok := sp.stores[storeID]
	if !ok || !detail.gossiped {
		return nil
	}
	// Hand out a pointer to a copy so callers cannot modify pool state.
	descCopy := detail.desc
	return &descCopy
}
// deadReplicas returns the replicas in repls whose stores are currently marked
// dead, in their original order. It returns nil when none are dead.
//
// Each lookup goes through getStoreDetail, which registers any store the pool
// has not seen yet (as alive). A replica on an unknown store is therefore
// never reported dead, and the store starts being tracked.
func (sp *StorePool) deadReplicas(repls []roachpb.ReplicaDescriptor) []roachpb.ReplicaDescriptor {
	var dead []roachpb.ReplicaDescriptor
	for _, repl := range repls {
		if sp.getStoreDetail(repl.StoreID).dead {
			dead = append(dead, repl)
		}
	}
	return dead
}
// stat tracks a running sample count and the mean of the samples seen so far.
// The zero value is an empty stat.
type stat struct {
	n, mean float64
}

// update folds x into the stat. It increments the sample count and adjusts
// the mean incrementally (mean_k = mean_{k-1} + (x - mean_{k-1}) / k), so no
// running sum is kept.
func (s *stat) update(x float64) {
	s.n += 1
	delta := x - s.mean
	s.mean += delta / s.n
}
// StoreList is a collection of store descriptors along with running
// statistics over their range counts (count) and fraction of capacity used
// (used).
type StoreList struct {
	stores      []*roachpb.StoreDescriptor
	count, used stat
}

// add appends s to the list and folds its range count and fraction used into
// the list's statistics.
func (sl *StoreList) add(s *roachpb.StoreDescriptor) {
	sl.stores = append(sl.stores, s)
	capacity := &s.Capacity
	sl.count.update(float64(capacity.RangeCount))
	sl.used.update(capacity.FractionUsed())
}
// getStoreList returns a StoreList holding a copy of the descriptor of every
// store that is not marked dead and whose combined attributes include all of
// required. The list's count and used statistics cover exactly those stores.
//
// If deterministic is true, stores are visited in ascending StoreID order,
// which is only intended for unit tests. Otherwise the order follows Go's map
// iteration order.
//
// TODO(embark, spencer): consider using a reverse index map from
// Attr->stores, for efficiency. Ensure that entries in this map still
// have an opportunity to be garbage collected.
func (sp *StorePool) getStoreList(required roachpb.Attributes, deterministic bool) StoreList {
	sp.mu.RLock()
	defer sp.mu.RUnlock()

	// Snapshot the IDs first so they can optionally be sorted; the final size
	// is known, so allocate once.
	storeIDs := make(roachpb.StoreIDSlice, 0, len(sp.stores))
	for storeID := range sp.stores {
		storeIDs = append(storeIDs, storeID)
	}
	if deterministic {
		sort.Sort(storeIDs)
	}

	var sl StoreList
	for _, storeID := range storeIDs {
		detail := sp.stores[storeID]
		if detail.dead || !required.IsSubset(*detail.desc.CombinedAttrs()) {
			continue
		}
		// Copy the descriptor so the returned list does not alias pool state,
		// which may change once the lock is released.
		desc := detail.desc
		sl.add(&desc)
	}
	return sl
}