Commit a17f4cc

Bigquery: do not use parrallel load
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
Lloyd-Pottiger committed Mar 1, 2024
1 parent a761603 commit a17f4cc
Showing 6 changed files with 65 additions and 54 deletions.
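
Summary of the change: Replicate and replicate.StartReplicateSnapshot gain a new parrallelLoad bool parameter. The BigQuery command passes false, so its snapshot is loaded through a single LoadSnapshot call over a "<database>.<table>.*" wildcard, while the Databricks, Redshift, and Snowflake commands pass true and keep the existing concurrent per-file load.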
cmd/bigquery.go (1 addition & 1 deletion)

@@ -95,7 +95,7 @@ func NewBigQueryCmd() *cobra.Command {
         return Replicate(
             &tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI, snapshotConcurrency,
             cdcHost, cdcPort, cdcFlushInterval, cdcFileSize,
-            snapConnectorMap, increConnectorMap, "bigquery", mode,
+            snapConnectorMap, increConnectorMap, "bigquery", false, mode,
         )
     }
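
The commit does not say why BigQuery alone opts out of parallel loading, but a plausible reason: BigQuery load jobs natively accept a single * wildcard in the source URI, so one load job can ingest every CSV file matching the <database>.<table>.* pattern that the non-parallel branch in replicate/snapshot.go (below) passes to LoadSnapshot.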
cmd/core.go (2 additions & 1 deletion)

@@ -251,6 +251,7 @@ func Replicate(
     snapConnectorMap map[string]coreinterfaces.Connector,
     increConnectorMap map[string]coreinterfaces.Connector,
     csvOutputDialect string,
+    parrallelLoad bool,
     mode RunMode,
 ) error {
     metrics.TableNumGauge.Add(float64(len(tables)))
@@ -268,7 +269,7 @@
         ctx := context.Background()
         if mode != RunModeIncrementalOnly && stage != StageSnapshotLoaded {
             apiservice.GlobalInstance.APIInfo.SetTableStage(table, apiservice.TableStageLoadingSnapshot)
-            if err = replicate.StartReplicateSnapshot(ctx, snapConnectorMap[table], table, tidbConfig, snapshotURI); err != nil {
+            if err = replicate.StartReplicateSnapshot(ctx, snapConnectorMap[table], table, tidbConfig, snapshotURI, parrallelLoad); err != nil {
                 apiservice.GlobalInstance.APIInfo.SetTableFatalError(table, err)
                 metrics.AddCounter(metrics.ErrorCounter, 1, table)
                 return
cmd/databricks.go (1 addition & 1 deletion)

@@ -107,7 +107,7 @@ func NewDatabricksCmd() *cobra.Command {
 
         return Replicate(&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI,
             snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize, snapConnectorMap,
-            increConnectorMap, "default", mode, // // FIXME: to be confirmed whether to use default dialect
+            increConnectorMap, "default", true, mode, // // FIXME: to be confirmed whether to use default dialect
         )
     }
cmd/redshift.go (1 addition & 1 deletion)

@@ -109,7 +109,7 @@ func NewRedshiftCmd() *cobra.Command {
 
         return Replicate(&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI,
             snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize,
-            snapConnectorMap, increConnectorMap, "redshift", mode,
+            snapConnectorMap, increConnectorMap, "redshift", true, mode,
         )
     }
cmd/snowflake.go (1 addition & 1 deletion)

@@ -109,7 +109,7 @@ func NewSnowflakeCmd() *cobra.Command {
 
         return Replicate(&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI,
             snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize,
-            snapConnectorMap, increConnectorMap, "snowflake", mode,
+            snapConnectorMap, increConnectorMap, "snowflake", true, mode,
         )
     }
replicate/snapshot.go (59 additions & 49 deletions)

@@ -35,6 +35,7 @@ type SnapshotReplicateSession struct {
 
     StorageWorkspaceUri url.URL
    externalStorage     storage.ExternalStorage
+    ParrallelLoad       bool
 
     ctx    context.Context
     logger *zap.Logger
@@ -46,6 +47,7 @@ func NewSnapshotReplicateSession(
     tidbConfig *tidbsql.TiDBConfig,
     sourceDatabase, sourceTable string,
     storageUri *url.URL,
+    parrallelLoad bool,
     logger *zap.Logger,
 ) (*SnapshotReplicateSession, error) {
     sess := &SnapshotReplicateSession{
@@ -54,6 +56,7 @@
         SourceDatabase:      sourceDatabase,
         SourceTable:         sourceTable,
         StorageWorkspaceUri: *storageUri,
+        ParrallelLoad:       parrallelLoad,
         ctx:                 ctx,
         logger:              logger,
     }
@@ -94,58 +97,64 @@ func (sess *SnapshotReplicateSession) Run() error {
     }
 
     startTime := time.Now()
-    var snapshotFileSize int64
-    var fileCount int64
-    tableFQN := fmt.Sprintf("%s.%s", sess.SourceDatabase, sess.SourceTable)
-    opt := &storage.WalkOption{ObjPrefix: fmt.Sprintf("%s.", tableFQN)}
-    if err := sess.externalStorage.WalkDir(sess.ctx, opt, func(path string, size int64) error {
-        if strings.HasSuffix(path, CSVFileExtension) {
-            snapshotFileSize += size
-            fileCount++
-        }
-        return nil
-    }); err != nil {
-        return errors.Trace(err)
-    }
-    metrics.AddCounter(metrics.SnapshotTotalSizeCounter, float64(snapshotFileSize), tableFQN)
-    errFileCh := make(chan string, fileCount)
-    blockCh := make(chan struct{}, DataWarehouseLoadConcurrency)
-    var wg sync.WaitGroup
-    if err := sess.externalStorage.WalkDir(sess.ctx, opt, func(path string, size int64) error {
-        if strings.HasSuffix(path, CSVFileExtension) {
-            blockCh <- struct{}{}
-            wg.Add(1)
-            go func(path string, size int64) {
-                defer func() {
-                    <-blockCh
-                    wg.Done()
-                }()
-                sess.logger.Info("Loading snapshot data into data warehouse", zap.String("path", path))
-                if err := sess.DataWarehousePool.LoadSnapshot(sess.SourceTable, path); err != nil {
-                    sess.logger.Error("Failed to load snapshot data into data warehouse", zap.Error(err), zap.String("path", path))
-                    errFileCh <- path
-                } else {
-                    sess.logger.Info("Successfully load snapshot data into data warehouse", zap.String("path", path))
-                    metrics.AddCounter(metrics.SnapshotLoadedSizeCounter, float64(size), tableFQN)
-                }
-            }(path, size)
-        }
-        return nil
-    }); err != nil {
-        return errors.Trace(err)
-    }
-    wg.Wait()
-    close(errFileCh)
-    close(blockCh)
-    errFileList := make([]string, 0, len(errFileCh))
-    for len(errFileCh) > 0 {
-        errFileList = append(errFileList, <-errFileCh)
-    }
-    if len(errFileList) > 0 {
-        return errors.Errorf("Failed to load snapshot data into data warehouse, error files: %v", errFileList)
-    }
+    if sess.ParrallelLoad {
+        var snapshotFileSize int64
+        var fileCount int64
+        tableFQN := fmt.Sprintf("%s.%s", sess.SourceDatabase, sess.SourceTable)
+        opt := &storage.WalkOption{ObjPrefix: fmt.Sprintf("%s.", tableFQN)}
+        if err := sess.externalStorage.WalkDir(sess.ctx, opt, func(path string, size int64) error {
+            if strings.HasSuffix(path, CSVFileExtension) {
+                snapshotFileSize += size
+                fileCount++
+            }
+            return nil
+        }); err != nil {
+            return errors.Trace(err)
+        }
+        metrics.AddCounter(metrics.SnapshotTotalSizeCounter, float64(snapshotFileSize), tableFQN)
+        errFileCh := make(chan string, fileCount)
+        blockCh := make(chan struct{}, DataWarehouseLoadConcurrency)
+        var wg sync.WaitGroup
+        if err := sess.externalStorage.WalkDir(sess.ctx, opt, func(path string, size int64) error {
+            if strings.HasSuffix(path, CSVFileExtension) {
+                blockCh <- struct{}{}
+                wg.Add(1)
+                go func(path string, size int64) {
+                    defer func() {
+                        <-blockCh
+                        wg.Done()
+                    }()
+                    sess.logger.Info("Loading snapshot data into data warehouse", zap.String("path", path))
+                    if err := sess.DataWarehousePool.LoadSnapshot(sess.SourceTable, path); err != nil {
+                        sess.logger.Error("Failed to load snapshot data into data warehouse", zap.Error(err), zap.String("path", path))
+                        errFileCh <- path
+                    } else {
+                        sess.logger.Info("Successfully load snapshot data into data warehouse", zap.String("path", path))
+                        metrics.AddCounter(metrics.SnapshotLoadedSizeCounter, float64(size), tableFQN)
+                    }
+                }(path, size)
+            }
+            return nil
+        }); err != nil {
+            return errors.Trace(err)
+        }
+        wg.Wait()
+        close(errFileCh)
+        close(blockCh)
+        errFileList := make([]string, 0, len(errFileCh))
+        for len(errFileCh) > 0 {
+            errFileList = append(errFileList, <-errFileCh)
+        }
+        if len(errFileList) > 0 {
+            return errors.Errorf("Failed to load snapshot data into data warehouse, error files: %v", errFileList)
+        }
+    } else {
+        if err := sess.DataWarehousePool.LoadSnapshot(sess.SourceTable, fmt.Sprintf("%s.%s.*", sess.SourceDatabase, sess.SourceTable)); err != nil {
+            sess.logger.Error("Failed to load snapshot data into data warehouse", zap.Error(err))
+        }
+    }
     endTime := time.Now()
-    sess.logger.Info("Successfully load all snapshot data into data warehouse", zap.Int64("size", snapshotFileSize), zap.Duration("cost", endTime.Sub(startTime)))
+    sess.logger.Info("Successfully load all snapshot data into data warehouse", zap.Duration("cost", endTime.Sub(startTime)))
 
     // Write load info to workspace to record the status of load,
     // loadinfo exists means the data has been all loaded into data warehouse.
@@ -163,10 +172,11 @@ func StartReplicateSnapshot(
     tableFQN string,
     tidbConfig *tidbsql.TiDBConfig,
     storageUri *url.URL,
+    parrallelLoad bool,
 ) error {
     logger := log.L().With(zap.String("table", tableFQN))
     sourceDatabase, sourceTable := utils.SplitTableFQN(tableFQN)
-    session, err := NewSnapshotReplicateSession(ctx, dwConnector, tidbConfig, sourceDatabase, sourceTable, storageUri, logger)
+    session, err := NewSnapshotReplicateSession(ctx, dwConnector, tidbConfig, sourceDatabase, sourceTable, storageUri, parrallelLoad, logger)
     if err != nil {
         logger.Error("Failed to create snapshot replicate session", zap.Error(err))
         return errors.Trace(err)
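
The ParrallelLoad branch above is a standard Go pattern: a buffered channel used as a counting semaphore to bound in-flight loads, a sync.WaitGroup to join the workers, and a second buffered channel to collect failed paths. Below is a self-contained sketch of that pattern; it is not from the repository, and loadFile and loadConcurrency are hypothetical stand-ins for DataWarehousePool.LoadSnapshot and DataWarehouseLoadConcurrency.

    package main

    import (
        "fmt"
        "sync"
    )

    const loadConcurrency = 4 // stand-in for DataWarehouseLoadConcurrency

    // loadFile is a hypothetical stand-in for DataWarehousePool.LoadSnapshot.
    func loadFile(path string) error {
        if path == "db.table.000000002.csv" {
            return fmt.Errorf("simulated load failure")
        }
        return nil
    }

    func main() {
        files := []string{
            "db.table.000000000.csv",
            "db.table.000000001.csv",
            "db.table.000000002.csv",
        }

        errFileCh := make(chan string, len(files))      // buffered: workers never block when reporting a failure
        blockCh := make(chan struct{}, loadConcurrency) // counting semaphore bounding concurrent loads
        var wg sync.WaitGroup

        for _, path := range files {
            blockCh <- struct{}{} // acquire a slot; blocks while loadConcurrency loads are in flight
            wg.Add(1)
            go func(path string) {
                defer func() {
                    <-blockCh // release the slot
                    wg.Done()
                }()
                if err := loadFile(path); err != nil {
                    errFileCh <- path
                }
            }(path)
        }

        wg.Wait()        // all loads finished before the error channel is drained
        close(errFileCh) // safe: no senders remain after wg.Wait()

        var errFiles []string
        for path := range errFileCh {
            errFiles = append(errFiles, path)
        }
        if len(errFiles) > 0 {
            fmt.Printf("failed to load: %v\n", errFiles)
        }
    }

Buffering errFileCh to the number of files is what lets every worker report a failure without blocking, so wg.Wait() cannot deadlock on a full channel; closing the channel only after wg.Wait() guarantees no goroutine is still sending.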
