Skip to content

Commit

Permalink
Bump tidb version to latest to support specific dumpling `csv-output-…
Browse files Browse the repository at this point in the history
…dialect` (#81)

Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger authored Dec 15, 2023
1 parent 5aacc92 commit 6b6140f
Show file tree
Hide file tree
Showing 16 changed files with 641 additions and 548 deletions.
2 changes: 1 addition & 1 deletion cmd/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewBigQueryCmd() *cobra.Command {
return Replicate(
&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI, snapshotConcurrency,
cdcHost, cdcPort, cdcFlushInterval, cdcFileSize,
snapConnectorMap, increConnectorMap, mode,
snapConnectorMap, increConnectorMap, "bigquery", mode,
)
}

Expand Down
15 changes: 13 additions & 2 deletions cmd/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/dumpling/export"
putil "github.com/pingcap/tiflow/pkg/util"
"github.com/thediveo/enumflag"
"go.uber.org/zap"
Expand Down Expand Up @@ -54,6 +55,14 @@ const (
StageSnapshotLoaded Stage = "snapshot-loaded"
)

var CsvOutputDialectMap = map[string]export.CSVDialect{
"": export.CSVDialectDefault,
"default": export.CSVDialectDefault,
"bigquery": export.CSVDialectBigQuery,
"snowflake": export.CSVDialectSnowflake,
"redshift": export.CSVDialectRedshift,
}

func checkStage(storage storage.ExternalStorage) (Stage, error) {
stage := StageInit
ctx := context.Background()
Expand Down Expand Up @@ -162,6 +171,7 @@ func Export(
cdcPort int,
cdcFlushInterval time.Duration,
cdcFileSize int,
csvOutputDialect string,
mode RunMode,
) (
stage Stage,
Expand Down Expand Up @@ -203,7 +213,7 @@ func Export(
fallthrough
case StageChangefeedCreated:
if mode != RunModeIncrementalOnly && mode != RunModeCloud {
if err := dumpling.RunDump(tidbConfig, snapshotConcurrency, snapshotURI, fmt.Sprint(startTSO), tables, onSnapshotDumpProgress); err != nil {
if err := dumpling.RunDump(tidbConfig, snapshotConcurrency, snapshotURI, fmt.Sprint(startTSO), tables, CsvOutputDialectMap[csvOutputDialect], onSnapshotDumpProgress); err != nil {
return "", errors.Trace(err)
}
}
Expand All @@ -225,11 +235,12 @@ func Replicate(
cdcFileSize int,
snapConnectorMap map[string]coreinterfaces.Connector,
increConnectorMap map[string]coreinterfaces.Connector,
csvOutputDialect string,
mode RunMode,
) error {
metrics.TableNumGauge.Add(float64(len(tables)))
stage, err := Export(tidbConfig, tables, storageURI, snapshotURI, incrementURI,
snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize, mode)
snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize, csvOutputDialect, mode)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/databricks.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func NewDatabricksCmd() *cobra.Command {

return Replicate(&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI,
snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize, snapConnectorMap,
increConnectorMap, mode)
increConnectorMap, "default", mode, // // FIXME: to be confirmed whether to use default dialect
)
}

cmd := &cobra.Command{
Expand Down
4 changes: 3 additions & 1 deletion cmd/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewGCSCmd() *cobra.Command {
cdcFileSize int
logFile string
logLevel string
csvOutputDialect string
mode RunMode
)

Expand All @@ -49,7 +50,7 @@ func NewGCSCmd() *cobra.Command {

_, err = Export(
&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI, snapshotConcurrency,
cdcHost, cdcPort, cdcFlushInterval, cdcFileSize, mode)
cdcHost, cdcPort, cdcFlushInterval, cdcFileSize, csvOutputDialect, mode)

return err
}
Expand All @@ -75,6 +76,7 @@ func NewGCSCmd() *cobra.Command {
cmd.Flags().StringArrayVarP(&tables, "table", "t", []string{}, "tables full qualified name, e.g. -t <db1>.<table1> -t <db2>.<table2>")
cmd.Flags().IntVar(&snapshotConcurrency, "snapshot-concurrency", 8, "the number of concurrent snapshot workers")
cmd.Flags().StringVarP(&storagePath, "storage", "s", "", "storage path: gs://<bucket>/<path>")
cmd.Flags().StringVar(&csvOutputDialect, "csv-output-dialect", "", "csv output dialect: default, redshift, snowflake, bigquery")
cmd.Flags().StringVar(&cdcHost, "cdc.host", "127.0.0.1", "TiCDC server host")
cmd.Flags().IntVar(&cdcPort, "cdc.port", 8300, "TiCDC server port")
cmd.Flags().DurationVar(&cdcFlushInterval, "cdc.flush-interval", 60*time.Second, "")
Expand Down
3 changes: 2 additions & 1 deletion cmd/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func NewRedshiftCmd() *cobra.Command {

return Replicate(&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI,
snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize,
snapConnectorMap, increConnectorMap, mode)
snapConnectorMap, increConnectorMap, "redshift", mode,
)
}

cmd := &cobra.Command{
Expand Down
4 changes: 3 additions & 1 deletion cmd/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func NewS3Cmd() *cobra.Command {
logLevel string
awsAccessKey string
awsSecretKey string
csvOutputDialect string
credValue *credentials.Value
mode RunMode
)
Expand Down Expand Up @@ -65,7 +66,7 @@ func NewS3Cmd() *cobra.Command {

_, err = Export(&tidbConfigFromCli, tables, storageURI, snapshotURI,
incrementURI, snapshotConcurrency, cdcHost, cdcPort,
cdcFlushInterval, cdcFileSize, mode)
cdcFlushInterval, cdcFileSize, csvOutputDialect, mode)
return err
}

Expand All @@ -89,6 +90,7 @@ func NewS3Cmd() *cobra.Command {
cmd.Flags().StringArrayVarP(&tables, "table", "t", []string{}, "tables full qualified name, e.g. -t <db1>.<table1> -t <db2>.<table2>")
cmd.Flags().IntVar(&snapshotConcurrency, "snapshot-concurrency", 8, "the number of concurrent snapshot workers")
cmd.Flags().StringVarP(&storagePath, "storage", "s", "", "storage path: s3://<bucket>/<path> or gcs://<bucket>/<path>")
cmd.Flags().StringVar(&csvOutputDialect, "csv-output-dialect", "", "csv output dialect: default, redshift, snowflake, bigquery")
cmd.Flags().StringVar(&cdcHost, "cdc.host", "127.0.0.1", "TiCDC server host")
cmd.Flags().IntVar(&cdcPort, "cdc.port", 8300, "TiCDC server port")
cmd.Flags().DurationVar(&cdcFlushInterval, "cdc.flush-interval", 60*time.Second, "")
Expand Down
3 changes: 2 additions & 1 deletion cmd/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func NewSnowflakeCmd() *cobra.Command {

return Replicate(&tidbConfigFromCli, tables, storageURI, snapshotURI, incrementURI,
snapshotConcurrency, cdcHost, cdcPort, cdcFlushInterval, cdcFileSize,
snapConnectorMap, increConnectorMap, mode)
snapConnectorMap, increConnectorMap, "snowflake", mode,
)
}

cmd := &cobra.Command{
Expand Down
Loading

0 comments on commit 6b6140f

Please sign in to comment.