forked from vitessio/vitess
/
after_action.go
247 lines (219 loc) · 7.93 KB
/
after_action.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
// This file handles the agent state changes.
import (
"fmt"
"reflect"
"strings"
"golang.org/x/net/context"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/trace"
"github.com/youtube/vitess/go/vt/binlog"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/tabletserver/planbuilder"
"github.com/youtube/vitess/go/vt/topo"
)
// Stats exported by this module. Each one is a string stat that
// changeCallback overwrites with the matching value taken from the
// new tablet record.
var (
	statsType          = stats.NewString("TabletType")
	statsKeyspace      = stats.NewString("TabletKeyspace")
	statsShard         = stats.NewString("TabletShard")
	statsKeyRangeStart = stats.NewString("TabletKeyRangeStart")
	statsKeyRangeEnd   = stats.NewString("TabletKeyRangeEnd")
)

// historyLength is a module-level setting that is treated as a constant.
// NOTE(review): it is not referenced anywhere in this file; presumably it
// bounds an action/health history kept elsewhere in the package - confirm
// against its users.
var historyLength = 16
// Names of the query rule sources that are under the control of the
// agent. They are registered in init() and used as the key when the
// corresponding rule set is pushed with tabletserver.SetQueryRules.
const (
	// keyrangeQueryRules names the rule set built from the tablet's keyrange.
	keyrangeQueryRules string = "KeyrangeQueryRules"

	// blacklistQueryRules names the rule set built from the blacklisted tables.
	blacklistQueryRules string = "BlacklistQueryRules"
)
// allowQueries starts the query service for the given tablet.
//
// It is a no-op (returning nil) when the agent has no DBConfigs, which
// is the case for test instances, or when the query service already
// reports the "SERVING" state. Otherwise it copies the tablet's
// identity into the app DB config, loads the keyrange and blacklist
// query rules, and finally asks tabletserver to allow queries.
//
// The returned error is either the one from loading the query rules or
// the one from tabletserver.AllowQueries.
func (agent *ActionAgent) allowQueries(tablet *topo.Tablet, blacklistedTables []string) error {
	switch {
	case agent.DBConfigs == nil:
		// Test instance: there is nothing to start.
		return nil
	case tabletserver.SqlQueryRpcService.GetState() == "SERVING":
		// Already running: we never start it a second time.
		return nil
	}

	// Bring our DB config in line with what the tablet record says.
	// An explicitly configured DbName wins over the tablet's one.
	if agent.DBConfigs.App.DbName == "" {
		agent.DBConfigs.App.DbName = tablet.DbName()
	}
	agent.DBConfigs.App.Keyspace = tablet.Keyspace
	agent.DBConfigs.App.Shard = tablet.Shard
	// The invalidator is enabled on every tablet type except master.
	agent.DBConfigs.App.EnableInvalidator = tablet.Type != topo.TYPE_MASTER

	if err := agent.loadKeyspaceAndBlacklistRules(tablet, blacklistedTables); err != nil {
		return err
	}
	return tabletserver.AllowQueries(agent.DBConfigs, agent.SchemaOverrides, agent.Mysqld)
}
// loadKeyspaceAndBlacklistRules builds two query rule sets and pushes
// them to the query service:
//  1. keyrange rules, built only when the tablet's KeyRange is partial:
//     one QR_FAIL rule per DML plan type, conditioned on the
//     "keyspace_id" bind variable not being in the tablet's KeyRange.
//  2. blacklist rules, built only when blacklistedTables is non-empty:
//     a single QR_FAIL_RETRY rule carrying one table condition per
//     resolved table.
//
// An error is returned if a keyrange bind variable condition cannot be
// added or if the blacklisted table names cannot be resolved. A failure
// to push a rule set is only logged as a warning; the function still
// returns nil in that case.
func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *topo.Tablet, blacklistedTables []string) (err error) {
	// Keyrange rules. The set stays empty for a non-partial keyrange.
	keyrangeRules := tabletserver.NewQueryRules()
	if tablet.KeyRange.IsPartial() {
		log.Infof("Restricting to keyrange: %v", tablet.KeyRange)

		// onAbsent is forwarded as-is to AddBindVarCond.
		// NOTE(review): presumably it decides whether a missing
		// keyspace_id bind variable triggers the rule - confirm.
		dmlPlans := []struct {
			planID   planbuilder.PlanType
			onAbsent bool
		}{
			{planID: planbuilder.PLAN_INSERT_PK, onAbsent: true},
			{planID: planbuilder.PLAN_INSERT_SUBQUERY, onAbsent: true},
			{planID: planbuilder.PLAN_PASS_DML, onAbsent: false},
			{planID: planbuilder.PLAN_DML_PK, onAbsent: false},
			{planID: planbuilder.PLAN_DML_SUBQUERY, onAbsent: false},
		}
		for _, dml := range dmlPlans {
			description := fmt.Sprintf("enforce keyspace_id range for %v", dml.planID)
			name := fmt.Sprintf("keyspace_id_not_in_range_%v", dml.planID)
			rule := tabletserver.NewQueryRule(description, name, tabletserver.QR_FAIL)
			rule.AddPlanCond(dml.planID)
			if condErr := rule.AddBindVarCond("keyspace_id", dml.onAbsent, true, tabletserver.QR_NOTIN, tablet.KeyRange); condErr != nil {
				return fmt.Errorf("Unable to add keyspace rule: %v", condErr)
			}
			keyrangeRules.Add(rule)
		}
	}

	// Blacklist rules. The set stays empty when nothing is blacklisted.
	blacklistRules := tabletserver.NewQueryRules()
	if len(blacklistedTables) > 0 {
		// The configured names are resolved against the tablet's
		// database first (the original comment mentions wildcards).
		resolved, resolveErr := agent.Mysqld.ResolveTables(tablet.DbName(), blacklistedTables)
		if resolveErr != nil {
			return resolveErr
		}
		log.Infof("Blacklisting tables %v", strings.Join(resolved, ", "))
		rule := tabletserver.NewQueryRule("enforce blacklisted tables", "blacklisted_table", tabletserver.QR_FAIL_RETRY)
		for _, table := range resolved {
			rule.AddTableCond(table)
		}
		blacklistRules.Add(rule)
	}

	// Push both rule sets to the query service. Failures here are
	// best-effort: they are logged and do not fail the call.
	if pushErr := tabletserver.SetQueryRules(keyrangeQueryRules, keyrangeRules); pushErr != nil {
		log.Warningf("Fail to load query rule set %s: %s", keyrangeQueryRules, pushErr)
	}
	if pushErr := tabletserver.SetQueryRules(blacklistQueryRules, blacklistRules); pushErr != nil {
		log.Warningf("Fail to load query rule set %s: %s", blacklistQueryRules, pushErr)
	}
	return nil
}
// disallowQueries stops the query service through
// tabletserver.DisallowQueries. It does nothing when the agent has no
// DBConfigs, which is the case for test instances.
func (agent *ActionAgent) disallowQueries() {
	if agent.DBConfigs != nil {
		tabletserver.DisallowQueries()
	}
}
// changeCallback is run after every action that might have changed
// something in the tablet record. Given the previous and the new
// tablet record, it:
//   - decides whether the query service should be running and
//     starts, restarts or stops it accordingly,
//   - remembers the TabletControl it based that decision on,
//   - starts or stops the update stream service,
//   - refreshes the exported stats,
//   - refreshes or stops the binlog players.
//
// Topology read errors and query service start errors are logged, not
// returned: the function always returns nil.
func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTablet *topo.Tablet) error {
	span := trace.NewSpanFromContext(ctx)
	span.StartLocal("ActionAgent.changeCallback")
	defer span.Finish()

	serveQueries := newTablet.IsRunningQueryService()
	isMaster := newTablet.Type == topo.TYPE_MASTER

	// The shard record (SourceShards / TabletControlMap) is only read
	// when we are about to use it.
	var (
		shardInfo         *topo.ShardInfo
		tabletControl     *topo.TabletControl
		blacklistedTables []string
		err               error
	)
	if serveQueries {
		shardInfo, err = topo.GetShard(ctx, agent.TopoServer, newTablet.Keyspace, newTablet.Shard)
		if err == nil {
			// A master with source shards does not serve queries.
			if isMaster {
				serveQueries = len(shardInfo.SourceShards) == 0
			}
			// A TabletControl entry for our type only applies if
			// our cell is in its cell list.
			tc, found := shardInfo.TabletControlMap[newTablet.Type]
			if found && topo.InCellList(newTablet.Alias.Cell, tc.Cells) {
				if tc.DisableQueryService {
					serveQueries = false
				}
				blacklistedTables = tc.BlacklistedTables
				tabletControl = tc
			}
		} else {
			// Keep going with whatever we decided so far.
			log.Errorf("Cannot read shard for this tablet %v, might have inaccurate SourceShards and TabletControls: %v", newTablet.Alias, err)
		}
	}

	// The keyspace record (for ShardingColumnType, used by binlog
	// replication) is only read on masters that have source shards.
	var keyspaceInfo *topo.KeyspaceInfo
	if isMaster && shardInfo != nil && len(shardInfo.SourceShards) > 0 {
		if ki, readErr := agent.TopoServer.GetKeyspace(newTablet.Keyspace); readErr != nil {
			// keyspaceInfo stays nil on failure.
			log.Errorf("Cannot read keyspace for this tablet %v: %v", newTablet.Alias, readErr)
		} else {
			keyspaceInfo = ki
		}
	}

	if !serveQueries {
		agent.disallowQueries()
	} else {
		// A few transitions require the query service to be stopped
		// before it is (re)started:
		// - replica to master, so clients that were already
		//   connected don't keep on using the master as replica
		//   or rdonly;
		// - a change in the parameters of the query service, which
		//   has to restart to pick them up: the KeyRange or the
		//   BlacklistedTables list.
		restart := (isMaster && oldTablet.Type != topo.TYPE_MASTER) ||
			newTablet.KeyRange != oldTablet.KeyRange ||
			!reflect.DeepEqual(blacklistedTables, agent.BlacklistedTables())
		if restart {
			agent.disallowQueries()
		}
		if startErr := agent.allowQueries(newTablet, blacklistedTables); startErr != nil {
			log.Errorf("Cannot start query service: %v", startErr)
		}
	}

	// Save the tabletControl we've been using, so the background
	// healthcheck makes the same decisions as we've been making.
	agent.setTabletControl(tabletControl)

	// The update stream follows the tablet type too.
	switch {
	case agent.DBConfigs == nil:
		// Nothing to start or stop without DB configs.
	case topo.IsRunningUpdateStream(newTablet.Type):
		binlog.EnableUpdateStreamService(agent.DBConfigs.App.DbName, agent.Mysqld)
	default:
		binlog.DisableUpdateStreamService()
	}

	// Export the new tablet identity.
	statsType.Set(string(newTablet.Type))
	statsKeyspace.Set(newTablet.Keyspace)
	statsShard.Set(newTablet.Shard)
	statsKeyRangeStart.Set(string(newTablet.KeyRange.Start.Hex()))
	statsKeyRangeEnd.Set(string(newTablet.KeyRange.End.Hex()))

	// Binlog players only run on masters.
	if players := agent.BinlogPlayerMap; players != nil {
		if isMaster {
			players.RefreshMap(newTablet, keyspaceInfo, shardInfo)
		} else {
			players.StopAllPlayersAndReset()
		}
	}
	return nil
}
func init() {
// Register query rule sources under control of agent
tabletserver.QueryRuleSources.RegisterQueryRuleSource(keyrangeQueryRules)
tabletserver.QueryRuleSources.RegisterQueryRuleSource(blacklistQueryRules)
}