-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
flavor_mysqlgr.go
278 lines (243 loc) · 8.65 KB
/
flavor_mysqlgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysql
import (
"errors"
"fmt"
"math"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/sqltypes"
)
// GRFlavorID is the string identifier for the MysqlGR flavor.
const GRFlavorID = "MysqlGR"
// ErrNoGroupStatus means no status for group replication.
var ErrNoGroupStatus = errors.New("no group status")
// mysqlGRFlavor implements the Flavor interface for Mysql.
type mysqlGRFlavor struct {
mysqlFlavor
}
// newMysqlGRFlavor creates a new mysqlGR flavor.
func newMysqlGRFlavor() flavor {
return &mysqlGRFlavor{}
}
// startReplicationCommand returns the command to start the replication.
// we return empty here since `START GROUP_REPLICATION` should be called by
// the external orchestrator
func (mysqlGRFlavor) startReplicationCommand() string {
return ""
}
// restartReplicationCommands is disabled in mysqlGRFlavor
func (mysqlGRFlavor) restartReplicationCommands() []string {
return []string{}
}
// startReplicationUntilAfter is disabled in mysqlGRFlavor
func (mysqlGRFlavor) startReplicationUntilAfter(pos Position) string {
return ""
}
// startSQLThreadUntilAfter is disabled in mysqlGRFlavor
func (mysqlGRFlavor) startSQLThreadUntilAfter(pos Position) string {
return ""
}
// stopReplicationCommand returns the command to stop the replication.
// we return empty here since `STOP GROUP_REPLICATION` should be called by
// the external orchestrator
func (mysqlGRFlavor) stopReplicationCommand() string {
return ""
}
// stopIOThreadCommand is disabled in mysqlGRFlavor
func (mysqlGRFlavor) stopIOThreadCommand() string {
return ""
}
// stopSQLThreadCommand is disabled in mysqlGRFlavor
func (mysqlGRFlavor) stopSQLThreadCommand() string {
return ""
}
// startSQLThreadCommand is disabled in mysqlGRFlavor
func (mysqlGRFlavor) startSQLThreadCommand() string {
return ""
}
// resetReplicationCommands is disabled in mysqlGRFlavor
func (mysqlGRFlavor) resetReplicationCommands(c *Conn) []string {
return []string{}
}
// resetReplicationParametersCommands is part of the Flavor interface.
func (mysqlGRFlavor) resetReplicationParametersCommands(c *Conn) []string {
return []string{}
}
// setReplicationPositionCommands is disabled in mysqlGRFlavor
func (mysqlGRFlavor) setReplicationPositionCommands(pos Position) []string {
return []string{}
}
// status returns the result of the appropriate status command,
// with parsed replication position.
//
// Note: primary will skip this function, only replica will call it.
// TODO: Right now the GR's lag is defined as the lag between a node processing a txn
// and the time the txn was committed. We should consider reporting lag between current queueing txn timestamp
// from replication_connection_status and the current processing txn's commit timestamp
func (mysqlGRFlavor) status(c *Conn) (ReplicationStatus, error) {
res := ReplicationStatus{}
// Get primary node information
query := `SELECT
MEMBER_HOST,
MEMBER_PORT
FROM
performance_schema.replication_group_members
WHERE
MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'`
err := fetchStatusForGroupReplication(c, query, func(values []sqltypes.Value) error {
parsePrimaryGroupMember(&res, values)
return nil
})
if err != nil {
return ReplicationStatus{}, err
}
query = `SELECT
MEMBER_STATE
FROM
performance_schema.replication_group_members
WHERE
MEMBER_HOST=convert(@@hostname using ascii) AND MEMBER_PORT=@@port`
var chanel string
err = fetchStatusForGroupReplication(c, query, func(values []sqltypes.Value) error {
state := values[0].ToString()
if state == "ONLINE" {
chanel = "group_replication_applier"
} else if state == "RECOVERING" {
chanel = "group_replication_recovery"
} else { // OFFLINE, ERROR, UNREACHABLE
// If the member is not in healthy state, use max int as lag
res.ReplicationLagSeconds = math.MaxUint32
}
return nil
})
if err != nil {
return ReplicationStatus{}, err
}
// if chanel is not set, it means the state is not ONLINE or RECOVERING
// return partial result early
if chanel == "" {
return res, nil
}
// Populate IOState from replication_connection_status
query = fmt.Sprintf(`SELECT SERVICE_STATE
FROM performance_schema.replication_connection_status
WHERE CHANNEL_NAME='%s'`, chanel)
var connectionState ReplicationState
err = fetchStatusForGroupReplication(c, query, func(values []sqltypes.Value) error {
connectionState = ReplicationStatusToState(values[0].ToString())
return nil
})
if err != nil {
return ReplicationStatus{}, err
}
res.IOState = connectionState
// Populate SQLState from replication_connection_status
var applierState ReplicationState
query = fmt.Sprintf(`SELECT SERVICE_STATE
FROM performance_schema.replication_applier_status_by_coordinator
WHERE CHANNEL_NAME='%s'`, chanel)
err = fetchStatusForGroupReplication(c, query, func(values []sqltypes.Value) error {
applierState = ReplicationStatusToState(values[0].ToString())
return nil
})
if err != nil {
return ReplicationStatus{}, err
}
res.SQLState = applierState
// Collect lag information
// we use the difference between the last processed transaction's commit time
// and the end buffer time as the proxy to the lag
query = fmt.Sprintf(`SELECT
TIMESTAMPDIFF(SECOND, LAST_PROCESSED_TRANSACTION_ORIGINAL_COMMIT_TIMESTAMP, LAST_PROCESSED_TRANSACTION_END_BUFFER_TIMESTAMP)
FROM
performance_schema.replication_applier_status_by_coordinator
WHERE
CHANNEL_NAME='%s'`, chanel)
err = fetchStatusForGroupReplication(c, query, func(values []sqltypes.Value) error {
parseReplicationApplierLag(&res, values)
return nil
})
if err != nil {
return ReplicationStatus{}, err
}
return res, nil
}
func parsePrimaryGroupMember(res *ReplicationStatus, row []sqltypes.Value) {
res.SourceHost = row[0].ToString() /* MEMBER_HOST */
memberPort, _ := row[1].ToInt64() /* MEMBER_PORT */
res.SourcePort = int(memberPort)
}
func parseReplicationApplierLag(res *ReplicationStatus, row []sqltypes.Value) {
lagSec, err := row[0].ToInt64()
// if the error is not nil, ReplicationLagSeconds will remain to be MaxUint32
if err == nil {
// Only set where there is no error
// The value can be NULL when there is no replication applied yet
res.ReplicationLagSeconds = uint(lagSec)
}
}
func fetchStatusForGroupReplication(c *Conn, query string, onResult func([]sqltypes.Value) error) error {
qr, err := c.ExecuteFetch(query, 100, true /* wantfields */)
if err != nil {
return err
}
// if group replication related query returns 0 rows, it means the group replication is not set up
if len(qr.Rows) == 0 {
return ErrNoGroupStatus
}
if len(qr.Rows) > 1 {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected results for %v: %v", query, qr.Rows)
}
return onResult(qr.Rows[0])
}
// primaryStatus returns the result of 'SHOW MASTER STATUS',
// with parsed executed position.
func (mysqlGRFlavor) primaryStatus(c *Conn) (PrimaryStatus, error) {
return mysqlFlavor{}.primaryStatus(c)
}
func (mysqlGRFlavor) baseShowTables() string {
return mysqlFlavor{}.baseShowTables()
}
func (mysqlGRFlavor) baseShowTablesWithSizes() string {
return TablesWithSize80
}
// supportsCapability is part of the Flavor interface.
func (mysqlGRFlavor) supportsCapability(serverVersion string, capability FlavorCapability) (bool, error) {
switch capability {
case InstantDDLFlavorCapability,
InstantExpandEnumCapability,
InstantAddLastColumnFlavorCapability,
InstantAddDropVirtualColumnFlavorCapability,
InstantChangeColumnDefaultFlavorCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 0)
case InstantAddDropColumnFlavorCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 29)
case TransactionalGtidExecutedFlavorCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 17)
case FastDropTableFlavorCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 23)
case MySQLJSONFlavorCapability:
return ServerVersionAtLeast(serverVersion, 5, 7, 0)
case MySQLUpgradeInServerFlavorCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 16)
case DynamicRedoLogCapacityFlavorCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 30)
default:
return false, nil
}
}
func init() {
flavors[GRFlavorID] = newMysqlGRFlavor
}