Skip to content

feat concurrent chunk data #1398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
@@ -64,6 +64,9 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
### binlogsyncer-max-reconnect-attempts
`--binlogsyncer-max-reconnect-attempts=0`, the maximum number of attempts to re-establish a broken inspector connection for sync binlog. `0` or `negative number` means infinite retry, default `0`

### chunk-concurrent-size
`--chunk-concurrent-size=1`, The number of goroutines to execute chunks concurrently in each copy time slot, default `1`, allowed range `0`-`100`.

### conf

`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ require (
golang.org/x/net v0.17.0
golang.org/x/term v0.13.0
golang.org/x/text v0.13.0
golang.org/x/sync v0.1.0
)

require (
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -94,6 +94,8 @@ golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
99 changes: 60 additions & 39 deletions go/base/context.go
Original file line number Diff line number Diff line change
@@ -72,6 +72,14 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea
}
}

type IterationRangeValues struct {
Min *sql.ColumnValues
Max *sql.ColumnValues
Size int64
IsIncludeMinValues bool
HasFurtherRange bool
}

// MigrationContext has the general, global state of migration. It is used by
// all components throughout the migration process.
type MigrationContext struct {
@@ -119,6 +127,7 @@ type MigrationContext struct {
HeartbeatIntervalMilliseconds int64
defaultNumRetries int64
ChunkSize int64
ChunkConcurrentSize int64
niceRatio float64
MaxLagMillisecondsThrottleThreshold int64
throttleControlReplicaKeys *mysql.InstanceKeyMap
@@ -210,25 +219,26 @@ type MigrationContext struct {
InCutOverCriticalSectionFlag int64
PanicAbort chan error

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
OriginalTableAutoIncrement uint64
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
ForceTmpTableName string
OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
OriginalTableAutoIncrement uint64
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
CalculateNextIterationRangeEndValuesLock *sync.Mutex
ForceTmpTableName string

recentBinlogCoordinates mysql.BinlogCoordinates

@@ -269,26 +279,27 @@ type ContextConfig struct {

func NewMigrationContext() *MigrationContext {
return &MigrationContext{
Uuid: uuid.NewString(),
defaultNumRetries: 60,
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
throttleHTTPMutex: &sync.Mutex{},
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
Uuid: uuid.NewString(),
defaultNumRetries: 60,
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
throttleHTTPMutex: &sync.Mutex{},
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
CalculateNextIterationRangeEndValuesLock: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
}
}

@@ -616,6 +627,16 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
atomic.StoreInt64(&this.ChunkSize, chunkSize)
}

func (this *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) {
if chunkConcurrentSize < 1 {
chunkConcurrentSize = 1
}
if chunkConcurrentSize > 100 {
chunkConcurrentSize = 100
}
atomic.StoreInt64(&this.ChunkConcurrentSize, chunkConcurrentSize)
}

func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
if batchSize < 1 {
batchSize = 1
2 changes: 2 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
@@ -104,6 +104,7 @@ func main() {
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "The number of goroutines to execute chunks concurrently in each copy time slot, default 1 (allowed range: 1-100)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
@@ -294,6 +295,7 @@ func main() {
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
migrationContext.SetNiceRatio(*niceRatio)
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetThrottleQuery(*throttleQuery)
57 changes: 35 additions & 22 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
@@ -76,6 +76,10 @@ func (this *Applier) InitDBConnections() (err error) {
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
concurrentSize := atomic.LoadInt64(&this.migrationContext.ChunkConcurrentSize)
if concurrentSize > mysql.MaxDBPoolConnections {
this.db.SetMaxOpenConns(int(concurrentSize))
}
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
@@ -561,11 +565,18 @@ func (this *Applier) ReadMigrationRangeValues() error {
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and this done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
func (this *Applier) CalculateNextIterationRangeEndValues() (values *base.IterationRangeValues, err error) {
this.migrationContext.CalculateNextIterationRangeEndValuesLock.Lock()
defer this.migrationContext.CalculateNextIterationRangeEndValuesLock.Unlock()

iterationRangeValues := &base.IterationRangeValues{}

iterationRangeValues.Min = this.migrationContext.MigrationIterationRangeMaxValues
if iterationRangeValues.Min == nil {
iterationRangeValues.Min = this.migrationContext.MigrationRangeMinValues
iterationRangeValues.IsIncludeMinValues = true
}

for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
@@ -575,46 +586,48 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
iterationRangeValues.Min.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetIteration() == 0,
iterationRangeValues.IsIncludeMinValues,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}

rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}
hasFurtherRange = true
iterationRangeValues.HasFurtherRange = true
}
if err = rows.Err(); err != nil {
return hasFurtherRange, err
return iterationRangeValues, err
}
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
if iterationRangeValues.HasFurtherRange {
iterationRangeValues.Max = iterationRangeMaxValues
this.migrationContext.MigrationIterationRangeMinValues = iterationRangeValues.Min
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeValues.Max
return iterationRangeValues, nil
}
}
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
return iterationRangeValues, nil
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
func (this *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
chunkSize = iterationRangeValues.Size

query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,
@@ -624,9 +637,9 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.MappedSharedColumns.Names(),
this.migrationContext.UniqueKey.Name,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
iterationRangeValues.Min.AbstractValues(),
iterationRangeValues.Max.AbstractValues(),
iterationRangeValues.IsIncludeMinValues,
this.migrationContext.IsTransactionalTable(),
)
if err != nil {
@@ -663,8 +676,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
duration = time.Since(startTime)
this.migrationContext.Log.Debugf(
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues,
iterationRangeValues.Min,
iterationRangeValues.Max,
this.migrationContext.GetIteration(),
chunkSize)
return chunkSize, rowsAffected, duration, nil
Loading
Oops, something went wrong.