diff --git a/cdc/model/owner.go b/cdc/model/owner.go index 942079b9454..ace3aacdd70 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -83,7 +83,7 @@ type TaskPosition struct { ResolvedTs uint64 `json:"resolved-ts"` // The count of events were synchronized. This is updated by corresponding processor. Count uint64 `json:"count"` - // Error code when error happens + // Error when error happens Error *RunningError `json:"error"` } diff --git a/cdc/server.go b/cdc/server.go index ac387db3a53..d261a537776 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -146,12 +146,7 @@ func (s *Server) Run(ctx context.Context) error { // To not block CDC server startup, we need to warn instead of error // when TiKV is incompatible. errorTiKVIncompatible := false - for _, pdEndpoint := range s.pdEndpoints { - err = version.CheckClusterVersion(ctx, s.pdClient, pdEndpoint, conf.Security, errorTiKVIncompatible) - if err == nil { - break - } - } + err = version.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints, conf.Security, errorTiKVIncompatible) if err != nil { return err } diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index b9f4014773e..073823e1ed7 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -159,9 +159,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) { return nil, errors.Annotatef(err, "fail to open PD client, pd=\"%s\"", pdAddr) } - // TODO: we need to check all pd endpoint and make sure they belong to the same cluster. - // See also: https://github.com/pingcap/ticdc/pull/2341#discussion_r673021305. - err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints[0], credential, true) + err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true) if err != nil { return nil, err } diff --git a/pkg/version/check.go b/pkg/version/check.go index 5c89914af08..a16c506ae38 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -22,11 +22,10 @@ import ( "regexp" "strings" - "github.com/pingcap/ticdc/cdc/model" - "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/httputil" "github.com/pingcap/ticdc/pkg/security" @@ -75,9 +74,9 @@ func removeVAndHash(v string) string { } // CheckClusterVersion check TiKV and PD version. +// need only one PD alive and match the cdc version. func CheckClusterVersion( - ctx context.Context, client pd.Client, pdHTTP string, credential *security.Credential, errorTiKVIncompat bool, -) error { + ctx context.Context, client pd.Client, pdAddrs []string, credential *security.Credential, errorTiKVIncompat bool) error { err := CheckStoreVersion(ctx, client, 0 /* check all TiKV */) if err != nil { if errorTiKVIncompat { @@ -86,22 +85,35 @@ func CheckClusterVersion( log.Warn("check TiKV version failed", zap.Error(err)) } - httpCli, err := httputil.NewClient(credential) - if err != nil { - return err + for _, pdAddr := range pdAddrs { + err = CheckPDVersion(ctx, pdAddr, credential) + if err == nil { + break + } } + + return err +} + +// CheckPDVersion check PD version. +func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Credential) error { // See more: https://github.com/pingcap/pd/blob/v4.0.0-rc.1/server/api/version.go pdVer := struct { Version string `json:"version"` }{} + httpClient, err := httputil.NewClient(credential) + if err != nil { + return err + } + req, err := http.NewRequestWithContext( - ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdHTTP), nil) + ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdAddr), nil) if err != nil { return cerror.WrapError(cerror.ErrCheckClusterVersionFromPD, err) } - resp, err := httpCli.Do(req) + resp, err := httpClient.Do(req) if err != nil { return cerror.WrapError(cerror.ErrCheckClusterVersionFromPD, err) } diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index 0515d1e6567..3781e5a1a5f 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -69,7 +69,8 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { Client: nil, } pdURL, _ := url.Parse(tempurl.Alloc()) - pdHTTP := fmt.Sprintf("http://%s", pdURL.Host) + pdAddr := fmt.Sprintf("http://%s", pdURL.Host) + pdAddrs := []string{pdAddr} srv := http.Server{Addr: pdURL.Host, Handler: &mock} go func() { //nolint:errcheck @@ -78,7 +79,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { defer srv.Close() for i := 0; i < 20; i++ { time.Sleep(100 * time.Millisecond) - _, err := http.Get(pdHTTP) + _, err := http.Get(pdAddr) if err == nil { break } @@ -95,7 +96,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.IsNil) } @@ -106,7 +107,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*") } @@ -118,7 +119,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*") } @@ -130,9 +131,9 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { // TiKV does not include 'v'. return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*") - err = CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false) + err = CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false) c.Assert(err, check.IsNil) } @@ -145,7 +146,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { // TiKV does not include 'v'. return []*metapb.Store{{Version: `10000.0.0`}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*") } @@ -154,7 +155,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { return http.StatusBadRequest } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false) c.Assert(err, check.ErrorMatches, ".*response status: .*") } }