Skip to content

Commit

Permalink
feat: data compare snapshot, close #9
Browse files Browse the repository at this point in the history
  • Loading branch information
wentaojin committed Jun 25, 2024
1 parent 1fb3845 commit 0002de6
Show file tree
Hide file tree
Showing 22 changed files with 812 additions and 663 deletions.
2 changes: 2 additions & 0 deletions component/cli/migrate/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type DataCompareParam struct {
EnableCheckpoint bool `toml:"enable-checkpoint" json:"enableCheckpoint"`
EnableConsistentRead bool `toml:"enable-consistent-read" json:"enableConsistentRead"`
OnlyCompareRow bool `toml:"only-compare-row" json:"onlyCompareRow"`
ConsistentReadPointS string `toml:"consistent-read-point-s" json:"consistentReadPointS"`
ConsistentReadPointT string `toml:"consistent-read-point-t" json:"consistentReadPointT"`
}

func (d *CompareConfig) String() string {
Expand Down
125 changes: 63 additions & 62 deletions database/oracle/taskflow/csv_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
includeTables []string
excludeTables []string
databaseTables []string // task tables
globalScn uint64
globalScn string
)
databaseTableTypeMap := make(map[string]string)

Expand Down Expand Up @@ -608,10 +608,11 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}

globalScn, err = databaseS.GetDatabaseConsistentPos()
globalScnS, err := databaseS.GetDatabaseConsistentPos()
if err != nil {
return err
}
globalScn = strconv.FormatUint(globalScnS, 10)

// database tables
// init database table
Expand Down Expand Up @@ -680,7 +681,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
TableTypeS: attsRule.TableTypeS,
GlobalScnS: globalScn,
SnapshotPointS: globalScn,
ColumnDetailO: attsRule.ColumnDetailO,
ColumnDetailS: attsRule.ColumnDetailS,
ColumnDetailT: attsRule.ColumnDetailT,
Expand All @@ -695,15 +696,15 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}
_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
GlobalScnS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
SnapshotPointS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
})
if err != nil {
return err
Expand Down Expand Up @@ -755,7 +756,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
TableTypeS: attsRule.TableTypeS,
GlobalScnS: globalScn,
SnapshotPointS: globalScn,
ColumnDetailO: attsRule.ColumnDetailO,
ColumnDetailS: attsRule.ColumnDetailS,
ColumnDetailT: attsRule.ColumnDetailT,
Expand All @@ -770,15 +771,15 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}
_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
GlobalScnS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
SnapshotPointS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
})
if err != nil {
return err
Expand Down Expand Up @@ -827,7 +828,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
TableTypeS: attsRule.TableTypeS,
GlobalScnS: globalScn,
SnapshotPointS: globalScn,
ColumnDetailO: attsRule.ColumnDetailO,
ColumnDetailS: attsRule.ColumnDetailS,
ColumnDetailT: attsRule.ColumnDetailT,
Expand All @@ -842,15 +843,15 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}
_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
GlobalScnS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
SnapshotPointS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
})
if err != nil {
return err
Expand Down Expand Up @@ -885,7 +886,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
TableTypeS: attsRule.TableTypeS,
GlobalScnS: globalScn,
SnapshotPointS: globalScn,
ColumnDetailO: attsRule.ColumnDetailO,
ColumnDetailS: attsRule.ColumnDetailS,
ColumnDetailT: attsRule.ColumnDetailT,
Expand All @@ -904,15 +905,15 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}
_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
GlobalScnS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: uint64(len(bucketRanges)),
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
SnapshotPointS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: uint64(len(bucketRanges)),
})
if err != nil {
return err
Expand Down Expand Up @@ -955,7 +956,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
TableTypeS: attsRule.TableTypeS,
GlobalScnS: globalScn,
SnapshotPointS: globalScn,
ColumnDetailO: attsRule.ColumnDetailO,
ColumnDetailS: attsRule.ColumnDetailS,
ColumnDetailT: attsRule.ColumnDetailT,
Expand All @@ -970,15 +971,15 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}
_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
GlobalScnS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
SnapshotPointS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
})
if err != nil {
return err
Expand Down Expand Up @@ -1017,7 +1018,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
TableTypeS: attsRule.TableTypeS,
GlobalScnS: globalScn,
SnapshotPointS: globalScn,
ColumnDetailO: attsRule.ColumnDetailO,
ColumnDetailS: attsRule.ColumnDetailS,
ColumnDetailT: attsRule.ColumnDetailT,
Expand All @@ -1035,15 +1036,15 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}
_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
GlobalScnS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: uint64(len(chunks)),
TaskName: cmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
SnapshotPointS: globalScn,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: uint64(len(chunks)),
})
if err != nil {
return err
Expand Down
9 changes: 4 additions & 5 deletions database/oracle/taskflow/csv_migrate_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -78,11 +77,11 @@ func (r *CsvMigrateRow) MigrateRead() error {

switch {
case strings.EqualFold(r.Dmt.ConsistentReadS, "YES") && strings.EqualFold(r.Dmt.SqlHintS, ""):
originQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.ColumnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, strconv.FormatUint(r.Dmt.GlobalScnS, 10), ` WHERE `, chunkDetailS)
execQuerySQL = stringutil.StringBuilder(`SELECT `, columnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, strconv.FormatUint(r.Dmt.GlobalScnS, 10), ` WHERE `, chunkDetailS)
originQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.ColumnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, r.Dmt.SnapshotPointS, ` WHERE `, chunkDetailS)
execQuerySQL = stringutil.StringBuilder(`SELECT `, columnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, r.Dmt.SnapshotPointS, ` WHERE `, chunkDetailS)
case strings.EqualFold(r.Dmt.ConsistentReadS, "YES") && !strings.EqualFold(r.Dmt.SqlHintS, ""):
originQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.SqlHintS, ` `, r.Dmt.ColumnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, strconv.FormatUint(r.Dmt.GlobalScnS, 10), ` WHERE `, chunkDetailS)
execQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.SqlHintS, ` `, columnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, strconv.FormatUint(r.Dmt.GlobalScnS, 10), ` WHERE `, chunkDetailS)
originQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.SqlHintS, ` `, r.Dmt.ColumnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, r.Dmt.SnapshotPointS, ` WHERE `, chunkDetailS)
execQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.SqlHintS, ` `, columnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" AS OF SCN `, r.Dmt.SnapshotPointS, ` WHERE `, chunkDetailS)
case strings.EqualFold(r.Dmt.ConsistentReadS, "NO") && !strings.EqualFold(r.Dmt.SqlHintS, ""):
originQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.SqlHintS, ` `, r.Dmt.ColumnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" WHERE `, chunkDetailS)
execQuerySQL = stringutil.StringBuilder(`SELECT `, r.Dmt.SqlHintS, ` `, columnDetailS, ` FROM "`, r.Dmt.SchemaNameS, `"."`, r.Dmt.TableNameS, `" WHERE `, chunkDetailS)
Expand Down
Loading

0 comments on commit 0002de6

Please sign in to comment.