/*
Copyright 2015 Shlomi Noach, courtesy Booking.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logic
import (
"encoding/json"
"fmt"
"math/rand"
goos "os"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
"vitess.io/vitess/go/vt/orchestrator/attributes"
"vitess.io/vitess/go/vt/orchestrator/config"
"vitess.io/vitess/go/vt/orchestrator/external/golib/log"
"vitess.io/vitess/go/vt/orchestrator/inst"
"vitess.io/vitess/go/vt/orchestrator/kv"
ometrics "vitess.io/vitess/go/vt/orchestrator/metrics"
"vitess.io/vitess/go/vt/orchestrator/os"
"vitess.io/vitess/go/vt/orchestrator/process"
"vitess.io/vitess/go/vt/orchestrator/util"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
var countPendingRecoveries int64
type RecoveryType string
const (
MasterRecovery RecoveryType = "MasterRecovery"
CoMasterRecovery RecoveryType = "CoMasterRecovery"
IntermediateMasterRecovery RecoveryType = "IntermediateMasterRecovery"
)
type RecoveryAcknowledgement struct {
CreatedAt time.Time
Owner string
Comment string
Key inst.InstanceKey
ClusterName string
Id int64
UID string
AllRecoveries bool
}
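// NewRecoveryAcknowledgement returns an acknowledgement by the given owner with the given comment, timestamped now.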
func NewRecoveryAcknowledgement(owner string, comment string) *RecoveryAcknowledgement {
return &RecoveryAcknowledgement{
CreatedAt: time.Now(),
Owner: owner,
Comment: comment,
}
}
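// NewInternalAcknowledgement returns an acknowledgement issued by orchestrator itself ("orchestrator"/"internal"), used for automated acknowledgements.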
func NewInternalAcknowledgement() *RecoveryAcknowledgement {
return &RecoveryAcknowledgement{
CreatedAt: time.Now(),
Owner: "orchestrator",
Comment: "internal",
}
}
// BlockedTopologyRecovery represents an entry in the blocked_topology_recovery table
type BlockedTopologyRecovery struct {
FailedInstanceKey inst.InstanceKey
ClusterName string
Analysis inst.AnalysisCode
LastBlockedTimestamp string
BlockingRecoveryId int64
}
// TopologyRecovery represents an entry in the topology_recovery table
type TopologyRecovery struct {
inst.PostponedFunctionsContainer
Id int64
UID string
AnalysisEntry inst.ReplicationAnalysis
SuccessorKey *inst.InstanceKey
SuccessorAlias string
IsActive bool
IsSuccessful bool
LostReplicas inst.InstanceKeyMap
ParticipatingInstanceKeys inst.InstanceKeyMap
AllErrors []string
RecoveryStartTimestamp string
RecoveryEndTimestamp string
ProcessingNodeHostname string
ProcessingNodeToken string
Acknowledged bool
AcknowledgedAt string
AcknowledgedBy string
AcknowledgedComment string
LastDetectionId int64
RelatedRecoveryId int64
Type RecoveryType
RecoveryType MasterRecoveryType
}
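// NewTopologyRecovery returns a TopologyRecovery seeded with the given replication analysis, a fresh UID,
// empty error and instance collections, and RecoveryType initialized to NotMasterRecovery.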
func NewTopologyRecovery(replicationAnalysis inst.ReplicationAnalysis) *TopologyRecovery {
topologyRecovery := &TopologyRecovery{}
topologyRecovery.UID = util.PrettyUniqueToken()
topologyRecovery.AnalysisEntry = replicationAnalysis
topologyRecovery.SuccessorKey = nil
topologyRecovery.LostReplicas = *inst.NewInstanceKeyMap()
topologyRecovery.ParticipatingInstanceKeys = *inst.NewInstanceKeyMap()
topologyRecovery.AllErrors = []string{}
topologyRecovery.RecoveryType = NotMasterRecovery
return topologyRecovery
}
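// AddError records a non-nil error against this recovery and returns the same error, for convenient chaining.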
func (this *TopologyRecovery) AddError(err error) error {
if err != nil {
this.AllErrors = append(this.AllErrors, err.Error())
}
return err
}
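// AddErrors records each error in errs against this recovery; nil errors are ignored.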
func (this *TopologyRecovery) AddErrors(errs []error) {
for _, err := range errs {
this.AddError(err)
}
}
type TopologyRecoveryStep struct {
Id int64
RecoveryUID string
AuditAt string
Message string
}
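// NewTopologyRecoveryStep returns an audit step bound to the given recovery UID, carrying the given message.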
func NewTopologyRecoveryStep(uid string, message string) *TopologyRecoveryStep {
return &TopologyRecoveryStep{
RecoveryUID: uid,
Message: message,
}
}
type MasterRecoveryType string
const (
NotMasterRecovery MasterRecoveryType = "NotMasterRecovery"
MasterRecoveryGTID MasterRecoveryType = "MasterRecoveryGTID"
MasterRecoveryBinlogServer MasterRecoveryType = "MasterRecoveryBinlogServer"
MasterRecoveryUnknown MasterRecoveryType = "MasterRecoveryUnknown"
)
var emergencyReadTopologyInstanceMap *cache.Cache
var emergencyRestartReplicaTopologyInstanceMap *cache.Cache
var emergencyOperationGracefulPeriodMap *cache.Cache
// InstancesByCountReplicas sorts instances by number of replicas, descending
type InstancesByCountReplicas [](*inst.Instance)
func (this InstancesByCountReplicas) Len() int { return len(this) }
func (this InstancesByCountReplicas) Swap(i, j int) { this[i], this[j] = this[j], this[i] }
func (this InstancesByCountReplicas) Less(i, j int) bool {
if len(this[i].Replicas) == len(this[j].Replicas) {
// Secondary sorting: prefer more advanced replicas
return !this[i].ExecBinlogCoordinates.SmallerThan(&this[j].ExecBinlogCoordinates)
}
return len(this[i].Replicas) < len(this[j].Replicas)
}
var recoverDeadMasterCounter = metrics.NewCounter()
var recoverDeadMasterSuccessCounter = metrics.NewCounter()
var recoverDeadMasterFailureCounter = metrics.NewCounter()
var recoverDeadIntermediateMasterCounter = metrics.NewCounter()
var recoverDeadIntermediateMasterSuccessCounter = metrics.NewCounter()
var recoverDeadIntermediateMasterFailureCounter = metrics.NewCounter()
var recoverDeadCoMasterCounter = metrics.NewCounter()
var recoverDeadCoMasterSuccessCounter = metrics.NewCounter()
var recoverDeadCoMasterFailureCounter = metrics.NewCounter()
var countPendingRecoveriesGauge = metrics.NewGauge()
func init() {
metrics.Register("recover.dead_master.start", recoverDeadMasterCounter)
metrics.Register("recover.dead_master.success", recoverDeadMasterSuccessCounter)
metrics.Register("recover.dead_master.fail", recoverDeadMasterFailureCounter)
metrics.Register("recover.dead_intermediate_master.start", recoverDeadIntermediateMasterCounter)
metrics.Register("recover.dead_intermediate_master.success", recoverDeadIntermediateMasterSuccessCounter)
metrics.Register("recover.dead_intermediate_master.fail", recoverDeadIntermediateMasterFailureCounter)
metrics.Register("recover.dead_co_master.start", recoverDeadCoMasterCounter)
metrics.Register("recover.dead_co_master.success", recoverDeadCoMasterSuccessCounter)
metrics.Register("recover.dead_co_master.fail", recoverDeadCoMasterFailureCounter)
metrics.Register("recover.pending", countPendingRecoveriesGauge)
go initializeTopologyRecoveryPostConfiguration()
ometrics.OnMetricsTick(func() {
countPendingRecoveriesGauge.Update(getCountPendingRecoveries())
})
}
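// getCountPendingRecoveries atomically reads the number of currently pending recoveries.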
func getCountPendingRecoveries() int64 {
return atomic.LoadInt64(&countPendingRecoveries)
}
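// initializeTopologyRecoveryPostConfiguration waits for the configuration to be loaded and then
// initializes the emergency-operation caches.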
func initializeTopologyRecoveryPostConfiguration() {
config.WaitForConfigurationToBeLoaded()
emergencyReadTopologyInstanceMap = cache.New(time.Second, time.Millisecond*250)
emergencyRestartReplicaTopologyInstanceMap = cache.New(time.Second*30, time.Second)
emergencyOperationGracefulPeriodMap = cache.New(time.Second*5, time.Millisecond*500)
}
// AuditTopologyRecovery audits a single step in a topology recovery process.
func AuditTopologyRecovery(topologyRecovery *TopologyRecovery, message string) error {
log.Infof("topology_recovery: %s", message)
if topologyRecovery == nil {
return nil
}
recoveryStep := NewTopologyRecoveryStep(topologyRecovery.UID, message)
return writeTopologyRecoveryStep(recoveryStep)
}
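// resolveRecovery marks the recovery as successful when a successor instance is given, records the
// successor's key and alias, and persists the resolution.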
func resolveRecovery(topologyRecovery *TopologyRecovery, successorInstance *inst.Instance) error {
if successorInstance != nil {
topologyRecovery.SuccessorKey = &successorInstance.Key
topologyRecovery.SuccessorAlias = successorInstance.InstanceAlias
topologyRecovery.IsSuccessful = true
}
return writeResolveRecovery(topologyRecovery)
}
// prepareCommand replaces agreed-upon placeholders with analysis data
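// A trailing "&" marks a hook as asynchronous. For example, a hook configured as
//   /path/to/notify-failover.sh {failureType} {failedHost}:{failedPort} {successorHost}:{successorPort} &
// (a hypothetical script path) would be run in the background with the placeholders expanded from the analysis entry.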
func prepareCommand(command string, topologyRecovery *TopologyRecovery) (result string, async bool) {
analysisEntry := &topologyRecovery.AnalysisEntry
command = strings.TrimSpace(command)
if strings.HasSuffix(command, "&") {
command = strings.TrimRight(command, "&")
async = true
}
command = strings.Replace(command, "{failureType}", string(analysisEntry.Analysis), -1)
command = strings.Replace(command, "{instanceType}", string(analysisEntry.GetAnalysisInstanceType()), -1)
command = strings.Replace(command, "{isMaster}", fmt.Sprintf("%t", analysisEntry.IsMaster), -1)
command = strings.Replace(command, "{isCoMaster}", fmt.Sprintf("%t", analysisEntry.IsCoMaster), -1)
command = strings.Replace(command, "{failureDescription}", analysisEntry.Description, -1)
command = strings.Replace(command, "{command}", analysisEntry.CommandHint, -1)
command = strings.Replace(command, "{failedHost}", analysisEntry.AnalyzedInstanceKey.Hostname, -1)
command = strings.Replace(command, "{failedPort}", fmt.Sprintf("%d", analysisEntry.AnalyzedInstanceKey.Port), -1)
command = strings.Replace(command, "{failureCluster}", analysisEntry.ClusterDetails.ClusterName, -1)
command = strings.Replace(command, "{failureClusterAlias}", analysisEntry.ClusterDetails.ClusterAlias, -1)
command = strings.Replace(command, "{failureClusterDomain}", analysisEntry.ClusterDetails.ClusterDomain, -1)
command = strings.Replace(command, "{countSlaves}", fmt.Sprintf("%d", analysisEntry.CountReplicas), -1)
command = strings.Replace(command, "{countReplicas}", fmt.Sprintf("%d", analysisEntry.CountReplicas), -1)
command = strings.Replace(command, "{isDowntimed}", fmt.Sprint(analysisEntry.IsDowntimed), -1)
command = strings.Replace(command, "{autoMasterRecovery}", fmt.Sprint(analysisEntry.ClusterDetails.HasAutomatedMasterRecovery), -1)
command = strings.Replace(command, "{autoIntermediateMasterRecovery}", fmt.Sprint(analysisEntry.ClusterDetails.HasAutomatedIntermediateMasterRecovery), -1)
command = strings.Replace(command, "{orchestratorHost}", process.ThisHostname, -1)
command = strings.Replace(command, "{recoveryUID}", topologyRecovery.UID, -1)
command = strings.Replace(command, "{isSuccessful}", fmt.Sprint(topologyRecovery.SuccessorKey != nil), -1)
if topologyRecovery.SuccessorKey != nil {
command = strings.Replace(command, "{successorHost}", topologyRecovery.SuccessorKey.Hostname, -1)
command = strings.Replace(command, "{successorPort}", fmt.Sprintf("%d", topologyRecovery.SuccessorKey.Port), -1)
// As long as SuccessorKey != nil, we replace {successorAlias}.
// If SuccessorAlias is "", that's fine: {successorAlias} is replaced with "".
command = strings.Replace(command, "{successorAlias}", topologyRecovery.SuccessorAlias, -1)
}
command = strings.Replace(command, "{lostSlaves}", topologyRecovery.LostReplicas.ToCommaDelimitedList(), -1)
command = strings.Replace(command, "{lostReplicas}", topologyRecovery.LostReplicas.ToCommaDelimitedList(), -1)
command = strings.Replace(command, "{countLostReplicas}", fmt.Sprintf("%d", len(topologyRecovery.LostReplicas)), -1)
command = strings.Replace(command, "{slaveHosts}", analysisEntry.Replicas.ToCommaDelimitedList(), -1)
command = strings.Replace(command, "{replicaHosts}", analysisEntry.Replicas.ToCommaDelimitedList(), -1)
return command, async
}
// applyEnvironmentVariables sets the relevant environment variables for a recovery
func applyEnvironmentVariables(topologyRecovery *TopologyRecovery) []string {
analysisEntry := &topologyRecovery.AnalysisEntry
env := goos.Environ()
env = append(env, fmt.Sprintf("ORC_FAILURE_TYPE=%s", string(analysisEntry.Analysis)))
env = append(env, fmt.Sprintf("ORC_INSTANCE_TYPE=%s", string(analysisEntry.GetAnalysisInstanceType())))
env = append(env, fmt.Sprintf("ORC_IS_MASTER=%t", analysisEntry.IsMaster))
env = append(env, fmt.Sprintf("ORC_IS_CO_MASTER=%t", analysisEntry.IsCoMaster))
env = append(env, fmt.Sprintf("ORC_FAILURE_DESCRIPTION=%s", analysisEntry.Description))
env = append(env, fmt.Sprintf("ORC_COMMAND=%s", analysisEntry.CommandHint))
env = append(env, fmt.Sprintf("ORC_FAILED_HOST=%s", analysisEntry.AnalyzedInstanceKey.Hostname))
env = append(env, fmt.Sprintf("ORC_FAILED_PORT=%d", analysisEntry.AnalyzedInstanceKey.Port))
env = append(env, fmt.Sprintf("ORC_FAILURE_CLUSTER=%s", analysisEntry.ClusterDetails.ClusterName))
env = append(env, fmt.Sprintf("ORC_FAILURE_CLUSTER_ALIAS=%s", analysisEntry.ClusterDetails.ClusterAlias))
env = append(env, fmt.Sprintf("ORC_FAILURE_CLUSTER_DOMAIN=%s", analysisEntry.ClusterDetails.ClusterDomain))
env = append(env, fmt.Sprintf("ORC_COUNT_REPLICAS=%d", analysisEntry.CountReplicas))
env = append(env, fmt.Sprintf("ORC_IS_DOWNTIMED=%v", analysisEntry.IsDowntimed))
env = append(env, fmt.Sprintf("ORC_AUTO_MASTER_RECOVERY=%v", analysisEntry.ClusterDetails.HasAutomatedMasterRecovery))
env = append(env, fmt.Sprintf("ORC_AUTO_INTERMEDIATE_MASTER_RECOVERY=%v", analysisEntry.ClusterDetails.HasAutomatedIntermediateMasterRecovery))
env = append(env, fmt.Sprintf("ORC_ORCHESTRATOR_HOST=%s", process.ThisHostname))
env = append(env, fmt.Sprintf("ORC_IS_SUCCESSFUL=%v", (topologyRecovery.SuccessorKey != nil)))
env = append(env, fmt.Sprintf("ORC_LOST_REPLICAS=%s", topologyRecovery.LostReplicas.ToCommaDelimitedList()))
env = append(env, fmt.Sprintf("ORC_REPLICA_HOSTS=%s", analysisEntry.Replicas.ToCommaDelimitedList()))
env = append(env, fmt.Sprintf("ORC_RECOVERY_UID=%s", topologyRecovery.UID))
if topologyRecovery.SuccessorKey != nil {
env = append(env, fmt.Sprintf("ORC_SUCCESSOR_HOST=%s", topologyRecovery.SuccessorKey.Hostname))
env = append(env, fmt.Sprintf("ORC_SUCCESSOR_PORT=%d", topologyRecovery.SuccessorKey.Port))
// As long as SuccessorKey != nil, we set ORC_SUCCESSOR_ALIAS.
// If SuccessorAlias is "", that's fine: ORC_SUCCESSOR_ALIAS is set to "".
env = append(env, fmt.Sprintf("ORC_SUCCESSOR_ALIAS=%s", topologyRecovery.SuccessorAlias))
}
return env
}
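// executeProcess runs a single external hook command with the given environment, audits how long it took,
// and returns the execution error, if any.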
func executeProcess(command string, env []string, topologyRecovery *TopologyRecovery, fullDescription string) (err error) {
// Log the command to be run and record how long it takes as this may be useful
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %s: %s", fullDescription, command))
start := time.Now()
var info string
if err = os.CommandRun(command, env); err == nil {
info = fmt.Sprintf("Completed %s in %v", fullDescription, time.Since(start))
} else {
info = fmt.Sprintf("Execution of %s failed in %v with error: %v", fullDescription, time.Since(start), err)
log.Errorf(info)
}
AuditTopologyRecovery(topologyRecovery, info)
return err
}
// executeProcesses executes a list of processes
func executeProcesses(processes []string, description string, topologyRecovery *TopologyRecovery, failOnError bool) (err error) {
if len(processes) == 0 {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("No %s hooks to run", description))
return nil
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Running %d %s hooks", len(processes), description))
for i, command := range processes {
command, async := prepareCommand(command, topologyRecovery)
env := applyEnvironmentVariables(topologyRecovery)
fullDescription := fmt.Sprintf("%s hook %d of %d", description, i+1, len(processes))
if async {
fullDescription = fmt.Sprintf("%s (async)", fullDescription)
}
if async {
// Ignore errors
go executeProcess(command, env, topologyRecovery, fullDescription)
} else {
if cmdErr := executeProcess(command, env, topologyRecovery, fullDescription); cmdErr != nil {
if failOnError {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Not running further %s hooks", description))
return cmdErr
}
if err == nil {
// Keep first error encountered
err = cmdErr
}
}
}
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("done running %s hooks", description))
return err
}
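// recoverDeadMasterInBinlogServerTopology promotes a replica of a binlog server on top of a dead master:
// it regroups the binlog servers, picks a candidate replica, aligns it with the promoted binlog server's
// coordinates, flushes its binary logs forward, and repoints the binlog servers (and, postponed, their
// replicas) under the promoted replica.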
func recoverDeadMasterInBinlogServerTopology(topologyRecovery *TopologyRecovery) (promotedReplica *inst.Instance, err error) {
failedMasterKey := &topologyRecovery.AnalysisEntry.AnalyzedInstanceKey
var promotedBinlogServer *inst.Instance
_, promotedBinlogServer, err = inst.RegroupReplicasBinlogServers(failedMasterKey, true)
if err != nil {
return nil, log.Errore(err)
}
promotedBinlogServer, err = inst.StopReplication(&promotedBinlogServer.Key)
if err != nil {
return promotedReplica, log.Errore(err)
}
// Find candidate replica
promotedReplica, err = inst.GetCandidateReplicaOfBinlogServerTopology(&promotedBinlogServer.Key)
if err != nil {
return promotedReplica, log.Errore(err)
}
// Align it with binlog server coordinates
promotedReplica, err = inst.StopReplication(&promotedReplica.Key)
if err != nil {
return promotedReplica, log.Errore(err)
}
promotedReplica, err = inst.StartReplicationUntilMasterCoordinates(&promotedReplica.Key, &promotedBinlogServer.ExecBinlogCoordinates)
if err != nil {
return promotedReplica, log.Errore(err)
}
promotedReplica, err = inst.StopReplication(&promotedReplica.Key)
if err != nil {
return promotedReplica, log.Errore(err)
}
// Detach, flush binary logs forward
promotedReplica, err = inst.ResetReplication(&promotedReplica.Key)
if err != nil {
return promotedReplica, log.Errore(err)
}
promotedReplica, err = inst.FlushBinaryLogsTo(&promotedReplica.Key, promotedBinlogServer.ExecBinlogCoordinates.LogFile)
if err != nil {
return promotedReplica, log.Errore(err)
}
promotedReplica, err = inst.FlushBinaryLogs(&promotedReplica.Key, 1)
if err != nil {
return promotedReplica, log.Errore(err)
}
promotedReplica, err = inst.PurgeBinaryLogsToLatest(&promotedReplica.Key, false)
if err != nil {
return promotedReplica, log.Errore(err)
}
// Reconnect binlog servers to promoted replica (now master):
promotedBinlogServer, err = inst.SkipToNextBinaryLog(&promotedBinlogServer.Key)
if err != nil {
return promotedReplica, log.Errore(err)
}
promotedBinlogServer, err = inst.Repoint(&promotedBinlogServer.Key, &promotedReplica.Key, inst.GTIDHintDeny)
if err != nil {
return nil, log.Errore(err)
}
func() {
// Move binlog server replicas up to replicate from master.
// This can only be done once a BLS has skipped to the next binlog.
// We postpone this operation. The master is already promoted and we're happy.
binlogServerReplicas, err := inst.ReadBinlogServerReplicaInstances(&promotedBinlogServer.Key)
if err != nil {
return
}
maxBinlogServersToPromote := 3
for i, binlogServerReplica := range binlogServerReplicas {
binlogServerReplica := binlogServerReplica
if i >= maxBinlogServersToPromote {
return
}
postponedFunction := func() error {
binlogServerReplica, err := inst.StopReplication(&binlogServerReplica.Key)
if err != nil {
return err
}
// Make sure the BLS has the "next binlog" -- the one the master flushed & purged to. Otherwise the BLS
// will request a binlog the master does not have
if binlogServerReplica.ExecBinlogCoordinates.SmallerThan(&promotedBinlogServer.ExecBinlogCoordinates) {
binlogServerReplica, err = inst.StartReplicationUntilMasterCoordinates(&binlogServerReplica.Key, &promotedBinlogServer.ExecBinlogCoordinates)
if err != nil {
return err
}
}
_, err = inst.Repoint(&binlogServerReplica.Key, &promotedReplica.Key, inst.GTIDHintDeny)
return err
}
topologyRecovery.AddPostponedFunction(postponedFunction, fmt.Sprintf("recoverDeadMasterInBinlogServerTopology, moving binlog server %+v", binlogServerReplica.Key))
}
}()
return promotedReplica, err
}
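// GetMasterRecoveryType decides how a dead master should be recovered: via GTID when the immediate topology
// supports Oracle or MariaDB GTID, via binlog servers when that is the immediate topology, and unknown otherwise.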
func GetMasterRecoveryType(analysisEntry *inst.ReplicationAnalysis) (masterRecoveryType MasterRecoveryType) {
masterRecoveryType = MasterRecoveryUnknown
if analysisEntry.OracleGTIDImmediateTopology || analysisEntry.MariaDBGTIDImmediateTopology {
masterRecoveryType = MasterRecoveryGTID
} else if analysisEntry.BinlogServerImmediateTopology {
masterRecoveryType = MasterRecoveryBinlogServer
}
return masterRecoveryType
}
// recoverDeadMaster recovers a dead master, complete logic inside
func recoverDeadMaster(topologyRecovery *TopologyRecovery, candidateInstanceKey *inst.InstanceKey, skipProcesses bool) (recoveryAttempted bool, promotedReplica *inst.Instance, lostReplicas [](*inst.Instance), err error) {
topologyRecovery.Type = MasterRecovery
analysisEntry := &topologyRecovery.AnalysisEntry
failedInstanceKey := &analysisEntry.AnalyzedInstanceKey
var cannotReplicateReplicas [](*inst.Instance)
postponedAll := false
inst.AuditOperation("recover-dead-master", failedInstanceKey, "problem found; will recover")
if !skipProcesses {
if err := executeProcesses(config.Config.PreFailoverProcesses, "PreFailoverProcesses", topologyRecovery, true); err != nil {
return false, nil, lostReplicas, topologyRecovery.AddError(err)
}
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: will recover %+v", *failedInstanceKey))
err = TabletDemoteMaster(*failedInstanceKey)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: TabletDemoteMaster: %v", err))
topologyRecovery.RecoveryType = GetMasterRecoveryType(analysisEntry)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: masterRecoveryType=%+v", topologyRecovery.RecoveryType))
promotedReplicaIsIdeal := func(promoted *inst.Instance, hasBestPromotionRule bool) bool {
if promoted == nil {
return false
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: promotedReplicaIsIdeal(%+v)", promoted.Key))
if candidateInstanceKey != nil { //explicit request to promote a specific server
return promoted.Key.Equals(candidateInstanceKey)
}
if promoted.DataCenter == topologyRecovery.AnalysisEntry.AnalyzedInstanceDataCenter &&
promoted.PhysicalEnvironment == topologyRecovery.AnalysisEntry.AnalyzedInstancePhysicalEnvironment {
if promoted.PromotionRule == inst.MustPromoteRule || promoted.PromotionRule == inst.PreferPromoteRule ||
(hasBestPromotionRule && promoted.PromotionRule != inst.MustNotPromoteRule) {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: found %+v to be ideal candidate; will optimize recovery", promoted.Key))
postponedAll = true
return true
}
}
return false
}
switch topologyRecovery.RecoveryType {
case MasterRecoveryUnknown:
{
return false, nil, lostReplicas, topologyRecovery.AddError(log.Errorf("RecoveryType unknown/unsupported"))
}
case MasterRecoveryGTID:
{
AuditTopologyRecovery(topologyRecovery, "RecoverDeadMaster: regrouping replicas via GTID")
lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, nil, &topologyRecovery.PostponedFunctionsContainer, promotedReplicaIsIdeal)
}
case MasterRecoveryBinlogServer:
{
AuditTopologyRecovery(topologyRecovery, "RecoverDeadMaster: recovering via binlog servers")
promotedReplica, err = recoverDeadMasterInBinlogServerTopology(topologyRecovery)
}
}
topologyRecovery.AddError(err)
lostReplicas = append(lostReplicas, cannotReplicateReplicas...)
for _, replica := range lostReplicas {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: - lost replica: %+v", replica.Key))
}
if promotedReplica != nil && len(lostReplicas) > 0 && config.Config.DetachLostReplicasAfterMasterFailover {
postponedFunction := func() error {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: lost %+v replicas during recovery process; detaching them", len(lostReplicas)))
for _, replica := range lostReplicas {
replica := replica
inst.DetachReplicaMasterHost(&replica.Key)
}
return nil
}
topologyRecovery.AddPostponedFunction(postponedFunction, fmt.Sprintf("RecoverDeadMaster, detach %+v lost replicas", len(lostReplicas)))
}
func() error {
// TODO(sougou): Commented out: this downtime feels a little aggressive.
//inst.BeginDowntime(inst.NewDowntime(failedInstanceKey, inst.GetMaintenanceOwner(), inst.DowntimeLostInRecoveryMessage, time.Duration(config.LostInRecoveryDowntimeSeconds)*time.Second))
acknowledgeInstanceFailureDetection(&analysisEntry.AnalyzedInstanceKey)
for _, replica := range lostReplicas {
replica := replica
inst.BeginDowntime(inst.NewDowntime(&replica.Key, inst.GetMaintenanceOwner(), inst.DowntimeLostInRecoveryMessage, time.Duration(config.LostInRecoveryDowntimeSeconds)*time.Second))
}
return nil
}()
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: %d postponed functions", topologyRecovery.PostponedFunctionsContainer.Len()))
if promotedReplica != nil && !postponedAll {
promotedReplica, err = replacePromotedReplicaWithCandidate(topologyRecovery, &analysisEntry.AnalyzedInstanceKey, promotedReplica, candidateInstanceKey)
topologyRecovery.AddError(err)
}
if promotedReplica == nil {
err := TabletUndoDemoteMaster(*failedInstanceKey)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: TabletUndoDemoteMaster: %v", err))
message := "Failure: no replica promoted."
AuditTopologyRecovery(topologyRecovery, message)
inst.AuditOperation("recover-dead-master", failedInstanceKey, message)
return true, promotedReplica, lostReplicas, err
}
message := fmt.Sprintf("promoted replica: %+v", promotedReplica.Key)
AuditTopologyRecovery(topologyRecovery, message)
inst.AuditOperation("recover-dead-master", failedInstanceKey, message)
return true, promotedReplica, lostReplicas, err
}
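// MasterFailoverGeographicConstraintSatisfied checks the suggested promotion candidate against
// PreventCrossDataCenterMasterFailover and PreventCrossRegionMasterFailover, returning a reason
// when the constraint is not satisfied.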
func MasterFailoverGeographicConstraintSatisfied(analysisEntry *inst.ReplicationAnalysis, suggestedInstance *inst.Instance) (satisfied bool, dissatisfiedReason string) {
if config.Config.PreventCrossDataCenterMasterFailover {
if suggestedInstance.DataCenter != analysisEntry.AnalyzedInstanceDataCenter {
return false, fmt.Sprintf("PreventCrossDataCenterMasterFailover: will not promote server in %s when failed server in %s", suggestedInstance.DataCenter, analysisEntry.AnalyzedInstanceDataCenter)
}
}
if config.Config.PreventCrossRegionMasterFailover {
if suggestedInstance.Region != analysisEntry.AnalyzedInstanceRegion {
return false, fmt.Sprintf("PreventCrossRegionMasterFailover: will not promote server in %s when failed server in %s", suggestedInstance.Region, analysisEntry.AnalyzedInstanceRegion)
}
}
return true, ""
}
// SuggestReplacementForPromotedReplica returns a server to take over the already
// promoted replica, if such a server is found and it improves on the promoted replica.
func SuggestReplacementForPromotedReplica(topologyRecovery *TopologyRecovery, deadInstanceKey *inst.InstanceKey, promotedReplica *inst.Instance, candidateInstanceKey *inst.InstanceKey) (replacement *inst.Instance, actionRequired bool, err error) {
candidateReplicas, _ := inst.ReadClusterCandidateInstances(promotedReplica.ClusterName)
candidateReplicas = inst.RemoveInstance(candidateReplicas, deadInstanceKey)
deadInstance, _, err := inst.ReadInstance(deadInstanceKey)
if err != nil {
deadInstance = nil
}
// So we've already promoted a replica.
// However, can we improve on our choice? Are there any replicas marked with "is_candidate"?
// Maybe we actually promoted such a replica. Does that mean we should keep it?
// Maybe we promoted a "neutral", and some "prefer" server is available.
// Maybe we promoted a "prefer_not"
// Maybe we promoted a server in a different DC than the master
// There's many options. We may wish to replace the server we promoted with a better one.
AuditTopologyRecovery(topologyRecovery, "checking if we should replace the promoted replica with a better candidate")
if candidateInstanceKey == nil {
AuditTopologyRecovery(topologyRecovery, "+ checking if promoted replica is the ideal candidate")
if deadInstance != nil {
for _, candidateReplica := range candidateReplicas {
if promotedReplica.Key.Equals(&candidateReplica.Key) &&
promotedReplica.DataCenter == deadInstance.DataCenter &&
promotedReplica.PhysicalEnvironment == deadInstance.PhysicalEnvironment {
// Seems like we promoted a candidate in the same DC & env as the dead master. Ideal! We're happy!
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("promoted replica %+v is the ideal candidate", promotedReplica.Key))
return promotedReplica, false, nil
}
}
}
}
// We didn't pick the ideal candidate; let's see if we can replace with a candidate from same DC and ENV
if candidateInstanceKey == nil {
// Try a candidate replica that is in same DC & env as the dead instance
AuditTopologyRecovery(topologyRecovery, "+ searching for an ideal candidate")
if deadInstance != nil {
for _, candidateReplica := range candidateReplicas {
if canTakeOverPromotedServerAsMaster(candidateReplica, promotedReplica) &&
candidateReplica.DataCenter == deadInstance.DataCenter &&
candidateReplica.PhysicalEnvironment == deadInstance.PhysicalEnvironment {
// This would make a great candidate
candidateInstanceKey = &candidateReplica.Key
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no candidate was offered for %+v but orchestrator picks %+v as candidate replacement, based on being in same DC & env as failed instance", *deadInstanceKey, candidateReplica.Key))
}
}
}
}
if candidateInstanceKey == nil {
// We cannot find a candidate in same DC and ENV as dead master
AuditTopologyRecovery(topologyRecovery, "+ checking if promoted replica is an OK candidate")
for _, candidateReplica := range candidateReplicas {
if promotedReplica.Key.Equals(&candidateReplica.Key) {
// Seems like we promoted a candidate replica (though not in same DC and ENV as dead master)
if satisfied, reason := MasterFailoverGeographicConstraintSatisfied(&topologyRecovery.AnalysisEntry, candidateReplica); satisfied {
// Good enough. No further action required.
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("promoted replica %+v is a good candidate", promotedReplica.Key))
return promotedReplica, false, nil
} else {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("skipping %+v; %s", candidateReplica.Key, reason))
}
}
}
}
// Still nothing?
if candidateInstanceKey == nil {
// Try a candidate replica that is in same DC & env as the promoted replica (our promoted replica is not an "is_candidate")
AuditTopologyRecovery(topologyRecovery, "+ searching for a candidate")
for _, candidateReplica := range candidateReplicas {
if canTakeOverPromotedServerAsMaster(candidateReplica, promotedReplica) &&
promotedReplica.DataCenter == candidateReplica.DataCenter &&
promotedReplica.PhysicalEnvironment == candidateReplica.PhysicalEnvironment {
// OK, better than nothing
candidateInstanceKey = &candidateReplica.Key
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no candidate was offered for %+v but orchestrator picks %+v as candidate replacement, based on being in same DC & env as promoted instance", promotedReplica.Key, candidateReplica.Key))
}
}
}
// Still nothing?
if candidateInstanceKey == nil {
// Try a candidate replica (our promoted replica is not an "is_candidate")
AuditTopologyRecovery(topologyRecovery, "+ searching for a candidate")
for _, candidateReplica := range candidateReplicas {
if canTakeOverPromotedServerAsMaster(candidateReplica, promotedReplica) {
if satisfied, reason := MasterFailoverGeographicConstraintSatisfied(&topologyRecovery.AnalysisEntry, candidateReplica); satisfied {
// OK, better than nothing
candidateInstanceKey = &candidateReplica.Key
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no candidate was offered for %+v but orchestrator picks %+v as candidate replacement", promotedReplica.Key, candidateReplica.Key))
} else {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("skipping %+v; %s", candidateReplica.Key, reason))
}
}
}
}
keepSearchingHint := ""
if satisfied, reason := MasterFailoverGeographicConstraintSatisfied(&topologyRecovery.AnalysisEntry, promotedReplica); !satisfied {
keepSearchingHint = fmt.Sprintf("Will keep searching; %s", reason)
} else if promotedReplica.PromotionRule == inst.PreferNotPromoteRule {
keepSearchingHint = fmt.Sprintf("Will keep searching because we have promoted a server with prefer_not rule: %+v", promotedReplica.Key)
}
if keepSearchingHint != "" {
AuditTopologyRecovery(topologyRecovery, keepSearchingHint)
neutralReplicas, _ := inst.ReadClusterNeutralPromotionRuleInstances(promotedReplica.ClusterName)
if candidateInstanceKey == nil {
// Still nothing? Then we didn't find a replica marked as "candidate". OK, further down the stream we have:
// find a neutral instance in the same DC & env as the dead master
AuditTopologyRecovery(topologyRecovery, "+ searching for a neutral server to replace promoted server, in same DC and env as dead master")
for _, neutralReplica := range neutralReplicas {
if canTakeOverPromotedServerAsMaster(neutralReplica, promotedReplica) &&
deadInstance.DataCenter == neutralReplica.DataCenter &&
deadInstance.PhysicalEnvironment == neutralReplica.PhysicalEnvironment {
candidateInstanceKey = &neutralReplica.Key
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no candidate was offered for %+v but orchestrator picks %+v as candidate replacement, based on being in same DC & env as dead master", promotedReplica.Key, neutralReplica.Key))
}
}
}
if candidateInstanceKey == nil {
// find a neutral instance in the same DC & env as the promoted replica
AuditTopologyRecovery(topologyRecovery, "+ searching for a neutral server to replace promoted server, in same DC and env as promoted replica")
for _, neutralReplica := range neutralReplicas {
if canTakeOverPromotedServerAsMaster(neutralReplica, promotedReplica) &&
promotedReplica.DataCenter == neutralReplica.DataCenter &&
promotedReplica.PhysicalEnvironment == neutralReplica.PhysicalEnvironment {
candidateInstanceKey = &neutralReplica.Key
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no candidate was offered for %+v but orchestrator picks %+v as candidate replacement, based on being in same DC & env as promoted instance", promotedReplica.Key, neutralReplica.Key))
}
}
}
if candidateInstanceKey == nil {
AuditTopologyRecovery(topologyRecovery, "+ searching for a neutral server to replace a prefer_not")
for _, neutralReplica := range neutralReplicas {
if canTakeOverPromotedServerAsMaster(neutralReplica, promotedReplica) {
if satisfied, reason := MasterFailoverGeographicConstraintSatisfied(&topologyRecovery.AnalysisEntry, neutralReplica); satisfied {
// OK, better than nothing
candidateInstanceKey = &neutralReplica.Key
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("no candidate was offered for %+v but orchestrator picks %+v as candidate replacement, based on promoted instance having prefer_not promotion rule", promotedReplica.Key, neutralReplica.Key))
} else {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("skipping %+v; %s", neutralReplica.Key, reason))
}
}
}
}
}
// So do we have a candidate?
if candidateInstanceKey == nil {
// Found nothing. Stick with promoted replica
AuditTopologyRecovery(topologyRecovery, "+ found no server to promote on top of promoted replica")
return promotedReplica, false, nil
}
if promotedReplica.Key.Equals(candidateInstanceKey) {
// Sanity. It IS the candidate, nothing to promote...
AuditTopologyRecovery(topologyRecovery, "+ sanity check: found our very own server to promote; doing nothing")
return promotedReplica, false, nil
}
replacement, _, err = inst.ReadInstance(candidateInstanceKey)
return replacement, true, err
}
// replacePromotedReplicaWithCandidate is called after a master (or co-master)
// died and was replaced by some promotedReplica.
// But, is there an even better replica to promote?
// If candidateInstanceKey is given, it is forced to be promoted over the promotedReplica.
// Otherwise, search for the best to promote!
func replacePromotedReplicaWithCandidate(topologyRecovery *TopologyRecovery, deadInstanceKey *inst.InstanceKey, promotedReplica *inst.Instance, candidateInstanceKey *inst.InstanceKey) (*inst.Instance, error) {
candidateInstance, actionRequired, err := SuggestReplacementForPromotedReplica(topologyRecovery, deadInstanceKey, promotedReplica, candidateInstanceKey)
if err != nil {
return promotedReplica, log.Errore(err)
}
if !actionRequired {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("replace-promoted-replica-with-candidate: promoted instance %+v requires no further action", promotedReplica.Key))
return promotedReplica, nil
}
// Try and promote suggested candidate, if applicable and possible
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("replace-promoted-replica-with-candidate: promoted instance %+v is not the suggested candidate %+v. Will see what can be done", promotedReplica.Key, candidateInstance.Key))
if candidateInstance.MasterKey.Equals(&promotedReplica.Key) {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("replace-promoted-replica-with-candidate: suggested candidate %+v is replica of promoted instance %+v. Will try and take its master", candidateInstance.Key, promotedReplica.Key))
candidateInstance, err = inst.TakeMaster(&candidateInstance.Key, topologyRecovery.Type == CoMasterRecovery)
if err != nil {
return promotedReplica, log.Errore(err)
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("success promoting %+v over %+v", candidateInstance.Key, promotedReplica.Key))
// As followup to taking over, let's relocate all the rest of the replicas under the candidate instance
relocateReplicasFunc := func() error {
log.Debugf("replace-promoted-replica-with-candidate: relocating replicas of %+v below %+v", promotedReplica.Key, candidateInstance.Key)
relocatedReplicas, _, err, _ := inst.RelocateReplicas(&promotedReplica.Key, &candidateInstance.Key, "")
log.Debugf("replace-promoted-replica-with-candidate: + relocated %+v replicas of %+v below %+v", len(relocatedReplicas), promotedReplica.Key, candidateInstance.Key)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("relocated %+v replicas of %+v below %+v", len(relocatedReplicas), promotedReplica.Key, candidateInstance.Key))
return log.Errore(err)
}
postponedFunctionsContainer := &topologyRecovery.PostponedFunctionsContainer
if postponedFunctionsContainer != nil {
postponedFunctionsContainer.AddPostponedFunction(relocateReplicasFunc, fmt.Sprintf("replace-promoted-replica-with-candidate: relocate replicas of %+v", promotedReplica.Key))
} else {
_ = relocateReplicasFunc()
// We do not propagate the error. It is logged, but otherwise should not fail the entire failover operation
}
return candidateInstance, nil
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("could not manage to promote suggested candidate %+v", candidateInstance.Key))
return promotedReplica, nil
}
// checkAndRecoverDeadMaster checks a given analysis, decides whether to take action, and possibly takes action
// Returns true when action was taken.
func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
if !(forceInstanceRecovery || analysisEntry.ClusterDetails.HasAutomatedMasterRecovery) {
return false, nil, nil
}
topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, !forceInstanceRecovery, !forceInstanceRecovery)
if topologyRecovery == nil {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverDeadMaster.", analysisEntry.AnalyzedInstanceKey))
return false, nil, err
}
log.Infof("Analysis: %v, dead master %+v", analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey)
// That's it! We must do recovery!
// TODO(sougou): This function gets called by GracefulMasterTakeover which may
// need to obtain shard lock before getting here.
unlock, err := LockShard(analysisEntry.AnalyzedInstanceKey)
if err != nil {
log.Infof("CheckAndRecover: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+
"skipProcesses: %v: NOT detecting/recovering host, could not obtain shard lock (%v)",
analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses, err)
return false, nil, err
}
defer unlock(&err)
// Check if someone else fixed the problem.
tablet, err := TabletRefresh(analysisEntry.AnalyzedInstanceKey)
if err == nil && tablet.Type != topodatapb.TabletType_MASTER {
// TODO(sougou): use a version that only refreshes the current shard.
RefreshTablets()
AuditTopologyRecovery(topologyRecovery, "another agent seems to have fixed the problem")
// TODO(sougou): see if we have to reset the cluster as healthy.
return false, topologyRecovery, nil
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("will handle DeadMaster event on %+v", analysisEntry.ClusterDetails.ClusterName))
recoverDeadMasterCounter.Inc(1)
recoveryAttempted, promotedReplica, lostReplicas, err := recoverDeadMaster(topologyRecovery, candidateInstanceKey, skipProcesses)
if err != nil {
AuditTopologyRecovery(topologyRecovery, err.Error())
}
topologyRecovery.LostReplicas.AddInstances(lostReplicas)
if !recoveryAttempted {
return false, topologyRecovery, err
}
overrideMasterPromotion := func() (*inst.Instance, error) {
if promotedReplica == nil {
// No promotion; nothing to override.
return promotedReplica, err
}
// Scenarios where we might cancel the promotion.
if satisfied, reason := MasterFailoverGeographicConstraintSatisfied(&analysisEntry, promotedReplica); !satisfied {
return nil, fmt.Errorf("RecoverDeadMaster: failed %+v promotion; %s", promotedReplica.Key, reason)
}
if config.Config.FailMasterPromotionOnLagMinutes > 0 &&
time.Duration(promotedReplica.ReplicationLagSeconds.Int64)*time.Second >= time.Duration(config.Config.FailMasterPromotionOnLagMinutes)*time.Minute {
// candidate replica lags too much
return nil, fmt.Errorf("RecoverDeadMaster: failed promotion. FailMasterPromotionOnLagMinutes is set to %d (minutes) and promoted replica %+v 's lag is %d (seconds)", config.Config.FailMasterPromotionOnLagMinutes, promotedReplica.Key, promotedReplica.ReplicationLagSeconds.Int64)
}
if config.Config.FailMasterPromotionIfSQLThreadNotUpToDate && !promotedReplica.SQLThreadUpToDate() {
return nil, fmt.Errorf("RecoverDeadMaster: failed promotion. FailMasterPromotionIfSQLThreadNotUpToDate is set and promoted replica %+v 's sql thread is not up to date (relay logs still unapplied). Aborting promotion", promotedReplica.Key)
}
if config.Config.DelayMasterPromotionIfSQLThreadNotUpToDate && !promotedReplica.SQLThreadUpToDate() {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("DelayMasterPromotionIfSQLThreadNotUpToDate: waiting for SQL thread on %+v", promotedReplica.Key))
if _, err := inst.WaitForSQLThreadUpToDate(&promotedReplica.Key, 0, 0); err != nil {
return nil, fmt.Errorf("DelayMasterPromotionIfSQLThreadNotUpToDate error: %+v", err)
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("DelayMasterPromotionIfSQLThreadNotUpToDate: SQL thread caught up on %+v", promotedReplica.Key))
}
// All seems well. No override done.
return promotedReplica, err
}
if promotedReplica, err = overrideMasterPromotion(); err != nil {
AuditTopologyRecovery(topologyRecovery, err.Error())
}
// And this is the end; whether successful or not, we're done.
resolveRecovery(topologyRecovery, promotedReplica)
// Now, see whether we are successful or not. From this point there's no going back.
if promotedReplica != nil {
// Success!
recoverDeadMasterSuccessCounter.Inc(1)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: successfully promoted %+v", promotedReplica.Key))
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: promoted server coordinates: %+v", promotedReplica.SelfBinlogCoordinates))
AuditTopologyRecovery(topologyRecovery, "- RecoverDeadMaster: will apply MySQL changes to promoted master")
{
_, err := inst.ResetReplicationOperation(&promotedReplica.Key)
if err != nil {
// Ugly, but this is important. Let's give it another try
_, err = inst.ResetReplicationOperation(&promotedReplica.Key)
}
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying RESET SLAVE ALL on promoted master: success=%t", (err == nil)))
if err != nil {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: NOTE that %+v is promoted even though SHOW SLAVE STATUS may still show it has a master", promotedReplica.Key))
}
}
{
count := inst.MasterSemiSync(promotedReplica.Key)
err := inst.SetSemiSyncMaster(&promotedReplica.Key, count > 0)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying semi-sync %v: success=%t", count > 0, (err == nil)))
// Don't allow writes if semi-sync settings fail.
if err == nil {
_, err := inst.SetReadOnly(&promotedReplica.Key, false)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=0 on promoted master: success=%t", (err == nil)))
}
}
// Let's attempt, though we won't necessarily succeed, to set old master as read-only
go func() {
_, err := inst.SetReadOnly(&analysisEntry.AnalyzedInstanceKey, true)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=1 on demoted master: success=%t", (err == nil)))
}()
kvPairs := inst.GetClusterMasterKVPairs(analysisEntry.ClusterDetails.ClusterAlias, &promotedReplica.Key)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Writing KV %+v", kvPairs))
for _, kvPair := range kvPairs {
err := kv.PutKVPair(kvPair)
log.Errore(err)
}
{
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("Distributing KV %+v", kvPairs))
err := kv.DistributePairs(kvPairs)
log.Errore(err)
}
if config.Config.MasterFailoverDetachReplicaMasterHost {
postponedFunction := func() error {
AuditTopologyRecovery(topologyRecovery, "- RecoverDeadMaster: detaching master host on promoted master")
inst.DetachReplicaMasterHost(&promotedReplica.Key)
return nil
}
topologyRecovery.AddPostponedFunction(postponedFunction, fmt.Sprintf("RecoverDeadMaster, detaching promoted master host %+v", promotedReplica.Key))
}
func() error {
before := analysisEntry.AnalyzedInstanceKey.StringCode()
after := promotedReplica.Key.StringCode()
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: updating cluster_alias: %v -> %v", before, after))
//~~~inst.ReplaceClusterName(before, after)
if alias := analysisEntry.ClusterDetails.ClusterAlias; alias != "" {
inst.SetClusterAlias(promotedReplica.Key.StringCode(), alias)
} else {
inst.ReplaceAliasClusterName(before, after)
}
return nil
}()
attributes.SetGeneralAttribute(analysisEntry.ClusterDetails.ClusterDomain, promotedReplica.Key.StringCode())
if !skipProcesses {
// Execute post master-failover processes
executeProcesses(config.Config.PostMasterFailoverProcesses, "PostMasterFailoverProcesses", topologyRecovery, false)
}
} else {
recoverDeadMasterFailureCounter.Inc(1)
}
return true, topologyRecovery, err
}
// isGenerallyValidAsCandidateSiblingOfIntermediateMaster checks that basic server configuration and state are valid
func isGenerallyValidAsCandidateSiblingOfIntermediateMaster(sibling *inst.Instance) bool {
if !sibling.LogBinEnabled {
return false
}
if !sibling.LogReplicationUpdatesEnabled {
return false
}
if !sibling.ReplicaRunning() {
return false
}
if !sibling.IsLastCheckValid {
return false
}
return true
}
// isValidAsCandidateSiblingOfIntermediateMaster checks that the given sibling is capable of taking over the instance's replicas
func isValidAsCandidateSiblingOfIntermediateMaster(intermediateMasterInstance *inst.Instance, sibling *inst.Instance) bool {
if sibling.Key.Equals(&intermediateMasterInstance.Key) {
// same instance
return false
}
if !isGenerallyValidAsCandidateSiblingOfIntermediateMaster(sibling) {
return false
}
if inst.IsBannedFromBeingCandidateReplica(sibling) {