diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index bebc5fc458795..d7f1505c49ca3 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -754,6 +754,14 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if err != nil { return errors.Trace(err) } + if cfg.CheckRequirements { + err := checkIncompatibleChangefeed(ctx, backupMeta.EndVersion, mgr.GetDomain().GetEtcdClient()) + log.Info("Checking incompatible TiCDC changefeeds before restoring.", + logutil.ShortError(err), zap.Uint64("restore-ts", backupMeta.EndVersion)) + if err != nil { + return errors.Trace(err) + } + } backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion) if cfg.CheckRequirements && backupVersion != nil { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index d032b3c517c9e..e63718405a92a 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1116,15 +1116,16 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3. "create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) } - // check cdc changefeed - if cfg.CheckRequirements { - nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, etcdCLI) - if err != nil { - return err - } - if !nameSet.Empty() { - return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser()) - } + return nil +} + +func checkIncompatibleChangefeed(ctx context.Context, backupTS uint64, etcdCLI *clientv3.Client) error { + nameSet, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(ctx, etcdCLI, backupTS) + if err != nil { + return err + } + if !nameSet.Empty() { + return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser()) } return nil } diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index 04948dc12bdb6..3327813e701fa 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -837,7 +837,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names)) } - nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, ci.etcdCli) + nameSet, err := cdcutil.GetRunningChangefeeds(ctx, ci.etcdCli) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/executor/importer/precheck.go b/pkg/executor/importer/precheck.go index db0739be0055d..5f4363ba5b900 100644 --- a/pkg/executor/importer/precheck.go +++ b/pkg/executor/importer/precheck.go @@ -130,7 +130,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error { return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs(fmt.Sprintf("found PiTR log streaming task(s): %v,", names)) } - nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, cli.GetClient()) + nameSet, err := cdcutil.GetRunningChangefeeds(ctx, cli.GetClient()) if err != nil { return errors.Trace(err) } diff --git a/pkg/util/cdcutil/BUILD.bazel b/pkg/util/cdcutil/BUILD.bazel index f45d01d125739..5b70a0cc2973a 100644 --- a/pkg/util/cdcutil/BUILD.bazel +++ b/pkg/util/cdcutil/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/util/cdcutil", visibility = ["//visibility:public"], deps = [ + "//pkg/util/mathutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", "@io_etcd_go_etcd_client_v3//:client", @@ -16,10 +17,13 @@ go_library( go_test( name = "cdcutil_test", timeout = "short", - srcs = ["cdc_test.go"], + srcs = [ + "cdc_test.go", + "export_for_test.go", + ], + embed = [":cdcutil"], flaky = True, deps = [ - ":cdcutil", "@com_github_stretchr_testify//require", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_tests_v3//integration", diff --git a/pkg/util/cdcutil/cdc.go b/pkg/util/cdcutil/cdc.go index f1a75cf905800..a8ea12e37bf47 100644 --- a/pkg/util/cdcutil/cdc.go +++ b/pkg/util/cdcutil/cdc.go @@ -19,11 +19,14 @@ import ( "context" "encoding/json" "fmt" + "math" + "path" "regexp" "strings" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/util/mathutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -37,120 +40,227 @@ const ( CDCPrefixV61 = "/tidb/cdc/changefeed/info/" ) -// CDCNameSet saves CDC changefeed's information. -// nameSet maps `cluster/namespace` to `changefeed`s -type CDCNameSet struct { - nameSet map[string][]string -} +type keyVersion int -// Empty that the nameSet is empty means no changefeed exists. -func (s *CDCNameSet) Empty() bool { - return len(s.nameSet) == 0 +const ( + legacy keyVersion = 1 + namespaced keyVersion = 2 + + invalidTs uint64 = math.MaxUint64 +) + +var ( + // cluster id should be valid in + // https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218 + clusterNameRe = regexp.MustCompile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") +) + +type changefeed struct { + ID string + Cluster string + Namespace string + KeyVersion keyVersion } -// MessageToUser convert the map `nameSet` to a readable message to user. -func (s *CDCNameSet) MessageToUser() string { - var changefeedMsgBuf strings.Builder - changefeedMsgBuf.WriteString("found CDC changefeed(s): ") - for clusterID, captureIDs := range s.nameSet { - changefeedMsgBuf.WriteString("cluster/namespace: ") - changefeedMsgBuf.WriteString(clusterID) - changefeedMsgBuf.WriteString(" changefeed(s): ") - changefeedMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) - changefeedMsgBuf.WriteString(", ") +func (c *changefeed) infoKey() string { + switch c.KeyVersion { + case legacy: + return path.Join(CDCPrefix, "changefeed", "info", c.ID) + case namespaced: + return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "info", c.ID) } - return changefeedMsgBuf.String() + log.Panic("Invalid changefeed version type.", zap.Any("this", c)) + return "" } -// GetCDCChangefeedNameSet gets CDC changefeed information and wraps them to a map -// for CDC >= v6.2, the etcd key format is /tidb/cdc///changefeed/info/ -// for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ -func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { - nameSet := make(map[string][]string, 1) - // check etcd KV of CDC >= v6.2 - resp, err := cli.Get(ctx, CDCPrefix, clientv3.WithPrefix()) - if err != nil { - return nil, errors.Trace(err) +func (c *changefeed) statusKey() string { + switch c.KeyVersion { + case legacy: + return path.Join(CDCPrefix, "changefeed", "status", c.ID) + case namespaced: + return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "status", c.ID) } + log.Panic("Invalid changefeed version type.", zap.Any("this", c)) + return "" +} - // cluster id should be valid in - // https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218 - reg, err := regexp.Compile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$") +type checkCDCClient struct { + cli *clientv3.Client +} + +func (c checkCDCClient) loadChangefeeds(ctx context.Context, out *[]changefeed) error { + resp, err := c.cli.Get(ctx, CDCPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { - log.L().Warn("failed to parse cluster id, skip it", zap.Error(err)) - reg = nil + return errors.Trace(err) } - for _, kv := range resp.Kvs { // example: /tidb/cdc///changefeed/info/ - k := kv.Key[len(CDCPrefix):] + k := kv.Key[len(CDCPrefix)-1:] + // example of k( >6.1): ///changefeed/info/ + // example of k(<=6.1): /changefeed/info/ clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath)) if !found { continue } - // example: clusterAndNamespace normally is / + + if len(clusterAndNamespace) == 0 { + // They should be keys with format /tidb/cdc/changefeed/info. + *out = append(*out, changefeed{ + ID: string(changefeedID), + KeyVersion: legacy, + }) + continue + } + + // example: clusterAndNamespace[1:] normally is / // but in migration scenario it become __backup__. we need handle it // see https://github.com/pingcap/tiflow/issues/9807 - clusterID, _, found := bytes.Cut(clusterAndNamespace, []byte(`/`)) + clusterID, namespace, found := bytes.Cut(clusterAndNamespace[1:], []byte(`/`)) if !found { // ignore __backup__ or other formats continue } - if reg != nil { - matched := reg.Match(clusterID) - if !matched { - continue - } - if !isActiveCDCChangefeed(kv.Value) { - continue - } + matched := clusterNameRe.Match(clusterID) + if !matched { + continue } - nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) + *out = append(*out, changefeed{ + ID: string(changefeedID), + Cluster: string(clusterID), + Namespace: string(namespace), + KeyVersion: namespaced, + }) + } + return nil +} + +type changefeedInfoView struct { + State string `json:"state"` + Start uint64 `json:"start-ts"` +} + +type changefeedStatusView struct { + Checkpoint uint64 `json:"checkpoint-ts"` +} + +func (c checkCDCClient) fetchCheckpointTSFromStatus(ctx context.Context, cf changefeed) (uint64, error) { + statusResp, err := c.cli.KV.Get(ctx, cf.statusKey()) + if err != nil { + return 0, errors.Trace(err) + } + if statusResp.Count == 0 || len(statusResp.Kvs[0].Value) == 0 { + // The changefeed might was created recently, or just a phantom in test cases... + return 0, nil } - if len(nameSet) == 0 { - // check etcd KV of CDC <= v6.1 - resp, err = cli.Get(ctx, CDCPrefixV61, clientv3.WithPrefix()) + var status changefeedStatusView + if err := json.Unmarshal(statusResp.Kvs[0].Value, &status); err != nil { + return 0, errors.Trace(err) + } + return status.Checkpoint, nil +} + +func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uint64, error) { + infoResp, err := c.cli.KV.Get(ctx, cf.infoKey()) + if err != nil { + return 0, errors.Trace(err) + } + if infoResp.Count == 0 { + // The changefeed have been removed. + return invalidTs, nil + } + var info changefeedInfoView + if err := json.Unmarshal(infoResp.Kvs[0].Value, &info); err != nil { + return 0, errors.Trace(err) + } + switch info.State { + // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview + case "failed", "finished": + return invalidTs, nil + case "running", "warning", "normal", "stopped", "error": + cts, err := c.fetchCheckpointTSFromStatus(ctx, cf) if err != nil { - return nil, errors.Trace(err) + return 0, err } - for _, kv := range resp.Kvs { - // example: /tidb/cdc/changefeed/info/ - k := kv.Key[len(CDCPrefixV61):] - if len(k) == 0 { - continue - } - if !isActiveCDCChangefeed(kv.Value) { - continue - } - - nameSet[""] = append(nameSet[""], string(k)) + return mathutil.Max(cts, info.Start), nil + default: + // This changefeed may be noise, ignore it. + log.Warn("Ignoring invalid changefeed.", zap.Any("changefeed", cf), zap.String("state", info.State)) + return invalidTs, nil + } +} + +func (c checkCDCClient) getIncompatible(ctx context.Context, safeTS uint64) (*CDCNameSet, error) { + changefeeds := make([]changefeed, 0) + if err := c.loadChangefeeds(ctx, &changefeeds); err != nil { + return nil, err + } + + nameset := new(CDCNameSet) + for _, cf := range changefeeds { + cts, err := c.checkpointTSFor(ctx, cf) + if err != nil { + return nil, errors.Annotatef(err, "failed to check changefeed %v", cf) + } + if cts < safeTS { + log.Info("Found incompatible changefeed.", zap.Any("changefeed", cf), zap.Uint64("checkpoint-ts", cts), zap.Uint64("safe-ts", safeTS)) + nameset.save(cf) } } - return &CDCNameSet{nameSet}, nil + return nameset, nil } -type onlyState struct { - State string `json:"state"` +// CDCNameSet saves CDC changefeed's information. +// nameSet maps `cluster/namespace` to `changefeed`s +type CDCNameSet struct { + changefeeds map[string][]string } -func isActiveCDCChangefeed(jsonBytes []byte) bool { - s := onlyState{} - err := json.Unmarshal(jsonBytes, &s) - if err != nil { - // maybe a compatible issue, skip this key - log.L().Error("unmarshal etcd value failed when check CDC changefeed, will skip this key", - zap.ByteString("value", jsonBytes), - zap.Error(err)) - return false +func (s *CDCNameSet) save(cf changefeed) { + if s.changefeeds == nil { + s.changefeeds = map[string][]string{} } - switch s.State { - // https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview - case "normal", "stopped", "error", "warning": - return true - default: - return false + + switch cf.KeyVersion { + case legacy: + s.changefeeds[""] = append(s.changefeeds[""], cf.ID) + case namespaced: + k := path.Join(cf.Cluster, cf.Namespace) + s.changefeeds[k] = append(s.changefeeds[k], cf.ID) } } + +// Empty that the nameSet is empty means no changefeed exists. +func (s *CDCNameSet) Empty() bool { + return len(s.changefeeds) == 0 +} + +// MessageToUser convert the map `nameSet` to a readable message to user. +func (s *CDCNameSet) MessageToUser() string { + var changefeedMsgBuf strings.Builder + changefeedMsgBuf.WriteString("found CDC changefeed(s): ") + for clusterID, changefeedID := range s.changefeeds { + changefeedMsgBuf.WriteString("cluster/namespace: ") + changefeedMsgBuf.WriteString(clusterID) + changefeedMsgBuf.WriteString(" changefeed(s): ") + changefeedMsgBuf.WriteString(fmt.Sprintf("%v", changefeedID)) + changefeedMsgBuf.WriteString(", ") + } + return changefeedMsgBuf.String() +} + +// GetRunningChangefeeds gets CDC changefeed information and wraps them to a map +// for CDC >= v6.2, the etcd key format is /tidb/cdc///changefeed/info/ +// for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ +func GetRunningChangefeeds(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { + checkCli := checkCDCClient{cli: cli} + return checkCli.getIncompatible(ctx, invalidTs) +} + +// GetIncompatibleChangefeedsWithSafeTS gets CDC changefeed that may not compatible with the safe ts and wraps them to a map. +func GetIncompatibleChangefeedsWithSafeTS(ctx context.Context, cli *clientv3.Client, safeTS uint64) (*CDCNameSet, error) { + checkCli := checkCDCClient{cli: cli} + return checkCli.getIncompatible(ctx, safeTS) +} diff --git a/pkg/util/cdcutil/cdc_test.go b/pkg/util/cdcutil/cdc_test.go index 8e1acacdff15d..412acbb81a0b7 100644 --- a/pkg/util/cdcutil/cdc_test.go +++ b/pkg/util/cdcutil/cdc_test.go @@ -16,6 +16,8 @@ package cdcutil_test import ( "context" + "path" + "strconv" "testing" "github.com/pingcap/tidb/pkg/util/cdcutil" @@ -24,13 +26,72 @@ import ( "go.etcd.io/etcd/tests/v3/integration" ) -func TestGetCDCChangefeedNameSet(t *testing.T) { +func TestCDCCheckWithEmbedEtcd(t *testing.T) { integration.BeforeTestExternal(t) testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer testEtcdCluster.Terminate(t) + cli := testEtcdCluster.RandClient() + + t.Run("testGetCDCChangefeedNameSet", func(t *testing.T) { testGetCDCChangefeedNameSet(t, cli) }) + cli.KV.Delete(context.Background(), "", clientv3.WithPrefix()) + t.Run("testGEtConflictChangefeeds", func(t *testing.T) { testGetConflictChangefeeds(t, cli) }) +} +func testGetConflictChangefeeds(t *testing.T, cli *clientv3.Client) { + checkEtcdPut := func(key string, vals ...string) { + val := "" + if len(vals) == 1 { + val = vals[0] + } + _, err := cli.Put(context.Background(), key, val) + require.NoError(t, err) + } + putLameChangefeed := func(name string, status string, startTs uint64) { + checkEtcdPut( + path.Join("/tidb/cdc/default/default/changefeed/info/", name), + `{"state":"`+status+`", "start-ts": `+strconv.Itoa(int(startTs))+`}`, + ) + } + putChangefeed := func(name string, status string, startTs uint64, checkpointTs uint64) { + putLameChangefeed(name, status, startTs) + checkEtcdPut(path.Join("/tidb/cdc/default/default/changefeed/status/", name), `{"checkpoint-ts": `+strconv.Itoa(int(checkpointTs))+`}`) + } + + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") + checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922") + + putChangefeed("st-ok", "normal", 1, 43) + putChangefeed("st-fail", "normal", 1, 41) + putLameChangefeed("skipped", "failed", 1) + putLameChangefeed("nost-ok", "normal", 43) + putLameChangefeed("nost-fail", "normal", 41) + + names, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 42) + require.NoError(t, err) + require.ElementsMatch(t, names.TESTGetChangefeedNames(), []string{ + "default/default/nost-fail", + "default/default/st-fail", + }) + + names2, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 40) + require.NoError(t, err) + require.ElementsMatch(t, names2.TESTGetChangefeedNames(), []string{}) + + names3, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(context.Background(), cli, 48) + require.NoError(t, err) + require.ElementsMatch(t, names3.TESTGetChangefeedNames(), []string{ + "default/default/nost-fail", + "default/default/st-fail", + "default/default/nost-ok", + "default/default/st-ok", + }) +} + +func testGetCDCChangefeedNameSet(t *testing.T, cli *clientv3.Client) { ctx := context.Background() - cli := testEtcdCluster.RandClient() checkEtcdPut := func(key string, vals ...string) { val := "" if len(vals) == 1 { @@ -40,7 +101,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { require.NoError(t, err) } - nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err := cdcutil.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.True(t, nameSet.Empty()) @@ -51,22 +112,20 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") checkEtcdPut( "/tidb/cdc/default/default/changefeed/info/test", - `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + `{"state":"normal"}`, ) checkEtcdPut( "/tidb/cdc/default/default/changefeed/info/test-1", - `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, + `{"state":"failed"}`, ) - checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1") checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1") checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922") - nameSet, err = cdcutil.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err = cdcutil.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.Equal(t, "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], ", - nameSet.MessageToUser()) + require.ElementsMatch(t, nameSet.TESTGetChangefeedNames(), []string{"default/default/test"}) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) require.NoError(t, err) @@ -75,17 +134,16 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { checkEtcdPut("/tidb/cdc/capture/f14cb04d-5ba1-410e-a59b-ccd796920e9d") checkEtcdPut( "/tidb/cdc/changefeed/info/test", - `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"stopped","error":null,"creator-version":"v6.5.0-master-dirty"}`, + `{"state":"stopped"}`, ) checkEtcdPut("/tidb/cdc/job/test") checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b") checkEtcdPut("/tidb/cdc/task/position/f14cb04d-5ba1-410e-a59b-ccd796920e9d/test") - nameSet, err = cdcutil.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err = cdcutil.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.False(t, nameSet.Empty()) - require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ", - nameSet.MessageToUser()) + require.ElementsMatch(t, nameSet.TESTGetChangefeedNames(), []string{"/test"}) _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) require.NoError(t, err) @@ -100,7 +158,7 @@ func TestGetCDCChangefeedNameSet(t *testing.T) { "/tidb/cdc/5402613591834624000/changefeed/info/test", `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, ) - nameSet, err = cdcutil.GetCDCChangefeedNameSet(ctx, cli) + nameSet, err = cdcutil.GetRunningChangefeeds(ctx, cli) require.NoError(t, err) require.True(t, nameSet.Empty()) } diff --git a/pkg/util/cdcutil/export_for_test.go b/pkg/util/cdcutil/export_for_test.go new file mode 100644 index 0000000000000..5e13d85094cea --- /dev/null +++ b/pkg/util/cdcutil/export_for_test.go @@ -0,0 +1,27 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cdcutil + +import "path" + +func (c *CDCNameSet) TESTGetChangefeedNames() []string { + names := make([]string, 0, len(c.changefeeds)) + for ns, cfs := range c.changefeeds { + for _, cf := range cfs { + names = append(names, path.Join(ns, cf)) + } + } + return names +}