From 911ca6024d03d5a697c4040aef833ac9bddb5065 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Wed, 8 May 2024 10:34:35 +0800 Subject: [PATCH 1/8] first version Signed-off-by: Yu Juncen --- br/pkg/lightning/importer/precheck_impl.go | 2 +- br/pkg/task/stream.go | 2 +- br/pkg/utils/cdc.go | 263 +++++++++++++++------ br/pkg/utils/cdc_test.go | 8 +- pkg/executor/importer/precheck.go | 2 +- 5 files changed, 193 insertions(+), 84 deletions(-) diff --git a/br/pkg/lightning/importer/precheck_impl.go b/br/pkg/lightning/importer/precheck_impl.go index bf9920b19f50a..8865eea4b7408 100644 --- a/br/pkg/lightning/importer/precheck_impl.go +++ b/br/pkg/lightning/importer/precheck_impl.go @@ -836,7 +836,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names)) } - nameSet, err := utils.GetCDCChangefeedNameSet(ctx, ci.etcdCli) + nameSet, err := utils.GetRunningChangefeeds(ctx, ci.etcdCli) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 07f77f10294cc..e05bb1b6dc357 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1083,7 +1083,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3. // check cdc changefeed if cfg.CheckRequirements { - nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI) + nameSet, err := utils.GetRunningChangefeeds(ctx, etcdCLI) if err != nil { return err } diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index 69e029135879c..d762eb762119c 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -18,11 +18,14 @@ import ( "context" "encoding/json" "fmt" + "math" + "path" "regexp" "strings" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/util/mathutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -33,50 +36,57 @@ const ( CDCPrefixV61 = "/tidb/cdc/changefeed/info/" ) -// CDCNameSet saves CDC changefeed's information. -// nameSet maps `cluster/namespace` to `changefeed`s -type CDCNameSet struct { - nameSet map[string][]string -} +type keyVersion int -// that the nameSet is empty means no changefeed exists. -func (s *CDCNameSet) Empty() bool { - return len(s.nameSet) == 0 +const ( + legacy keyVersion = 1 + namespaced keyVersion = 2 +) + +var ( + // cluster id should be valid in + // https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218 + clusterNameRe = regexp.MustCompile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") +) + +type changefeed struct { + ID string + Cluster string + Namespace string + KeyVersion keyVersion } -// MessageToUser convert the map `nameSet` to a readable message to user. -func (s *CDCNameSet) MessageToUser() string { - var changefeedMsgBuf strings.Builder - changefeedMsgBuf.WriteString("found CDC changefeed(s): ") - for clusterID, captureIDs := range s.nameSet { - changefeedMsgBuf.WriteString("cluster/namespace: ") - changefeedMsgBuf.WriteString(clusterID) - changefeedMsgBuf.WriteString(" changefeed(s): ") - changefeedMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) - changefeedMsgBuf.WriteString(", ") +func (c *changefeed) infoKey() string { + switch c.KeyVersion { + case legacy: + return path.Join(CDCPrefix, "changefeed", "info", c.ID) + case namespaced: + return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "info", c.ID) } - return changefeedMsgBuf.String() + log.Panic("Invalid changefeed version type.", zap.Any("this", c)) + panic("unreachable") } -// GetCDCChangefeedNameSet gets CDC changefeed information and wraps them to a map -// for CDC >= v6.2, the etcd key format is /tidb/cdc///changefeed/info/ -// for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ -func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { - nameSet := make(map[string][]string, 1) - // check etcd KV of CDC >= v6.2 - resp, err := cli.Get(ctx, CDCPrefix, clientv3.WithPrefix()) - if err != nil { - return nil, errors.Trace(err) +func (c *changefeed) statusKey() string { + switch c.KeyVersion { + case legacy: + return path.Join(CDCPrefix, "changefeed", "status", c.ID) + case namespaced: + return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "status", c.ID) } + log.Panic("Invalid changefeed version type.", zap.Any("this", c)) + panic("unreachable") +} - // cluster id should be valid in - // https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218 - reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") +type checkCDCClient struct { + cli *clientv3.Client +} + +func (c checkCDCClient) loadChangefeeds(ctx context.Context, out *[]changefeed) error { + resp, err := c.cli.Get(ctx, CDCPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { - log.L().Warn("failed to parse cluster id, skip it", zap.Error(err)) - reg = nil + return errors.Trace(err) } - for _, kv := range resp.Kvs { // example: /tidb/cdc///changefeed/info/ k := kv.Key[len(CDCPrefix):] @@ -84,69 +94,168 @@ func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNam if !found { continue } + if len(clusterAndNamespace) == 0 { + // They should be keys with format /tidb/cdc/changefeed/info. + *out = append(*out, changefeed{ + ID: string(changefeedID), + KeyVersion: legacy, + }) + continue + } + // example: clusterAndNamespace normally is / // but in migration scenario it become __backup__. we need handle it // see https://github.com/pingcap/tiflow/issues/9807 - clusterID, _, found := bytes.Cut(clusterAndNamespace, []byte(`/`)) + clusterID, namespace, found := bytes.Cut(clusterAndNamespace, []byte(`/`)) if !found { // ignore __backup__ or other formats continue } - if reg != nil { - matched := reg.Match(clusterID) - if !matched { - continue - } - if !isActiveCDCChangefeed(kv.Value) { - continue - } + matched := clusterNameRe.Match(clusterID) + if !matched { + continue + } + + *out = append(*out, changefeed{ + ID: string(changefeedID), + Cluster: string(clusterID), + Namespace: string(namespace), + KeyVersion: namespaced, + }) + } + return nil +} + +type changefeedInfoView struct { + State string `json:"state"` + Start uint64 `json:"start-ts"` +} + +type changefeedStatusView struct { + Checkpoint uint64 `json:"checkpoint-ts"` +} + +func (c checkCDCClient) fetchCheckpointTSFromStatus(ctx context.Context, cf changefeed) (uint64, error) { + statusResp, err := c.cli.KV.Get(ctx, cf.statusKey()) + if err != nil { + return 0, err + } + if statusResp.Count == 0 { + // The changefeed might was created recently. + return 0, nil + } + var status changefeedStatusView + if err := json.Unmarshal(statusResp.Kvs[0].Value, &status); err != nil { + return 0, err + } + return status.Checkpoint, nil +} + +func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uint64, error) { + infoResp, err := c.cli.KV.Get(ctx, cf.infoKey()) + if err != nil { + return 0, err + } + if infoResp.Count == 0 { + // The changefeed have been removed. + return math.MaxUint64, nil + } + var info changefeedInfoView + if err := json.Unmarshal(infoResp.Kvs[0].Value, &info); err != nil { + return 0, err + } + switch info.State { + // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview + case "stopped", "error": + return math.MaxInt64, nil + case "running", "warning": + cts, err := c.fetchCheckpointTSFromStatus(ctx, cf) + if err != nil { + return 0, err } + return mathutil.Max(cts, info.Start), nil + default: + // This changefeed may be noise, ignore it. + log.Warn("Ignoring invalid changefeed.", zap.Any("changefeed", cf)) + return math.MaxInt64, nil + } +} - nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) +func (c checkCDCClient) getNameSet(ctx context.Context, safeTS uint64) (*CDCNameSet, error) { + changefeeds := make([]changefeed, 0) + if err := c.loadChangefeeds(ctx, &changefeeds); err != nil { + return nil, err } - if len(nameSet) == 0 { - // check etcd KV of CDC <= v6.1 - resp, err = cli.Get(ctx, CDCPrefixV61, clientv3.WithPrefix()) + + nameset := new(CDCNameSet) + for _, cf := range changefeeds { + // Special case: when safe ts not provided, just record all changefeeds. + if safeTS == math.MaxUint64 { + nameset.save(cf) + continue + } + + cts, err := c.checkpointTSFor(ctx, cf) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Annotatef(err, "failed to check changefeed %v", cf) } - for _, kv := range resp.Kvs { - // example: /tidb/cdc/changefeed/info/ - k := kv.Key[len(CDCPrefixV61):] - if len(k) == 0 { - continue - } - if !isActiveCDCChangefeed(kv.Value) { - continue - } - - nameSet[""] = append(nameSet[""], string(k)) + if cts < safeTS { + nameset.save(cf) } } - return &CDCNameSet{nameSet}, nil + return nameset, nil } -type onlyState struct { - State string `json:"state"` +// CDCNameSet saves CDC changefeed's information. +// nameSet maps `cluster/namespace` to `changefeed`s +type CDCNameSet struct { + changefeeds map[string][]string } -func isActiveCDCChangefeed(jsonBytes []byte) bool { - s := onlyState{} - err := json.Unmarshal(jsonBytes, &s) - if err != nil { - // maybe a compatible issue, skip this key - log.L().Error("unmarshal etcd value failed when check CDC changefeed, will skip this key", - zap.ByteString("value", jsonBytes), - zap.Error(err)) - return false +func (s *CDCNameSet) save(cf changefeed) { + if s.changefeeds == nil { + s.changefeeds = map[string][]string{} } - switch s.State { - // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview - case "normal", "stopped", "error", "warning": - return true - default: - return false + + switch cf.KeyVersion { + case legacy: + s.changefeeds[""] = append(s.changefeeds[""], cf.ID) + case namespaced: + s.changefeeds[cf.Cluster] = append(s.changefeeds[cf.Cluster], cf.ID) } } + +// that the nameSet is empty means no changefeed exists. +func (s *CDCNameSet) Empty() bool { + return len(s.changefeeds) == 0 +} + +// MessageToUser convert the map `nameSet` to a readable message to user. +func (s *CDCNameSet) MessageToUser() string { + var changefeedMsgBuf strings.Builder + changefeedMsgBuf.WriteString("found CDC changefeed(s): ") + for clusterID, changefeedID := range s.changefeeds { + changefeedMsgBuf.WriteString("cluster/namespace: ") + changefeedMsgBuf.WriteString(clusterID) + changefeedMsgBuf.WriteString(" changefeed(s): ") + changefeedMsgBuf.WriteString(fmt.Sprintf("%v", changefeedID)) + changefeedMsgBuf.WriteString(", ") + } + return changefeedMsgBuf.String() +} + +// GetRunningChangefeeds gets CDC changefeed information and wraps them to a map +// for CDC >= v6.2, the etcd key format is /tidb/cdc///changefeed/info/ +// for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ +func GetRunningChangefeeds(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { + checkCli := checkCDCClient{cli: cli} + return checkCli.getNameSet(ctx, math.MaxUint64) +} + +// GetIncompatibleChangefeedsWithSafeTS gets CDC changefeed that may not compatible with the safe ts and wraps them to a map. +func GetIncompatibleChangefeedsWithSafeTS(ctx context.Context, cli *clientv3.Client, safeTS uint64) (*CDCNameSet, error) { + checkCli := checkCDCClient{cli: cli} + return checkCli.getNameSet(ctx, safeTS) +} diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go index 3b5644c22631d..c716c1ca747b8 100644 --- a/br/pkg/utils/cdc_test.go +++ b/br/pkg/utils/cdc_test.go @@ -39,7 +39,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { require.NoError(t, err) } - nameSet, err := utils.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err := utils.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.True(t, nameSet.Empty()) @@ -61,7 +61,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1") checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922") - nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err = utils.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) require.Equal(t, "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], ", @@ -80,7 +80,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b") checkEtcdPut("/tidb/cdc/task/position/f14cb04d-5ba1-410e-a59b-ccd796920e9d/test") - nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err = utils.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ", @@ -99,7 +99,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { "/tidb/cdc/5402613591834624000/changefeed/info/test", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) - nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err = utils.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.True(t, nameSet.Empty()) } diff --git a/pkg/executor/importer/precheck.go b/pkg/executor/importer/precheck.go index 6a2f208318e36..5734038a0ae59 100644 --- a/pkg/executor/importer/precheck.go +++ b/pkg/executor/importer/precheck.go @@ -120,7 +120,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error { return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs(fmt.Sprintf("found PiTR log streaming task(s): %v,", names)) } - nameSet, err := utils.GetCDCChangefeedNameSet(ctx, cli.GetClient()) + nameSet, err := utils.GetRunningChangefeeds(ctx, cli.GetClient()) if err != nil { return errors.Trace(err) } From c0b1585b96b4930d038cebf09f377794e9386458 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Wed, 8 May 2024 15:05:58 +0800 Subject: [PATCH 2/8] added test cases Signed-off-by: Yu Juncen --- br/pkg/task/stream.go | 1 + br/pkg/utils/cdc.go | 30 ++++++------- br/pkg/utils/cdc_test.go | 79 ++++++++++++++++++++++++++++----- br/pkg/utils/export_for_test.go | 13 ++++++ 4 files changed, 97 insertions(+), 26 deletions(-) create mode 100644 br/pkg/utils/export_for_test.go diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index e05bb1b6dc357..9ea1ed77a71e4 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/oracle" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index d762eb762119c..ba9cf5659b4b7 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -41,6 +41,8 @@ type keyVersion int const ( legacy keyVersion = 1 namespaced keyVersion = 2 + + invalidTs uint64 = math.MaxUint64 ) var ( @@ -89,7 +91,7 @@ func (c checkCDCClient) loadChangefeeds(ctx context.Context, out *[]changefeed) } for _, kv := range resp.Kvs { // example: /tidb/cdc///changefeed/info/ - k := kv.Key[len(CDCPrefix):] + k := kv.Key[len(CDCPrefix)-1:] clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath)) if !found { continue @@ -106,7 +108,7 @@ func (c checkCDCClient) loadChangefeeds(ctx context.Context, out *[]changefeed) // example: clusterAndNamespace normally is / // but in migration scenario it become __backup__. we need handle it // see https://github.com/pingcap/tiflow/issues/9807 - clusterID, namespace, found := bytes.Cut(clusterAndNamespace, []byte(`/`)) + clusterID, namespace, found := bytes.Cut(clusterAndNamespace[1:], []byte(`/`)) if !found { // ignore __backup__ or other formats continue @@ -159,7 +161,7 @@ func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uin } if infoResp.Count == 0 { // The changefeed have been removed. - return math.MaxUint64, nil + return invalidTs, nil } var info changefeedInfoView if err := json.Unmarshal(infoResp.Kvs[0].Value, &info); err != nil { @@ -167,9 +169,9 @@ func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uin } switch info.State { // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview - case "stopped", "error": - return math.MaxInt64, nil - case "running", "warning": + case "error", "failed": + return invalidTs, nil + case "running", "warning", "normal", "stopped": cts, err := c.fetchCheckpointTSFromStatus(ctx, cf) if err != nil { return 0, err @@ -177,8 +179,8 @@ func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uin return mathutil.Max(cts, info.Start), nil default: // This changefeed may be noise, ignore it. - log.Warn("Ignoring invalid changefeed.", zap.Any("changefeed", cf)) - return math.MaxInt64, nil + log.Warn("Ignoring invalid changefeed.", zap.Any("changefeed", cf), zap.String("state", info.State)) + return invalidTs, nil } } @@ -190,17 +192,12 @@ func (c checkCDCClient) getNameSet(ctx context.Context, safeTS uint64) (*CDCName nameset := new(CDCNameSet) for _, cf := range changefeeds { - // Special case: when safe ts not provided, just record all changefeeds. - if safeTS == math.MaxUint64 { - nameset.save(cf) - continue - } - cts, err := c.checkpointTSFor(ctx, cf) if err != nil { return nil, errors.Annotatef(err, "failed to check changefeed %v", cf) } if cts < safeTS { + log.Info("Found incompatible changefeed.", zap.Any("changefeed", cf), zap.Uint64("checkpoint-ts", cts), zap.Uint64("safe-ts", safeTS)) nameset.save(cf) } } @@ -223,7 +220,8 @@ func (s *CDCNameSet) save(cf changefeed) { case legacy: s.changefeeds[""] = append(s.changefeeds[""], cf.ID) case namespaced: - s.changefeeds[cf.Cluster] = append(s.changefeeds[cf.Cluster], cf.ID) + k := path.Join(cf.Cluster, cf.Namespace) + s.changefeeds[k] = append(s.changefeeds[k], cf.ID) } } @@ -251,7 +249,7 @@ func (s *CDCNameSet) MessageToUser() string { // for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ func GetRunningChangefeeds(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { checkCli := checkCDCClient{cli: cli} - return checkCli.getNameSet(ctx, math.MaxUint64) + return checkCli.getNameSet(ctx, invalidTs) } // GetIncompatibleChangefeedsWithSafeTS gets CDC changefeed that may not compatible with the safe ts and wraps them to a map. diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go index c716c1ca747b8..c20a81ff618d8 100644 --- a/br/pkg/utils/cdc_test.go +++ b/br/pkg/utils/cdc_test.go @@ -15,6 +15,8 @@ package utils_test import ( "context" + "path" + "strconv" "testing" "github.com/pingcap/tidb/br/pkg/utils" @@ -23,13 +25,73 @@ import ( "go.etcd.io/etcd/tests/v3/integration" ) -func TestGetCDCChangefeedNameSet(t *testing.T) { +func TestCDCCheckWithEmbedEtcd(t *testing.T) { integration.BeforeTestExternal(t) testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer testEtcdCluster.Terminate(t) + cli := testEtcdCluster.RandClient() + + t.Run("testGetCDCChangefeedNameSet", func(t *testing.T) { testGetCDCChangefeedNameSet(t, cli) }) + cli.KV.Delete(context.Background(), "", clientv3.WithPrefix()) + t.Run("testGEtConflictChangefeeds", func(t *testing.T) { testGetConflictChangefeeds(t, cli) }) +} + +func testGetConflictChangefeeds(t *testing.T, cli *clientv3.Client) { + checkEtcdPut := func(key string, vals ...string) { + val := "" + if len(vals) == 1 { + val = vals[0] + } + _, err := cli.Put(context.Background(), key, val) + require.NoError(t, err) + } + putLameChangefeed := func(name string, status string, startTs uint64) { + checkEtcdPut( + path.Join("/tidb/cdc/default/default/changefeed/info/", name), + `{"state":"`+status+`", "start-ts": `+strconv.Itoa(int(startTs))+`}`, + ) + } + putChangefeed := func(name string, status string, startTs uint64, checkpointTs uint64) { + putLameChangefeed(name, status, startTs) + checkEtcdPut(path.Join("/tidb/cdc/default/default/changefeed/status/", name), `{"checkpoint-ts": `+strconv.Itoa(int(checkpointTs))+`}`) + } + + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") + checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922") + putChangefeed("st-ok", "normal", 1, 43) + putChangefeed("st-fail", "normal", 1, 41) + putLameChangefeed("skipped", "failed", 1) + putLameChangefeed("nost-ok", "normal", 43) + putLameChangefeed("nost-fail", "normal", 41) + + names, err := utils.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 42) + require.NoError(t, err) + require.ElementsMatch(t, names.TEST_getChangefeedNames(), []string{ + "default/default/nost-fail", + "default/default/st-fail", + }) + + names2, err := utils.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 40) + require.NoError(t, err) + require.ElementsMatch(t, names2.TEST_getChangefeedNames(), []string{}) + + names3, err := utils.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 48) + require.NoError(t, err) + require.ElementsMatch(t, names3.TEST_getChangefeedNames(), []string{ + "default/default/nost-fail", + "default/default/st-fail", + "default/default/nost-ok", + "default/default/st-ok", + }) + +} + +func testGetCDCChangefeedNameSet(t *testing.T, cli *clientv3.Client) { ctx := context.Background() - cli := testEtcdCluster.RandClient() checkEtcdPut := func(key string, vals ...string) { val := "" if len(vals) == 1 { @@ -50,13 +112,12 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") checkEtcdPut( "/tidb/cdc/default/default/changefeed/info/test", - `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + `{"state":"normal"}`, ) checkEtcdPut( "/tidb/cdc/default/default/changefeed/info/test-1", - `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, + `{"state":"failed"}`, ) - checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1") checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1") checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922") @@ -64,8 +125,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { nameSet, err = utils.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.Equal(t, "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], ", - nameSet.MessageToUser()) + require.ElementsMatch(t, nameSet.TEST_getChangefeedNames(), []string{"default/default/test"}) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) require.NoError(t, err) @@ -74,7 +134,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/capture/f14cb04d-5ba1-410e-a59b-ccd796920e9d") checkEtcdPut( "/tidb/cdc/changefeed/info/test", - `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"stopped","error":null,"creator-version":"v6.5.0-master-dirty"}`, + `{"state":"stopped"}`, ) checkEtcdPut("/tidb/cdc/job/test") checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b") @@ -83,8 +143,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { nameSet, err = utils.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ", - nameSet.MessageToUser()) + require.ElementsMatch(t, nameSet.TEST_getChangefeedNames(), []string{"/test"}) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) require.NoError(t, err) diff --git a/br/pkg/utils/export_for_test.go b/br/pkg/utils/export_for_test.go new file mode 100644 index 0000000000000..ff256662272be --- /dev/null +++ b/br/pkg/utils/export_for_test.go @@ -0,0 +1,13 @@ +package utils + +import "path" + +func (c *CDCNameSet) TEST_getChangefeedNames() []string { + names := make([]string, 0, len(c.changefeeds)) + for ns, cfs := range c.changefeeds { + for _, cf := range cfs { + names = append(names, path.Join(ns, cf)) + } + } + return names +} From be727dab9f001bd82cf3d5c00a8b504dae615aa2 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Wed, 8 May 2024 17:02:04 +0800 Subject: [PATCH 3/8] make clippy happy Signed-off-by: Yu Juncen --- br/pkg/task/stream.go | 1 - br/pkg/utils/BUILD.bazel | 2 ++ br/pkg/utils/cdc.go | 4 ++-- br/pkg/utils/cdc_test.go | 1 - 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 9ea1ed77a71e4..e05bb1b6dc357 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -54,7 +54,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/spf13/pflag" - "github.com/stretchr/testify/assert" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/oracle" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 1294deed838d2..094b63065026e 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -37,6 +37,7 @@ go_library( "//pkg/parser/types", "//pkg/sessionctx", "//pkg/util", + "//pkg/util/mathutil", "//pkg/util/sqlexec", "@com_github_cheggaaa_pb_v3//:pb", "@com_github_docker_go_units//:go-units", @@ -74,6 +75,7 @@ go_test( "cdc_test.go", "db_test.go", "env_test.go", + "export_for_test.go", "json_test.go", "key_test.go", "main_test.go", diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go index ba9cf5659b4b7..830c74848b4eb 100644 --- a/br/pkg/utils/cdc.go +++ b/br/pkg/utils/cdc.go @@ -66,7 +66,7 @@ func (c *changefeed) infoKey() string { return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "info", c.ID) } log.Panic("Invalid changefeed version type.", zap.Any("this", c)) - panic("unreachable") + return "" } func (c *changefeed) statusKey() string { @@ -77,7 +77,7 @@ func (c *changefeed) statusKey() string { return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "status", c.ID) } log.Panic("Invalid changefeed version type.", zap.Any("this", c)) - panic("unreachable") + return "" } type checkCDCClient struct { diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go index c20a81ff618d8..6ed1eda88831c 100644 --- a/br/pkg/utils/cdc_test.go +++ b/br/pkg/utils/cdc_test.go @@ -87,7 +87,6 @@ func testGetConflictChangefeeds(t *testing.T, cli *clientv3.Client) { "default/default/nost-ok", "default/default/st-ok", }) - } func testGetCDCChangefeedNameSet(t *testing.T, cli *clientv3.Client) { From 5d85495d8c442acc6446e3853cdb566cd7f84ed1 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Thu, 9 May 2024 11:22:41 +0800 Subject: [PATCH 4/8] resolve conflicts Signed-off-by: Yu Juncen --- br/pkg/utils/export_for_test.go | 13 ------------- pkg/util/cdcutil/BUILD.bazel | 8 ++++++-- pkg/util/cdcutil/cdc.go | 11 +++++++---- pkg/util/cdcutil/export_for_test.go | 27 +++++++++++++++++++++++++++ 4 files changed, 40 insertions(+), 19 deletions(-) delete mode 100644 br/pkg/utils/export_for_test.go create mode 100644 pkg/util/cdcutil/export_for_test.go diff --git a/br/pkg/utils/export_for_test.go b/br/pkg/utils/export_for_test.go deleted file mode 100644 index ff256662272be..0000000000000 --- a/br/pkg/utils/export_for_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package utils - -import "path" - -func (c *CDCNameSet) TEST_getChangefeedNames() []string { - names := make([]string, 0, len(c.changefeeds)) - for ns, cfs := range c.changefeeds { - for _, cf := range cfs { - names = append(names, path.Join(ns, cf)) - } - } - return names -} diff --git a/pkg/util/cdcutil/BUILD.bazel b/pkg/util/cdcutil/BUILD.bazel index f45d01d125739..5b70a0cc2973a 100644 --- a/pkg/util/cdcutil/BUILD.bazel +++ b/pkg/util/cdcutil/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/util/cdcutil", visibility = ["//visibility:public"], deps = [ + "//pkg/util/mathutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", "@io_etcd_go_etcd_client_v3//:client", @@ -16,10 +17,13 @@ go_library( go_test( name = "cdcutil_test", timeout = "short", - srcs = ["cdc_test.go"], + srcs = [ + "cdc_test.go", + "export_for_test.go", + ], + embed = [":cdcutil"], flaky = True, deps = [ - ":cdcutil", "@com_github_stretchr_testify//require", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_tests_v3//integration", diff --git a/pkg/util/cdcutil/cdc.go b/pkg/util/cdcutil/cdc.go index a2b88a269df54..bf58306aee46d 100644 --- a/pkg/util/cdcutil/cdc.go +++ b/pkg/util/cdcutil/cdc.go @@ -96,10 +96,13 @@ func (c checkCDCClient) loadChangefeeds(ctx context.Context, out *[]changefeed) for _, kv := range resp.Kvs { // example: /tidb/cdc///changefeed/info/ k := kv.Key[len(CDCPrefix)-1:] + // example of k( >6.1): ///changefeed/info/ + // example of k(<=6.1): /changefeed/info/ clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath)) if !found { continue } + if len(clusterAndNamespace) == 0 { // They should be keys with format /tidb/cdc/changefeed/info. *out = append(*out, changefeed{ @@ -109,7 +112,7 @@ func (c checkCDCClient) loadChangefeeds(ctx context.Context, out *[]changefeed) continue } - // example: clusterAndNamespace normally is / + // example: clusterAndNamespace[1:] normally is / // but in migration scenario it become __backup__. we need handle it // see https://github.com/pingcap/tiflow/issues/9807 clusterID, namespace, found := bytes.Cut(clusterAndNamespace[1:], []byte(`/`)) @@ -188,7 +191,7 @@ func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uin } } -func (c checkCDCClient) getNameSet(ctx context.Context, safeTS uint64) (*CDCNameSet, error) { +func (c checkCDCClient) getIncompatible(ctx context.Context, safeTS uint64) (*CDCNameSet, error) { changefeeds := make([]changefeed, 0) if err := c.loadChangefeeds(ctx, &changefeeds); err != nil { return nil, err @@ -253,11 +256,11 @@ func (s *CDCNameSet) MessageToUser() string { // for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ func GetRunningChangefeeds(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { checkCli := checkCDCClient{cli: cli} - return checkCli.getNameSet(ctx, invalidTs) + return checkCli.getIncompatible(ctx, invalidTs) } // GetIncompatibleChangefeedsWithSafeTS gets CDC changefeed that may not compatible with the safe ts and wraps them to a map. func GetIncompatibleChangefeedsWithSafeTS(ctx context.Context, cli *clientv3.Client, safeTS uint64) (*CDCNameSet, error) { checkCli := checkCDCClient{cli: cli} - return checkCli.getNameSet(ctx, safeTS) + return checkCli.getIncompatible(ctx, safeTS) } diff --git a/pkg/util/cdcutil/export_for_test.go b/pkg/util/cdcutil/export_for_test.go new file mode 100644 index 0000000000000..a06386407f03d --- /dev/null +++ b/pkg/util/cdcutil/export_for_test.go @@ -0,0 +1,27 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cdcutil + +import "path" + +func (c *CDCNameSet) TEST_getChangefeedNames() []string { + names := make([]string, 0, len(c.changefeeds)) + for ns, cfs := range c.changefeeds { + for _, cf := range cfs { + names = append(names, path.Join(ns, cf)) + } + } + return names +} From 2eb39cb65745254fc8f1f82ec9eb4c950879511b Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Thu, 9 May 2024 12:01:49 +0800 Subject: [PATCH 5/8] Add check for incompatible TiCDC changefeeds before restoring Signed-off-by: Yu Juncen --- br/pkg/task/restore.go | 8 ++++++++ br/pkg/task/stream.go | 19 ++++++++++--------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index bebc5fc458795..d7f1505c49ca3 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -754,6 +754,14 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if err != nil { return errors.Trace(err) } + if cfg.CheckRequirements { + err := checkIncompatibleChangefeed(ctx, backupMeta.EndVersion, mgr.GetDomain().GetEtcdClient()) + log.Info("Checking incompatible TiCDC changefeeds before restoring.", + logutil.ShortError(err), zap.Uint64("restore-ts", backupMeta.EndVersion)) + if err != nil { + return errors.Trace(err) + } + } backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion) if cfg.CheckRequirements && backupVersion != nil { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index f3a0cc5206f22..e63718405a92a 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1116,15 +1116,16 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3. "create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) } - // check cdc changefeed - if cfg.CheckRequirements { - nameSet, err := cdcutil.GetRunningChangefeeds(ctx, etcdCLI) - if err != nil { - return err - } - if !nameSet.Empty() { - return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser()) - } + return nil +} + +func checkIncompatibleChangefeed(ctx context.Context, backupTS uint64, etcdCLI *clientv3.Client) error { + nameSet, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(ctx, etcdCLI, backupTS) + if err != nil { + return err + } + if !nameSet.Empty() { + return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser()) } return nil } From f3167fe80190ebc43b5f9301d4087a08b8c35024 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Thu, 9 May 2024 13:20:21 +0800 Subject: [PATCH 6/8] make clippy happy Signed-off-by: Yu Juncen --- pkg/util/cdcutil/cdc_test.go | 10 +++++----- pkg/util/cdcutil/export_for_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/util/cdcutil/cdc_test.go b/pkg/util/cdcutil/cdc_test.go index bf37373f9da3c..412acbb81a0b7 100644 --- a/pkg/util/cdcutil/cdc_test.go +++ b/pkg/util/cdcutil/cdc_test.go @@ -71,18 +71,18 @@ func testGetConflictChangefeeds(t *testing.T, cli *clientv3.Client) { names, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 42) require.NoError(t, err) - require.ElementsMatch(t, names.TEST_getChangefeedNames(), []string{ + require.ElementsMatch(t, names.TESTGetChangefeedNames(), []string{ "default/default/nost-fail", "default/default/st-fail", }) names2, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 40) require.NoError(t, err) - require.ElementsMatch(t, names2.TEST_getChangefeedNames(), []string{}) + require.ElementsMatch(t, names2.TESTGetChangefeedNames(), []string{}) names3, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 48) require.NoError(t, err) - require.ElementsMatch(t, names3.TEST_getChangefeedNames(), []string{ + require.ElementsMatch(t, names3.TESTGetChangefeedNames(), []string{ "default/default/nost-fail", "default/default/st-fail", "default/default/nost-ok", @@ -125,7 +125,7 @@ func testGetCDCChangefeedNameSet(t *testing.T, cli *clientv3.Client) { nameSet, err = cdcutil.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.ElementsMatch(t, nameSet.TEST_getChangefeedNames(), []string{"default/default/test"}) + require.ElementsMatch(t, nameSet.TESTGetChangefeedNames(), []string{"default/default/test"}) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) require.NoError(t, err) @@ -143,7 +143,7 @@ func testGetCDCChangefeedNameSet(t *testing.T, cli *clientv3.Client) { nameSet, err = cdcutil.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.ElementsMatch(t, nameSet.TEST_getChangefeedNames(), []string{"/test"}) + require.ElementsMatch(t, nameSet.TESTGetChangefeedNames(), []string{"/test"}) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) require.NoError(t, err) diff --git a/pkg/util/cdcutil/export_for_test.go b/pkg/util/cdcutil/export_for_test.go index a06386407f03d..5e13d85094cea 100644 --- a/pkg/util/cdcutil/export_for_test.go +++ b/pkg/util/cdcutil/export_for_test.go @@ -16,7 +16,7 @@ package cdcutil import "path" -func (c *CDCNameSet) TEST_getChangefeedNames() []string { +func (c *CDCNameSet) TESTGetChangefeedNames() []string { names := make([]string, 0, len(c.changefeeds)) for ns, cfs := range c.changefeeds { for _, cf := range cfs { From 0ad59980a88107415705066ba8978b7e71a74464 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Thu, 9 May 2024 14:14:55 +0800 Subject: [PATCH 7/8] Fix error handling in fetchCheckpointTSFromStatus and checkpointTSFor functions Signed-off-by: Yu Juncen --- pkg/util/cdcutil/cdc.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/util/cdcutil/cdc.go b/pkg/util/cdcutil/cdc.go index bf58306aee46d..61c26586d4b43 100644 --- a/pkg/util/cdcutil/cdc.go +++ b/pkg/util/cdcutil/cdc.go @@ -148,15 +148,15 @@ type changefeedStatusView struct { func (c checkCDCClient) fetchCheckpointTSFromStatus(ctx context.Context, cf changefeed) (uint64, error) { statusResp, err := c.cli.KV.Get(ctx, cf.statusKey()) if err != nil { - return 0, err + return 0, errors.Trace(err) } - if statusResp.Count == 0 { - // The changefeed might was created recently. + if statusResp.Count == 0 || len(statusResp.Kvs[0].Value) == 0 { + // The changefeed might was created recently, or just a phantom in test cases... return 0, nil } var status changefeedStatusView if err := json.Unmarshal(statusResp.Kvs[0].Value, &status); err != nil { - return 0, err + return 0, errors.Trace(err) } return status.Checkpoint, nil } @@ -164,7 +164,7 @@ func (c checkCDCClient) fetchCheckpointTSFromStatus(ctx context.Context, cf chan func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uint64, error) { infoResp, err := c.cli.KV.Get(ctx, cf.infoKey()) if err != nil { - return 0, err + return 0, errors.Trace(err) } if infoResp.Count == 0 { // The changefeed have been removed. @@ -172,7 +172,7 @@ func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uin } var info changefeedInfoView if err := json.Unmarshal(infoResp.Kvs[0].Value, &info); err != nil { - return 0, err + return 0, errors.Trace(err) } switch info.State { // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview From cbbd4324a5d3a81432e84b2a9d11df449d2497aa Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Thu, 9 May 2024 14:44:51 +0800 Subject: [PATCH 8/8] handing state by comment Signed-off-by: Yu Juncen --- pkg/util/cdcutil/cdc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/util/cdcutil/cdc.go b/pkg/util/cdcutil/cdc.go index 61c26586d4b43..a8ea12e37bf47 100644 --- a/pkg/util/cdcutil/cdc.go +++ b/pkg/util/cdcutil/cdc.go @@ -176,9 +176,9 @@ func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uin } switch info.State { // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview - case "error", "failed": + case "failed", "finished": return invalidTs, nil - case "running", "warning", "normal", "stopped": + case "running", "warning", "normal", "stopped", "error": cts, err := c.fetchCheckpointTSFromStatus(ctx, cf) if err != nil { return 0, err