-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
replication_status.go
221 lines (201 loc) · 9.15 KB
/
replication_status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysql
import (
"fmt"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
"vitess.io/vitess/go/vt/vterrors"
)
// ReplicationStatus holds replication information from SHOW SLAVE STATUS.
// ReplicationStatusToProto and ProtoToReplicationStatus convert it to and
// from its proto3 form (replicationdatapb.Status) field by field.
type ReplicationStatus struct {
	// Position is the current position of the replica. For GTID replication implementations
	// it is the executed GTID set. For file replication implementation, it is same as
	// FilePosition.
	Position Position
	// RelayLogPosition is the Position that the replica would be at if it
	// were to finish executing everything that's currently in its relay log.
	// However, some MySQL flavors don't expose this information,
	// in which case RelayLogPosition.IsZero() will be true.
	// For MySQL GTID replication implementation it is the union of
	// executed GTID set and retrieved GTID set. For file replication implementation,
	// it is same as RelayLogSourceBinlogEquivalentPosition.
	// FindErrantGTIDs compares the GTID sets stored in this field.
	RelayLogPosition Position
	// FilePosition stores the position of the source tablet's binary log
	// up to which the SQL thread of the replica has run.
	FilePosition Position
	// RelayLogSourceBinlogEquivalentPosition stores the position of the source tablet's binary log
	// up to which the IO thread has read and added to the relay log.
	RelayLogSourceBinlogEquivalentPosition Position
	// RelayLogFilePosition stores the position in the relay log file.
	RelayLogFilePosition Position
	SourceServerID       uint32
	// IOState and LastIOError describe the IO thread. IOHealthy treats a
	// connecting IO thread as healthy only while LastIOError is empty.
	IOState     ReplicationState
	LastIOError string
	// SQLState and LastSQLError describe the SQL thread. SQLHealthy only
	// looks at SQLState.
	SQLState     ReplicationState
	LastSQLError string
	// ReplicationLagSeconds is the seconds-behind value reported for the replica.
	// If ReplicationLagUnknown is true then we should not rely on the seconds
	// behind value and we can instead try to calculate the lag ourselves when
	// appropriate.
	ReplicationLagSeconds uint32
	ReplicationLagUnknown bool
	// SourceHost, SourcePort, SourceUser and ConnectRetry are carried through
	// unchanged to and from the proto form.
	SourceHost   string
	SourcePort   int32
	SourceUser   string
	ConnectRetry int32
	// SourceUUID identifies the replication source. FindErrantGTIDs skips GTIDs
	// whose SID equals it. ProtoToReplicationStatus leaves it as the zero SID
	// when the proto carries an empty SourceUuid.
	SourceUUID            SID
	SQLDelay              uint32
	AutoPosition          bool
	UsingGTID             bool
	HasReplicationFilters bool
	SSLAllowed            bool
}
// Running reports whether replication is fully active, that is, whether the
// IO thread and the SQL thread are both in the running state.
func (s *ReplicationStatus) Running() bool {
	if s.IOState != ReplicationStateRunning {
		return false
	}
	return s.SQLState == ReplicationStateRunning
}
// Healthy reports whether both the SQL and the IO components of replication
// are healthy. The SQL side is evaluated first (via SQLHealthy). The IO side
// (IOHealthy) is consulted only when the SQL side is healthy.
func (s *ReplicationStatus) Healthy() bool {
	if !s.SQLHealthy() {
		return false
	}
	return s.IOHealthy()
}
// IOHealthy reports whether the IO thread is in a healthy state. A running IO
// thread is always healthy. A connecting IO thread is healthy only as long as
// the last attempt to connect to the source left no IO error behind. Any other
// state is unhealthy.
func (s *ReplicationStatus) IOHealthy() bool {
	switch s.IOState {
	case ReplicationStateRunning:
		return true
	case ReplicationStateConnecting:
		// Still trying to connect: only healthy if no error has been recorded.
		return s.LastIOError == ""
	default:
		return false
	}
}
// SQLHealthy reports whether the SQL thread is running. It exists as its own
// method, alongside IOHealthy, so the SQL health rule can be refined later in
// one place.
func (s *ReplicationStatus) SQLHealthy() bool {
	switch s.SQLState {
	case ReplicationStateRunning:
		return true
	}
	return false
}
// ReplicationStatusToProto translates a ReplicationStatus into its proto3
// representation. Each position is serialized with EncodePosition. The source
// UUID is rendered through its String method, and the IO/SQL thread states
// are converted to int32. All other fields are copied as-is.
func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
	pb := new(replicationdatapb.Status)

	pb.Position = EncodePosition(s.Position)
	pb.RelayLogPosition = EncodePosition(s.RelayLogPosition)
	pb.FilePosition = EncodePosition(s.FilePosition)
	pb.RelayLogSourceBinlogEquivalentPosition = EncodePosition(s.RelayLogSourceBinlogEquivalentPosition)

	pb.SourceServerId = s.SourceServerID
	pb.ReplicationLagSeconds = s.ReplicationLagSeconds
	pb.ReplicationLagUnknown = s.ReplicationLagUnknown
	pb.SqlDelay = s.SQLDelay
	pb.RelayLogFilePosition = EncodePosition(s.RelayLogFilePosition)

	pb.SourceHost = s.SourceHost
	pb.SourceUser = s.SourceUser
	pb.SourcePort = s.SourcePort
	pb.ConnectRetry = s.ConnectRetry
	pb.SourceUuid = s.SourceUUID.String()

	pb.IoState = int32(s.IOState)
	pb.LastIoError = s.LastIOError
	pb.SqlState = int32(s.SQLState)
	pb.LastSqlError = s.LastSQLError

	pb.SslAllowed = s.SSLAllowed
	pb.HasReplicationFilters = s.HasReplicationFilters
	pb.AutoPosition = s.AutoPosition
	pb.UsingGtid = s.UsingGTID

	return pb
}
// ProtoToReplicationStatus translates a proto Status into a ReplicationStatus.
// It panics with a vterrors-wrapped error naming the offending field if any
// encoded position, or a non-empty source UUID, cannot be decoded. Positions
// are decoded in declaration order, so the first malformed field is the one
// reported. An empty SourceUuid leaves SourceUUID as the zero SID.
func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
	var (
		rs  ReplicationStatus
		err error
	)

	if rs.Position, err = DecodePosition(s.Position); err != nil {
		panic(vterrors.Wrapf(err, "cannot decode Position"))
	}
	if rs.RelayLogPosition, err = DecodePosition(s.RelayLogPosition); err != nil {
		panic(vterrors.Wrapf(err, "cannot decode RelayLogPosition"))
	}
	if rs.FilePosition, err = DecodePosition(s.FilePosition); err != nil {
		panic(vterrors.Wrapf(err, "cannot decode FilePosition"))
	}
	if rs.RelayLogSourceBinlogEquivalentPosition, err = DecodePosition(s.RelayLogSourceBinlogEquivalentPosition); err != nil {
		panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition"))
	}
	if rs.RelayLogFilePosition, err = DecodePosition(s.RelayLogFilePosition); err != nil {
		panic(vterrors.Wrapf(err, "cannot decode RelayLogFilePosition"))
	}

	if s.SourceUuid != "" {
		if rs.SourceUUID, err = ParseSID(s.SourceUuid); err != nil {
			panic(vterrors.Wrapf(err, "cannot decode SourceUUID"))
		}
	}

	// The remaining fields need no decoding and are copied verbatim.
	rs.SourceServerID = s.SourceServerId
	rs.ReplicationLagSeconds = s.ReplicationLagSeconds
	rs.ReplicationLagUnknown = s.ReplicationLagUnknown
	rs.SQLDelay = s.SqlDelay
	rs.SourceHost = s.SourceHost
	rs.SourceUser = s.SourceUser
	rs.SourcePort = s.SourcePort
	rs.ConnectRetry = s.ConnectRetry
	rs.IOState = ReplicationState(s.IoState)
	rs.LastIOError = s.LastIoError
	rs.SQLState = ReplicationState(s.SqlState)
	rs.LastSQLError = s.LastSqlError
	rs.SSLAllowed = s.SslAllowed
	rs.HasReplicationFilters = s.HasReplicationFilters
	rs.AutoPosition = s.AutoPosition
	rs.UsingGTID = s.UsingGtid

	return rs
}
// FindErrantGTIDs can be used to find errant GTIDs in the receiver's relay log, by comparing it against all known replicas,
// provided as a list of ReplicationStatus's. This method only works if the flavor for all retrieved ReplicationStatus's is MySQL.
// The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID.
// This function is best effort in nature. If it marks something as errant, then it is for sure errant. But there may be cases of errant GTIDs, which aren't caught by this function.
//
// A GTID is reported as errant when it is present in the receiver's RelayLogPosition GTID set but absent from the
// RelayLogPosition GTID set of every other replica. GTIDs whose SID equals the receiver's SourceUUID are never reported.
//
// It returns (nil, nil) when otherReplicaStatuses is empty or when no errant GTIDs are found. It returns an error if
// the receiver's or any other replica's relay log GTID set is not a Mysql56GTIDSet (for example a different flavor,
// or an unset position), or if otherReplicaStatuses contains a nil entry.
func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationStatus) (Mysql56GTIDSet, error) {
	if len(otherReplicaStatuses) == 0 {
		// If there is nothing to compare this replica against, then we must assume that its GTID set is the correct one.
		return nil, nil
	}
	relayLogSet, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
	if !ok {
		return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor")
	}
	otherSets := make([]Mysql56GTIDSet, 0, len(otherReplicaStatuses))
	for i, status := range otherReplicaStatuses {
		if status == nil {
			return nil, fmt.Errorf("cannot compute errant GTIDs: replica status at index %d is nil", i)
		}
		// Replica statuses come from remote tablets, so a mismatched or unset GTID set is
		// reported to the caller as an error rather than crashing the process.
		otherSet, ok := status.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
		if !ok {
			return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor: replica status at index %d has a relay log GTID set of type %T", i, status.RelayLogPosition.GTIDSet)
		}
		otherSets = append(otherSets, otherSet)
	}
	// Build a new top-level map instead of deleting SourceUUID from relayLogSet, so the
	// receiver's set is left untouched. Interval slices are shared with the receiver.
	diffSet := make(Mysql56GTIDSet, len(relayLogSet))
	for sid, intervals := range relayLogSet {
		if sid == s.SourceUUID {
			continue
		}
		diffSet[sid] = intervals
	}
	// Whatever survives subtracting every other replica's set exists only on this replica.
	for _, otherSet := range otherSets {
		diffSet = diffSet.Difference(otherSet)
	}
	if len(diffSet) == 0 {
		// If diffSet is empty, then we have no errant GTIDs.
		return nil, nil
	}
	return diffSet, nil
}