forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
split_queue.go
174 lines (160 loc) · 5.7 KB
/
split_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball (spencer.kimball@gmail.com)
package storage
import (
"sort"
"time"
"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
)
const (
// splitQueueMaxSize is the max size of the split queue.
splitQueueMaxSize = 100
// splitQueueTimerDuration is the duration between splits of queued ranges.
splitQueueTimerDuration = 0 * time.Second // zero duration to process splits greedily
)
// splitQueue manages a queue of ranges slated to be split due to size
// or along intersecting accounting or zone config boundaries.
type splitQueue struct {
*baseQueue
db *client.KV
gossip *gossip.Gossip
// Some tests in this package disable the split queue.
disabled bool
}
// newSplitQueue returns a new instance of splitQueue.
func newSplitQueue(db *client.KV, gossip *gossip.Gossip) *splitQueue {
sq := &splitQueue{
db: db,
gossip: gossip,
}
sq.baseQueue = newBaseQueue("split", sq, splitQueueMaxSize)
return sq
}
// shouldQueue determines whether a range should be queued for
// splitting. This is true if the range is intersected by any
// accounting or zone config prefix or if the range's size in
// bytes exceeds the limit for the zone.
func (sq *splitQueue) shouldQueue(now proto.Timestamp, rng *Range) (shouldQ bool, priority float64) {
// Only queue for Split if this replica is leader.
if !rng.IsLeader() || sq.disabled {
return
}
// Set priority to 1 in the event the range is split by acct or zone configs.
if len(sq.computeSplitKeys(rng)) > 0 {
priority = 1
shouldQ = true
}
// Add priority based on the size of range compared to the max
// size for the zone it's in.
zone, err := sq.lookupZoneConfig(rng)
if err != nil {
log.Error(err)
return
}
if ratio := float64(rng.stats.GetSize()) / float64(zone.RangeMaxBytes); ratio > 1 {
priority += ratio
shouldQ = true
}
return
}
// process synchronously invokes admin split for each proposed split key.
func (sq *splitQueue) process(now proto.Timestamp, rng *Range) error {
if !rng.IsLeader() {
log.Infof("not leader of range %s; skipping split", rng)
return nil
}
// First handle case of splitting due to accounting and zone config maps.
splitKeys := sq.computeSplitKeys(rng)
if len(splitKeys) > 0 {
log.Infof("splitting range %q-%q at keys %v", rng.Desc().StartKey, rng.Desc().EndKey, splitKeys)
for _, splitKey := range splitKeys {
req := &proto.AdminSplitRequest{
RequestHeader: proto.RequestHeader{Key: splitKey},
SplitKey: splitKey,
}
if err := sq.db.Call(proto.AdminSplit, req, &proto.AdminSplitResponse{}); err != nil {
return util.Errorf("unable to split at key %q: %s", splitKey, err)
}
}
return nil
}
// Next handle case of splitting due to size.
zone, err := sq.lookupZoneConfig(rng)
if err != nil {
return err
}
if float64(rng.stats.GetSize())/float64(zone.RangeMaxBytes) > 1 {
rng.AddCmd(proto.AdminSplit, &proto.AdminSplitRequest{
RequestHeader: proto.RequestHeader{Key: rng.Desc().StartKey},
}, &proto.AdminSplitResponse{}, true)
}
return nil
}
// timer returns interval between processing successive queued splits.
func (sq *splitQueue) timer() time.Duration {
return splitQueueTimerDuration
}
// computeSplitKeys returns an array of keys at which the supplied
// range should be split, as computed by intersecting the range with
// accounting and zone config map boundaries.
func (sq *splitQueue) computeSplitKeys(rng *Range) []proto.Key {
// Now split the range into pieces by intersecting it with the
// boundaries of the config map.
splitKeys := proto.KeySlice{}
for _, configKey := range []string{gossip.KeyConfigAccounting, gossip.KeyConfigZone} {
info, err := sq.gossip.GetInfo(configKey)
if err != nil {
log.Errorf("unable to fetch %s config from gossip: %s", configKey, err)
continue
}
configMap := info.(PrefixConfigMap)
splits, err := configMap.SplitRangeByPrefixes(rng.Desc().StartKey, rng.Desc().EndKey)
if err != nil {
log.Errorf("unable to split range %q-%q by prefix map %s", rng.Desc().StartKey, rng.Desc().EndKey, configMap)
continue
}
// Gather new splits.
for _, split := range splits {
if split.end.Less(rng.Desc().EndKey) {
splitKeys = append(splitKeys, split.end)
}
}
}
// Sort and unique the combined split keys from intersections with
// both the accounting and zone config maps.
sort.Sort(splitKeys)
var unique []proto.Key
for i, key := range splitKeys {
if i == 0 || !key.Equal(splitKeys[i-1]) {
unique = append(unique, key)
}
}
return unique
}
// lookupZoneConfig returns the zone config matching the range.
func (sq *splitQueue) lookupZoneConfig(rng *Range) (proto.ZoneConfig, error) {
zoneMap, err := sq.gossip.GetInfo(gossip.KeyConfigZone)
if err != nil || zoneMap == nil {
return proto.ZoneConfig{}, util.Errorf("unable to lookup zone config for range %s: %s", rng, err)
}
prefixConfig := zoneMap.(PrefixConfigMap).MatchByPrefix(rng.Desc().StartKey)
return *prefixConfig.Config.(*proto.ZoneConfig), nil
}