Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql: Handle more deprecated SQL commands #15907

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions examples/compose/fix_replication.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ cd "$(dirname "${BASH_SOURCE[0]}")"

function get_replication_status() {
# Get replication status
STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW SLAVE STATUS\G")
STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW REPLICA STATUS\G")
LAST_ERRNO=$(grep "Last_IO_Errno:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SLAVE_SQL_RUNNING=$(grep "Slave_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SLAVE_IO_RUNNING=$(grep "Slave_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
MASTER_HOST=$(grep "Master_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }')
MASTER_PORT=$(grep "Master_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }')
REPLICA_SQL_RUNNING=$(grep "Replica_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
REPLICA_IO_RUNNING=$(grep "Replica_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SOURCE_HOST=$(grep "Source_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SOURCE_PORT=$(grep "Source_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }')

echo "Slave_SQL_Running: $SLAVE_SQL_RUNNING"
echo "Slave_IO_Running: $SLAVE_IO_RUNNING"
echo "Replica_SQL_Running: $REPLICA_SQL_RUNNING"
echo "Replica_IO_Running: $REPLICA_IO_RUNNING"
echo "Last_IO_Errno: $LAST_ERRNO"
}

Expand All @@ -54,7 +54,7 @@ get_replication_status
[ ${1:-''} != 'status' ] || exit 0;

# Check if IO_Thread is running
if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then
if [[ $REPLICA_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then

echo "Primary has purged bin logs that replica requires. Sync will require restore from mysqldump"
if [[ -f $KEYSPACE.sql ]] ; then
Expand All @@ -64,7 +64,7 @@ if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then
else
echo "Starting mysqldump. This may take a while.."
# Modify flags to user's requirements
if mysqldump -h $MASTER_HOST -P $MASTER_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \
if mysqldump -h $SOURCE_HOST -P $SOURCE_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \
--triggers --routines --events --hex-blob --master-data=1 --quick --order-by-primary \
--no-autocommit --skip-comments --skip-add-drop-table --skip-add-locks \
--skip-disable-keys --single-transaction --set-gtid-purged=on --verbose > $KEYSPACE.sql; then
Expand Down
2 changes: 0 additions & 2 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,4 @@ func IsNum(typ uint8) bool {

const (
readReplicationConnectionConfiguration = "SELECT * FROM performance_schema.replication_connection_configuration"
readReplicaNetTimeout = "select @@global.replica_net_timeout"
readSlaveNetTimeout = "select @@global.slave_net_timeout"
)
30 changes: 9 additions & 21 deletions go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/mysql"
Expand All @@ -46,15 +47,6 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
t.Fatal(err)
}

// We need to know if this is MariaDB, to set the right flag.
if conn.IsMariaDB() {
// This flag is required to get GTIDs from MariaDB.
t.Log("MariaDB: sensing SET @mariadb_slave_capability=4")
if _, err := conn.ExecuteFetch("SET @mariadb_slave_capability=4", 0, false); err != nil {
t.Fatalf("failed to set @mariadb_slave_capability=4: %v", err)
}
}

// Switch server to RBR if needed.
if rbr {
if _, err := conn.ExecuteFetch("SET GLOBAL binlog_format='ROW'", 0, false); err != nil {
Expand All @@ -63,25 +55,21 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
}

// First we get the current binlog position.
result, err := conn.ExecuteFetch("SHOW MASTER STATUS", 1, true)
require.NoError(t, err, "SHOW MASTER STATUS failed: %v", err)
status, err := conn.ShowPrimaryStatus()
require.NoError(t, err, "retrieving primary status failed: %v", err)

if len(result.Fields) < 2 || result.Fields[0].Name != "File" || result.Fields[1].Name != "Position" ||
len(result.Rows) != 1 {
t.Fatalf("SHOW MASTER STATUS returned unexpected result: %v", result)
}
file := result.Rows[0][0].ToString()
position, err := result.Rows[0][1].ToCastUint64()
require.NoError(t, err, "SHOW MASTER STATUS returned invalid position: %v", result.Rows[0][1])
filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID)
file := filePos.File
position := filePos.Pos

// Tell the server that we understand the format of events
// that will be used if binlog_checksum is enabled on the server.
if _, err := conn.ExecuteFetch("SET @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil {
t.Fatalf("failed to set @master_binlog_checksum=@@global.binlog_checksum: %v", err)
if _, err := conn.ExecuteFetch("SET @source_binlog_checksum = @@global.binlog_checksum, @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil {
t.Fatalf("failed to set @source_binlog_checksum=@@global.binlog_checksum: %v", err)
}

// Write ComBinlogDump packet with to start streaming events from here.
if err := conn.WriteComBinlogDump(1, file, uint32(position), 0); err != nil {
if err := conn.WriteComBinlogDump(1, file, position, 0); err != nil {
t.Fatalf("WriteComBinlogDump failed: %v", err)
}

Expand Down
16 changes: 10 additions & 6 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,14 @@ type flavor interface {
// as the new replication source (without changing any GTID position).
setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string

// resetBinaryLogsCommand returns the command to reset the binary logs.
resetBinaryLogsCommand() string

GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// status returns the result of the appropriate status command,
// with parsed replication position.
status(c *Conn) (replication.ReplicationStatus, error)

// primaryStatus returns the result of 'SHOW MASTER STATUS',
// primaryStatus returns the result of 'SHOW BINARY LOG STATUS',
// with parsed executed position.
primaryStatus(c *Conn) (replication.PrimaryStatus, error)

Expand Down Expand Up @@ -185,15 +188,16 @@ func GetFlavor(serverVersion string, flavorFunc func() flavor) (f flavor, capabl
f = mariadbFlavor102{mariadbFlavor{serverVersion: fmt.Sprintf("%f", mariadbVersion)}}
}
case strings.HasPrefix(serverVersion, mysql8VersionPrefix):
recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability)
if recent {
if latest, _ := capabilities.ServerVersionAtLeast(serverVersion, 8, 2, 0); latest {
f = mysqlFlavor82{mysqlFlavor{serverVersion: serverVersion}}
} else if recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability); recent {
f = mysqlFlavor8{mysqlFlavor{serverVersion: serverVersion}}
} else {
f = mysqlFlavor8Legacy{mysqlFlavor{serverVersion: serverVersion}}
f = mysqlFlavor8Legacy{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}}
}
default:
// If unknown, return the most basic flavor: MySQL 57.
f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}}
f = mysqlFlavor57{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}}
}
return f, f.supportsCapability, canonicalVersion
}
Expand Down Expand Up @@ -400,7 +404,7 @@ func (c *Conn) ShowReplicationStatus() (replication.ReplicationStatus, error) {
return c.flavor.status(c)
}

// ShowPrimaryStatus executes the right SHOW MASTER STATUS command,
// ShowPrimaryStatus executes the right SHOW BINARY LOG STATUS command,
// and returns a parsed executed Position, as well as file based Position.
func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) {
return c.flavor.primaryStatus(c)
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host s
return "unsupported"
}

// resetBinaryLogsCommand is part of the Flavor interface.
func (flv *filePosFlavor) resetBinaryLogsCommand() string {
return "unsupported"
}

// status is part of the Flavor interface.
func (flv *filePosFlavor) status(c *Conn) (replication.ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */)
Expand Down
6 changes: 5 additions & 1 deletion go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

func (mariadbFlavor) resetBinaryLogsCommand() string {
return "RESET MASTER"
}

// status is part of the Flavor interface.
func (mariadbFlavor) status(c *Conn) (replication.ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW ALL SLAVES STATUS", 100, true /* wantfields */)
Expand Down Expand Up @@ -288,7 +292,7 @@ func (mariadbFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configu

// replicationNetTimeout is part of the Flavor interface.
func (mariadbFlavor) replicationNetTimeout(c *Conn) (int32, error) {
qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
qr, err := c.ExecuteFetch("select @@global.slave_net_timeout", 1, false)
if err != nil {
return 0, err
}
Expand Down
Loading
Loading