Skip to content

Before the successful renaming, a session accessed the ghost table, w… #1536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 27, 2025
4 changes: 3 additions & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ type MigrationContext struct {

recentBinlogCoordinates mysql.BinlogCoordinates

BinlogSyncerMaxReconnectAttempts int
BinlogSyncerMaxReconnectAttempts int
AllowSetupMetadataLockInstruments bool
IsOpenMetadataLockInstruments bool

Log Logger
}
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func main() {
flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")

flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "validate rename session hold the MDL of original table before unlock tables in cut-over phase")
flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "when master node fails, the maximum number of binlog synchronization attempts to reconnect. 0 is unlimited")

flag.BoolVar(&migrationContext.IncludeTriggers, "include-triggers", false, "When true, the triggers (if exist) will be created on the new table")
Expand Down
62 changes: 61 additions & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,28 @@ func (this *Applier) dropTable(tableName string) error {
return nil
}

func (this *Applier) StateMetadataLockInstrument() error {
query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
var enabled, timed string
if err := this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err)
}
if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") {
this.migrationContext.IsOpenMetadataLockInstruments = true
return nil
}
if !this.migrationContext.AllowSetupMetadataLockInstruments {
return nil
}
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed)
if _, err := this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil {
return this.migrationContext.Log.Errorf("enable instrument wait/lock/metadata/sql/mdl error: %s", err)
}
this.migrationContext.IsOpenMetadataLockInstruments = true
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled")
return nil
}

// dropTriggers drop the triggers on the applied host
func (this *Applier) DropTriggersFromGhost() error {
if len(this.migrationContext.Triggers) > 0 {
Expand Down Expand Up @@ -1095,7 +1117,7 @@ func (this *Applier) RevertAtomicCutOverWaitTimeout() {
}

// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, renameLockSessionId *int64) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
Expand Down Expand Up @@ -1186,6 +1208,20 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
// We DO NOT return here because we must `UNLOCK TABLES`!
}

this.migrationContext.Log.Infof("Session renameLockSessionId is %+v", *renameLockSessionId)
// Checking the lock is held by rename session
if *renameLockSessionId > 0 && this.migrationContext.IsOpenMetadataLockInstruments {
sleepDuration := time.Duration(10*this.migrationContext.CutOverLockTimeoutSeconds) * time.Millisecond
for i := 1; i <= 100; i++ {
err := this.ExpectMetadataLock(*renameLockSessionId)
if err == nil {
this.migrationContext.Log.Infof("Rename session is pending lock on the origin table !")
break
} else {
time.Sleep(sleepDuration)
}
}
}
// Tables still locked
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
Expand Down Expand Up @@ -1405,3 +1441,27 @@ func (this *Applier) Teardown() {
this.singletonDB.Close()
atomic.StoreInt64(&this.finishedMigrating, 1)
}

func (this *Applier) ExpectMetadataLock(sessionId int64) error {
found := false
query := `
select /* gh-ost */ m.owner_thread_id
from performance_schema.metadata_locks m join performance_schema.threads t
on m.owner_thread_id=t.thread_id
where m.object_type = 'TABLE' and m.object_schema = ? and m.object_name = ?
and m.lock_type = 'EXCLUSIVE' and m.lock_status = 'PENDING'
and t.processlist_id = ?
`
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
found = true
return nil
}, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, sessionId)
if err != nil {
return err
}
if !found {
err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
return this.migrationContext.Log.Errore(err)
}
return nil
}
8 changes: 7 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,9 @@ func (this *Migrator) atomicCutOver() (err error) {
lockOriginalSessionIdChan := make(chan int64, 2)
tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2)
var renameLockSessionId int64
go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil {
this.migrationContext.Log.Errore(err)
}
}()
Expand Down Expand Up @@ -735,6 +736,7 @@ func (this *Migrator) atomicCutOver() (err error) {
// Now that we've found the RENAME blocking, AND the locking connection still alive,
// we know it is safe to proceed to release the lock

renameLockSessionId = renameSessionId
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
Expand Down Expand Up @@ -1203,6 +1205,10 @@ func (this *Migrator) initiateApplier() error {
}
}
this.applier.WriteChangelogState(string(GhostTableMigrated))
if err := this.applier.StateMetadataLockInstrument(); err != nil {
this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out")
return err
}
go this.applier.InitiateHeartbeat()
return nil
}
Expand Down
84 changes: 83 additions & 1 deletion go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
gosql "database/sql"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
Expand All @@ -22,7 +23,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go/modules/mysql"

"fmt"
"runtime"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
Expand Down Expand Up @@ -335,6 +336,7 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.InspectorConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.InitiallyDropOldTable = true

migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255), ENGINE=InnoDB"

Expand Down Expand Up @@ -573,6 +575,86 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
assert.Equal(t, tries, 100)
}

func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(64))", getTestTableName()))
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("insert into %s values(1,'a')", getTestTableName()))
suite.Require().NoError(err)

done := make(chan error, 1)
go func() {
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
if err != nil {
done <- err
return
}
migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.InspectorConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.AllowSetupMetadataLockInstruments = true
migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255)"
migrationContext.HeartbeatIntervalMilliseconds = 100
migrationContext.CutOverLockTimeoutSeconds = 4

_, filename, _, _ := runtime.Caller(0)
migrationContext.PostponeCutOverFlagFile = filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag")

migrator := NewMigrator(migrationContext, "0.0.0")

//nolint:contextcheck
done <- migrator.Migrate()
}()

time.Sleep(2 * time.Second)
//nolint:dogsled
_, filename, _, _ := runtime.Caller(0)
err = os.Remove(filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag"))
if err != nil {
suite.Require().NoError(err)
}
time.Sleep(1 * time.Second)
go func() {
holdConn, err := suite.db.Conn(ctx)
suite.Require().NoError(err)
_, err = holdConn.ExecContext(ctx, "SELECT *, sleep(2) FROM test._testing_gho WHERE id = 1")
suite.Require().NoError(err)
}()

dmlConn, err := suite.db.Conn(ctx)
suite.Require().NoError(err)

_, err = dmlConn.ExecContext(ctx, fmt.Sprintf("insert into %s (id, name) values(2,'b')", getTestTableName()))
fmt.Println("insert into table original table")
suite.Require().NoError(err)

migrateErr := <-done
suite.Require().NoError(migrateErr)

// Verify the new column was added
var delValue, OriginalValue int64
err = suite.db.QueryRow(
fmt.Sprintf("select count(*) from %s._%s_del", testMysqlDatabase, testMysqlTableName),
).Scan(&delValue)
suite.Require().NoError(err)

err = suite.db.QueryRow("select count(*) from " + getTestTableName()).Scan(&OriginalValue)
suite.Require().NoError(err)

suite.Require().LessOrEqual(delValue, OriginalValue)

var tableName, createTableSQL string
//nolint:execinquery
err = suite.db.QueryRow("SHOW CREATE TABLE "+getTestTableName()).Scan(&tableName, &createTableSQL)
suite.Require().NoError(err)

suite.Require().Equal(testMysqlTableName, tableName)
suite.Require().Equal("CREATE TABLE `testing` (\n `id` int NOT NULL,\n `name` varchar(64) DEFAULT NULL,\n `foobar` varchar(255) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createTableSQL)
}

func TestMigrator(t *testing.T) {
suite.Run(t, new(MigratorTestSuite))
}
Loading