Skip to content

Commit ae2d7b8

Browse files
committed
support OceanBase Binlog Service
1 parent 59db6fa commit ae2d7b8

File tree

7 files changed

+149
-32
lines changed

7 files changed

+149
-32
lines changed

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type MigrationContext struct {
102102
GoogleCloudPlatform bool
103103
AzureMySQL bool
104104
AttemptInstantDDL bool
105+
OceanBaseBinlogService bool
105106

106107
config ContextConfig
107108
configMutex *sync.Mutex

go/base/utils.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@
66
package base
77

88
import (
9+
gosql "database/sql"
910
"fmt"
1011
"os"
1112
"regexp"
1213
"strings"
1314
"time"
1415

15-
gosql "database/sql"
16-
1716
"github.com/github/gh-ost/go/mysql"
1817
)
1918

@@ -75,7 +74,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
7574
// AliyunRDS set users port to "NULL", replace it by gh-ost param
7675
// GCP set users port to "NULL", replace it by gh-ost param
7776
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
78-
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
77+
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBaseBinlogService {
7978
port = connectionConfig.Key.Port
8079
} else {
8180
portQuery := `select @@global.port`

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func main() {
8787
flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.")
8888
flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).")
8989
flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.")
90+
flag.BoolVar(&migrationContext.OceanBaseBinlogService, "oceanbase", false, "set to 'true' when you execute on OceanBase Binlog Service")
9091

9192
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
9293
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")

go/logic/applier.go

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (this *Applier) InitDBConnections() (err error) {
9292
if err := this.validateAndReadTimeZone(); err != nil {
9393
return err
9494
}
95-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
95+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService {
9696
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
9797
return err
9898
} else {
@@ -670,24 +670,35 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
670670
return chunkSize, rowsAffected, duration, nil
671671
}
672672

673-
// LockOriginalTable places a write lock on the original table
674-
func (this *Applier) LockOriginalTable() error {
675-
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
676-
sql.EscapeName(this.migrationContext.DatabaseName),
677-
sql.EscapeName(this.migrationContext.OriginalTableName),
678-
)
679-
this.migrationContext.Log.Infof("Locking %s.%s",
680-
sql.EscapeName(this.migrationContext.DatabaseName),
681-
sql.EscapeName(this.migrationContext.OriginalTableName),
682-
)
673+
// lockTables places a write lock on the specific tables
674+
func (this *Applier) lockTables(tableNames ...string) error {
675+
databaseName := this.migrationContext.DatabaseName
676+
query := `lock /* gh-ost */ tables `
677+
for i, tableName := range tableNames {
678+
if i != 0 {
679+
query = query + `, `
680+
}
681+
query = query + fmt.Sprintf(`%s.%s write`, databaseName, tableName)
682+
}
683+
this.migrationContext.Log.Infof("Locking tables %v in database %s", tableNames, databaseName)
683684
this.migrationContext.LockTablesStartTime = time.Now()
684685
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
685686
return err
686687
}
687-
this.migrationContext.Log.Infof("Table locked")
688+
this.migrationContext.Log.Infof("Tables %v in database %s locked", tableNames, databaseName)
688689
return nil
689690
}
690691

692+
// LockOriginalTable places a write lock on the original table
693+
func (this *Applier) LockOriginalTable() error {
694+
return this.lockTables(this.migrationContext.OriginalTableName)
695+
}
696+
697+
// LockOriginAndGhostTable places a write lock on the original table and the ghost table
698+
func (this *Applier) LockOriginAndGhostTable() error {
699+
return this.lockTables(this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName())
700+
}
701+
691702
// UnlockTables makes tea. No wait, it unlocks tables.
692703
func (this *Applier) UnlockTables() error {
693704
query := `unlock /* gh-ost */ tables`
@@ -968,7 +979,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
968979

969980
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
970981
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
971-
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
982+
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
972983
if _, err := tx.Exec(query); err != nil {
973984
tableLocked <- err
974985
return err
@@ -1037,25 +1048,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
10371048
return nil
10381049
}
10391050

1040-
// AtomicCutoverRename
1041-
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1042-
tx, err := this.db.Begin()
1051+
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
1052+
tx, err := db.Begin()
10431053
if err != nil {
10441054
return err
10451055
}
10461056
defer func() {
10471057
tx.Rollback()
1048-
sessionIdChan <- -1
1049-
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1058+
if sessionIdChan != nil {
1059+
sessionIdChan <- -1
1060+
}
1061+
if tablesRenamed != nil {
1062+
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1063+
}
10501064
}()
1051-
var sessionId int64
1052-
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1053-
return err
1065+
1066+
if sessionIdChan != nil {
1067+
var sessionId int64
1068+
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1069+
return err
1070+
}
1071+
sessionIdChan <- sessionId
10541072
}
1055-
sessionIdChan <- sessionId
10561073

10571074
this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
1058-
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
1075+
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
10591076
if _, err := tx.Exec(query); err != nil {
10601077
return err
10611078
}
@@ -1072,14 +1089,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
10721089
)
10731090
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
10741091
if _, err := tx.Exec(query); err != nil {
1075-
tablesRenamed <- err
1092+
if tablesRenamed != nil {
1093+
tablesRenamed <- err
1094+
}
10761095
return this.migrationContext.Log.Errore(err)
10771096
}
1078-
tablesRenamed <- nil
1097+
if tablesRenamed != nil {
1098+
tablesRenamed <- nil
1099+
}
10791100
this.migrationContext.Log.Infof("Tables renamed")
10801101
return nil
10811102
}
10821103

1104+
// AtomicCutoverRename renames tables for atomic cut over in non lock session
1105+
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1106+
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
1107+
}
1108+
1109+
// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
1110+
func (this *Applier) AtomicCutoverRenameWithLock() error {
1111+
return this.atomicCutoverRename(this.singletonDB, nil, nil)
1112+
}
1113+
10831114
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
10841115
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
10851116
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

go/logic/inspect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) {
5656
if err := this.validateConnection(); err != nil {
5757
return err
5858
}
59-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
59+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService {
6060
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
6161
return err
6262
} else {

go/logic/migrator.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool {
200200

201201
// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
202202
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
203+
if dmlEvent.NewColumnValues == nil {
204+
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
205+
// converted to a DELETE event and an INSERT event, we need to skip the DELETE event.
206+
return nil
207+
}
203208
// Hey, I created the changelog table, I know the type of columns it has!
204209
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
205210
case "state":
@@ -551,9 +556,15 @@ func (this *Migrator) cutOver() (err error) {
551556

552557
switch this.migrationContext.CutOverType {
553558
case base.CutOverAtomic:
554-
// Atomic solution: we use low timeout and multiple attempts. But for
555-
// each failed attempt, we throttle until replication lag is back to normal
556-
err = this.atomicCutOver()
559+
if this.migrationContext.OceanBaseBinlogService || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
560+
// Atomic solution for latest MySQL: cut over the tables in the same session where the origin
561+
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
562+
err = this.atomicCutOverMySQL8()
563+
} else {
564+
// Atomic solution: we use low timeout and multiple attempts. But for
565+
// each failed attempt, we throttle until replication lag is back to normal
566+
err = this.atomicCutOver()
567+
}
557568
case base.CutOverTwoStep:
558569
err = this.cutOverTwoStep()
559570
default:
@@ -632,6 +643,35 @@ func (this *Migrator) cutOverTwoStep() (err error) {
632643
return nil
633644
}
634645

646+
// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
647+
// what's left of last DML entries, and atomically swap original->old, then new->original.
648+
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
649+
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
650+
func (this *Migrator) atomicCutOverMySQL8() (err error) {
651+
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
652+
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
653+
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
654+
655+
if err := this.retryOperation(this.applier.LockOriginAndGhostTable); err != nil {
656+
return err
657+
}
658+
659+
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
660+
return err
661+
}
662+
if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
663+
return err
664+
}
665+
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
666+
return err
667+
}
668+
669+
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
670+
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
671+
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
672+
return nil
673+
}
674+
635675
// atomicCutOver
636676
func (this *Migrator) atomicCutOver() (err error) {
637677
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)

go/mysql/utils.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mysql
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"strconv"
1112
"strings"
1213
"sync"
1314
"time"
@@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error {
211212
_, err := db.Exec(`KILL QUERY %s`, connectionID)
212213
return err
213214
}
215+
216+
func versionTokens(version string, digits int) []int {
217+
v := strings.Split(version, "-")[0]
218+
tokens := strings.Split(v, ".")
219+
intTokens := make([]int, digits)
220+
for i := range tokens {
221+
if i >= digits {
222+
break
223+
}
224+
intTokens[i], _ = strconv.Atoi(tokens[i])
225+
}
226+
return intTokens
227+
}
228+
229+
func isSmallerVersion(version string, otherVersion string, digits int) bool {
230+
v := versionTokens(version, digits)
231+
o := versionTokens(otherVersion, digits)
232+
for i := 0; i < len(v); i++ {
233+
if v[i] < o[i] {
234+
return true
235+
}
236+
if v[i] > o[i] {
237+
return false
238+
}
239+
if i == digits {
240+
break
241+
}
242+
}
243+
return false
244+
}
245+
246+
// IsSmallerMajorVersion tests two versions against another and returns true if
247+
// the former is a smaller "major" version than the latter.
248+
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
249+
func IsSmallerMajorVersion(version string, otherVersion string) bool {
250+
return isSmallerVersion(version, otherVersion, 2)
251+
}
252+
253+
// IsSmallerMinorVersion tests two versions against another and returns true if
254+
// the former is a smaller "minor" version than the latter.
255+
// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7
256+
func IsSmallerMinorVersion(version string, otherVersion string) bool {
257+
return isSmallerVersion(version, otherVersion, 3)
258+
}

0 commit comments

Comments
 (0)