forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
replication.go
134 lines (117 loc) · 4.14 KB
/
replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package topo
import (
log "github.com/golang/glog"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/trace"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/topo/topoproto"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// ShardReplicationInfo is the companion structure for ShardReplication.
type ShardReplicationInfo struct {
*pb.ShardReplication
cell string
keyspace string
shard string
}
// NewShardReplicationInfo is for topo.Server implementations to
// create the structure
func NewShardReplicationInfo(sr *pb.ShardReplication, cell, keyspace, shard string) *ShardReplicationInfo {
return &ShardReplicationInfo{
ShardReplication: sr,
cell: cell,
keyspace: keyspace,
shard: shard,
}
}
// Cell returns the cell for a ShardReplicationInfo
func (sri *ShardReplicationInfo) Cell() string {
return sri.cell
}
// Keyspace returns the keyspace for a ShardReplicationInfo
func (sri *ShardReplicationInfo) Keyspace() string {
return sri.keyspace
}
// Shard returns the shard for a ShardReplicationInfo
func (sri *ShardReplicationInfo) Shard() string {
return sri.shard
}
// GetShardReplicationNode finds a node for a given tablet.
func (sri *ShardReplicationInfo) GetShardReplicationNode(tabletAlias *pb.TabletAlias) (*pb.ShardReplication_Node, error) {
for _, rl := range sri.Nodes {
if *rl.TabletAlias == *tabletAlias {
return rl, nil
}
}
return nil, ErrNoNode
}
// UpdateShardReplicationRecord is a low level function to add / update an
// entry to the ShardReplication object.
func UpdateShardReplicationRecord(ctx context.Context, ts Server, keyspace, shard string, tabletAlias *pb.TabletAlias) error {
span := trace.NewSpanFromContext(ctx)
span.StartClient("TopoServer.UpdateShardReplicationFields")
span.Annotate("keyspace", keyspace)
span.Annotate("shard", shard)
span.Annotate("tablet", topoproto.TabletAliasString(tabletAlias))
defer span.Finish()
return ts.UpdateShardReplicationFields(ctx, tabletAlias.Cell, keyspace, shard, func(sr *pb.ShardReplication) error {
// not very efficient, but easy to read
nodes := make([]*pb.ShardReplication_Node, 0, len(sr.Nodes)+1)
found := false
for _, node := range sr.Nodes {
if *node.TabletAlias == *tabletAlias {
if found {
log.Warningf("Found a second ShardReplication_Node for tablet %v, deleting it", tabletAlias)
continue
}
found = true
}
nodes = append(nodes, node)
}
if !found {
nodes = append(nodes, &pb.ShardReplication_Node{TabletAlias: tabletAlias})
}
sr.Nodes = nodes
return nil
})
}
// RemoveShardReplicationRecord is a low level function to remove an
// entry from the ShardReplication object.
func RemoveShardReplicationRecord(ctx context.Context, ts Server, cell, keyspace, shard string, tabletAlias *pb.TabletAlias) error {
err := ts.UpdateShardReplicationFields(ctx, cell, keyspace, shard, func(sr *pb.ShardReplication) error {
nodes := make([]*pb.ShardReplication_Node, 0, len(sr.Nodes))
for _, node := range sr.Nodes {
if *node.TabletAlias != *tabletAlias {
nodes = append(nodes, node)
}
}
sr.Nodes = nodes
return nil
})
return err
}
// FixShardReplication will fix the first problem it encounters within
// a ShardReplication object
func FixShardReplication(ctx context.Context, ts Server, logger logutil.Logger, cell, keyspace, shard string) error {
sri, err := ts.GetShardReplication(ctx, cell, keyspace, shard)
if err != nil {
return err
}
for _, node := range sri.Nodes {
_, err := ts.GetTablet(ctx, node.TabletAlias)
if err == ErrNoNode {
logger.Warningf("Tablet %v is in the replication graph, but does not exist, removing it", node.TabletAlias)
return RemoveShardReplicationRecord(ctx, ts, cell, keyspace, shard, node.TabletAlias)
}
if err != nil {
// unknown error, we probably don't want to continue
return err
}
logger.Infof("Keeping tablet %v in the replication graph", node.TabletAlias)
}
logger.Infof("All entries in replication graph are valid")
return nil
}