forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state_change.go
306 lines (273 loc) · 10.3 KB
/
state_change.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package tabletmanager
// This file handles the agent state changes.
import (
"flag"
"fmt"
"strings"
"time"
"golang.org/x/net/context"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/event"
"github.com/youtube/vitess/go/trace"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletmanager/events"
"github.com/youtube/vitess/go/vt/tabletserver"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// historyLength is a module-wide size setting, declared as a var rather than a
// const. Its consumers are outside this file's visible code — presumably it
// bounds a history buffer elsewhere in the package; confirm before changing.
var historyLength = 16

// gracePeriod is how long the agent pauses after telling vtgate (via a health
// broadcast) that it is about to stop serving a particular target type, e.g.
// when going spare or when being promoted to master. During that pause vtgate
// is expected to redirect traffic elsewhere, before the tablet actually begins
// rejecting queries for that target type. Set with
// -serving_state_grace_period; with the default of 0 no pause happens.
var gracePeriod = flag.Duration("serving_state_grace_period", 0, "how long to pause after broadcasting health to vtgate, before enforcing a new serving state")

// blacklistQueryRules is the name under which the table-blacklist rule set is
// registered with the query service by loadBlacklistRules.
const blacklistQueryRules string = "BlacklistQueryRules"
// loadBlacklistRules builds the table-blacklist query rules for tablet and
// installs them on the query service under the blacklistQueryRules name.
//
// blacklistedTables may contain wildcards; they are first resolved against the
// tablet's database. If the list is empty, or nothing matches, an empty rule
// set is installed (presumably clearing an earlier blacklist — depends on
// SetQueryRules semantics; confirm). Otherwise a single QRFailRetry rule is
// installed with one table condition per resolved table.
//
// It returns an error if wildcard resolution fails (in which case no rules are
// installed) or if the query service rejects the rule set. The caller in this
// file treats a nil return as "the blacklist is in effect" and only then
// records the blacklisted tables, so install failures must be returned, not
// merely logged.
func (agent *ActionAgent) loadBlacklistRules(tablet *topodatapb.Tablet, blacklistedTables []string) error {
	blacklistRules := tabletserver.NewQueryRules()
	if len(blacklistedTables) > 0 {
		// Resolve wildcards into concrete table names first.
		tables, err := mysqlctl.ResolveTables(agent.MysqlDaemon, topoproto.TabletDbName(tablet), blacklistedTables)
		if err != nil {
			return fmt.Errorf("cannot resolve blacklisted tables %v: %v", blacklistedTables, err)
		}
		// Only add a rule when at least one table matched: a rule without
		// table conditions would blacklist all tables.
		if len(tables) > 0 {
			log.Infof("Blacklisting tables %v", strings.Join(tables, ", "))
			qr := tabletserver.NewQueryRule("enforce blacklisted tables", "blacklisted_table", tabletserver.QRFailRetry)
			for _, t := range tables {
				qr.AddTableCond(t)
			}
			blacklistRules.Add(qr)
		}
	}
	if err := agent.QueryServiceControl.SetQueryRules(blacklistQueryRules, blacklistRules); err != nil {
		return fmt.Errorf("cannot load query rule set %s: %v", blacklistQueryRules, err)
	}
	return nil
}
// lameduck switches the query service into lameduck mode, broadcasts the
// resulting health to vtgate straight away, and then blocks for *gracePeriod
// so clients have time to pick up the new status. reason is only logged.
//
// This function does not take the query service back out of lameduck; the
// final log line merely marks the end of the wait.
func (agent *ActionAgent) lameduck(reason string) {
	log.Infof("Agent is entering lameduck, reason: %v", reason)
	agent.QueryServiceControl.EnterLameduck()

	// Announce first, then give clients the grace period to react.
	agent.broadcastHealth()
	wait := *gracePeriod
	time.Sleep(wait)

	log.Infof("Agent is leaving lameduck")
}
// broadcastHealth snapshots the agent's replication delay, health error and
// externally-reparented time under agent.mutex. It builds a RealtimeStats
// record from them, adding binlog player and QPS figures when available, and
// passes it to QueryServiceControl.BroadcastHealth on a new goroutine. The
// call therefore returns without waiting for the broadcast.
func (agent *ActionAgent) broadcastHealth() {
	agent.mutex.Lock()
	delay := agent._replicationDelay
	healthErr := agent._healthy
	reparentedAt := agent._tabletExternallyReparentedTime
	agent.mutex.Unlock()

	// With no health error, HealthError keeps its zero value.
	var healthErrMsg string
	if healthErr != nil {
		healthErrMsg = healthErr.Error()
	}

	// FIXME(alainjobart,liguo) add CpuUsage
	stats := &querypb.RealtimeStats{
		SecondsBehindMaster: uint32(delay.Seconds()),
		HealthError:         healthErrMsg,
	}
	if blm := agent.BinlogPlayerMap; blm != nil {
		stats.SecondsBehindMasterFilteredReplication, stats.BinlogPlayersCount = blm.StatusSummary()
	}
	if qss := agent.QueryServiceControl.QueryServiceStats(); qss != nil {
		stats.Qps = qss.QPSRates.TotalRate()
	}

	// An unset reparent time is reported as 0, not as the Unix value of the
	// zero time.
	var reparentTS int64
	if !reparentedAt.IsZero() {
		reparentTS = reparentedAt.Unix()
	}

	go agent.QueryServiceControl.BroadcastHealth(reparentTS, stats)
}
// refreshTablet must run after any action that may have changed the tablet's
// state. It re-reads the tablet record from the topology, applies a non-nil
// result of checkTabletMysqlPort to the cached record, and then calls
// updateState with the pre-refresh record so the state-change callbacks can see
// the exact transition. reason is attached to the trace span and the logs.
func (agent *ActionAgent) refreshTablet(ctx context.Context, reason string) error {
	log.Infof("Executing post-action state refresh")

	span := trace.NewSpanFromContext(ctx)
	span.StartLocal("ActionAgent.refreshTablet")
	span.Annotate("reason", reason)
	defer span.Finish()
	ctx = trace.NewContext(ctx, span)

	// Keep the record from before the reload for the transition callbacks.
	before := agent.Tablet()

	fresh, err := agent.updateTabletFromTopo(ctx)
	if err != nil {
		log.Warningf("Failed rereading tablet after %v - services may be inconsistent: %v", reason, err)
		return fmt.Errorf("Failed rereading tablet after %v: %v", reason, err)
	}

	if patched := agent.checkTabletMysqlPort(ctx, fresh); patched != nil {
		agent.setTablet(patched)
	}

	if err = agent.updateState(ctx, before, reason); err != nil {
		return err
	}
	log.Infof("Done with post-action state refresh")
	return nil
}
// updateState runs changeCallback for the transition from oldTablet to the
// agent's current tablet record. Only if that succeeds does it dispatch an
// events.StateChange carrying both records (copied by value) and reason. An
// error from changeCallback is returned unchanged, and no event is dispatched.
func (agent *ActionAgent) updateState(ctx context.Context, oldTablet *topodatapb.Tablet, reason string) error {
	current := agent.Tablet()
	log.Infof("Running tablet callback because: %v", reason)

	err := agent.changeCallback(ctx, oldTablet, current)
	if err == nil {
		event.Dispatch(&events.StateChange{
			OldTablet: *oldTablet,
			NewTablet: *current,
			Reason:    reason,
		})
	}
	return err
}
// changeCallback is run after every action that might
// have changed something in the tablet record or in the topology.
//
// It owns making changes to the BinlogPlayerMap. The input for this is the
// tablet type (has to be master), and the shard's SourceShards.
//
// It owns updating the blacklisted tables.
//
// It owns updating the stats record for 'TabletType'.
//
// It owns starting and stopping the update stream service.
//
// It owns reading the TabletControl for the current tablet, and storing it.
//
// For serving tablet types the shard record is read from the topology. If that
// read fails, neither the blacklist nor the binlog players are touched, because
// the shard's TabletControls and SourceShards are unknown. Individual failures
// are logged and the remaining steps still run; the function currently always
// returns nil.
func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTablet *topodatapb.Tablet) error {
	span := trace.NewSpanFromContext(ctx)
	span.StartLocal("ActionAgent.changeCallback")
	defer span.Finish()
	// Attach the span so the topology reads below are recorded under it.
	ctx = trace.NewContext(ctx, span)

	allowQuery := topo.IsRunningQueryService(newTablet.Type)
	broadcastHealth := false
	// Captured before the shard-level checks below, which only narrow
	// allowQuery.
	runUpdateStream := allowQuery

	// Read the shard to get SourceShards / TabletControlMap if we're going to
	// use it. shardInfo is only set when that read succeeds.
	var shardInfo *topo.ShardInfo
	var disallowQueryReason string
	var blacklistedTables []string
	updateBlacklistedTables := true
	if allowQuery {
		si, err := agent.TopoServer.GetShard(ctx, newTablet.Keyspace, newTablet.Shard)
		if err != nil {
			log.Errorf("Cannot read shard for this tablet %v, might have inaccurate SourceShards and TabletControls: %v", newTablet.Alias, err)
			updateBlacklistedTables = false
		} else {
			shardInfo = si
			if newTablet.Type == topodatapb.TabletType_MASTER && len(shardInfo.SourceShards) > 0 {
				allowQuery = false
				disallowQueryReason = "master tablet with filtered replication on"
			}
			if tc := shardInfo.GetTabletControl(newTablet.Type); tc != nil && topo.InCellList(newTablet.Alias.Cell, tc.Cells) {
				if tc.DisableQueryService {
					allowQuery = false
					disallowQueryReason = "TabletControl.DisableQueryService set"
				}
				blacklistedTables = tc.BlacklistedTables
			}
		}
	} else {
		disallowQueryReason = fmt.Sprintf("not a serving tablet type(%v)", newTablet.Type)
	}
	agent.setServicesDesiredState(disallowQueryReason, runUpdateStream)

	if updateBlacklistedTables {
		if err := agent.loadBlacklistRules(newTablet, blacklistedTables); err != nil {
			// FIXME(alainjobart) how to handle this error?
			log.Errorf("Cannot update blacklisted tables rule: %v", err)
		} else {
			agent.setBlacklistedTables(blacklistedTables)
		}
	}

	if allowQuery {
		// Query service should be running.
		if oldTablet.Type == topodatapb.TabletType_REPLICA &&
			newTablet.Type == topodatapb.TabletType_MASTER {
			// When promoting from replica to master, allow both master and replica
			// queries to be served during gracePeriod.
			if _, err := agent.QueryServiceControl.SetServingType(newTablet.Type,
				true, []topodatapb.TabletType{oldTablet.Type}); err == nil {
				// If successful, broadcast to vtgate and then wait.
				agent.broadcastHealth()
				time.Sleep(*gracePeriod)
			} else {
				log.Errorf("Can't start query service for MASTER+REPLICA mode: %v", err)
			}
		}
		if stateChanged, err := agent.QueryServiceControl.SetServingType(newTablet.Type, true, nil); err == nil {
			// If the state changed, broadcast to vtgate.
			// (e.g. this happens when the tablet was already master, but it just
			// changed from NOT_SERVING to SERVING due to
			// "vtctl MigrateServedFrom ... master".)
			if stateChanged {
				broadcastHealth = true
			}
		} else {
			runUpdateStream = false
			log.Errorf("Cannot start query service: %v", err)
		}
	} else {
		// Query service should be stopped.
		if topo.IsSubjectToLameduck(oldTablet.Type) &&
			newTablet.Type == topodatapb.TabletType_SPARE &&
			*gracePeriod > 0 {
			// When a non-MASTER serving type is going SPARE,
			// put query service in lameduck during gracePeriod.
			agent.lameduck(disallowQueryReason)
		}
		log.Infof("Disabling query service on type change, reason: %v", disallowQueryReason)
		if stateChanged, err := agent.QueryServiceControl.SetServingType(newTablet.Type, false, nil); err == nil {
			// If the state changed, broadcast to vtgate.
			// (e.g. this happens when the tablet was already master, but it just
			// changed from SERVING to NOT_SERVING because filtered replication was
			// enabled.)
			if stateChanged {
				broadcastHealth = true
			}
		} else {
			log.Errorf("SetServingType(serving=false) failed: %v", err)
		}
	}

	// The update stream needs to be started or stopped too.
	if topo.IsRunningUpdateStream(newTablet.Type) && runUpdateStream {
		agent.UpdateStream.Enable()
	} else {
		agent.UpdateStream.Disable()
	}

	// Update the stats to our current type.
	if agent.exportStats {
		agent.statsTabletType.Set(topoproto.TabletTypeLString(newTablet.Type))
	}

	// See if we need to start or stop any binlog player.
	if agent.BinlogPlayerMap != nil {
		if newTablet.Type == topodatapb.TabletType_MASTER {
			if shardInfo != nil {
				agent.BinlogPlayerMap.RefreshMap(agent.batchCtx, newTablet, shardInfo)
			} else {
				// Without the shard record the SourceShards are unknown, so
				// leave the current players as they are instead of refreshing
				// them from a nil shard.
				log.Errorf("Not refreshing binlog players for %v: shard record for %v/%v unavailable", newTablet.Alias, newTablet.Keyspace, newTablet.Shard)
			}
		} else {
			agent.BinlogPlayerMap.StopAllPlayersAndReset()
		}
	}

	// Broadcast health changes to vtgate immediately.
	if broadcastHealth {
		agent.broadcastHealth()
	}
	return nil
}