From 10adcaa126cda560412223400f9f82c0a6bf06f6 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Wed, 1 Sep 2021 18:00:33 +0800 Subject: [PATCH 1/5] check: refine some funcs --- cdc/model/owner.go | 2 +- cdc/server.go | 8 ++------ cmd/client.go | 9 ++------- pkg/cmd/factory/factory_impl.go | 2 +- pkg/version/check.go | 30 +++++++++++++++++++++++------- pkg/version/check_test.go | 19 ++++++++++--------- 6 files changed, 39 insertions(+), 31 deletions(-) diff --git a/cdc/model/owner.go b/cdc/model/owner.go index 0a988d33ae1..bfac5636cbb 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -83,7 +83,7 @@ type TaskPosition struct { ResolvedTs uint64 `json:"resolved-ts"` // The count of events were synchronized. This is updated by corresponding processor. Count uint64 `json:"count"` - // Error code when error happens + // Error when error happens Error *RunningError `json:"error"` } diff --git a/cdc/server.go b/cdc/server.go index 930ccd88440..23dc8c58f67 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -140,12 +140,8 @@ func (s *Server) Run(ctx context.Context) error { // To not block CDC server startup, we need to warn instead of error // when TiKV is incompatible. errorTiKVIncompatible := false - for _, pdEndpoint := range s.pdEndpoints { - err = version.CheckClusterVersion(ctx, s.pdClient, pdEndpoint, conf.Security, errorTiKVIncompatible) - if err == nil { - break - } - } + checkAllPD := false + err = version.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints, conf.Security, errorTiKVIncompatible, checkAllPD) if err != nil { return err } diff --git a/cmd/client.go b/cmd/client.go index e16b162e957..00bb8acf878 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -205,13 +205,8 @@ func newCliCommand() *cobra.Command { return err } errorTiKVIncompatible := true // Error if TiKV is incompatible. - for _, pdEndpoint := range pdEndpoints { - err = version.CheckClusterVersion( - ctx, pdCli, pdEndpoint, credential, errorTiKVIncompatible) - if err == nil { - break - } - } + checkAllPD := false + err = version.CheckClusterVersion(ctx, pdCli, pdEndpoints, credential, errorTiKVIncompatible, checkAllPD) if err != nil { return err } diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index b9f4014773e..1abb4569782 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -161,7 +161,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) { // TODO: we need to check all pd endpoint and make sure they belong to the same cluster. // See also: https://github.com/pingcap/ticdc/pull/2341#discussion_r673021305. - err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints[0], credential, true) + err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints[0:1], credential, true, false) if err != nil { return nil, err } diff --git a/pkg/version/check.go b/pkg/version/check.go index 5c89914af08..1f10da89dae 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -22,11 +22,10 @@ import ( "regexp" "strings" - "github.com/pingcap/ticdc/cdc/model" - "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/httputil" "github.com/pingcap/ticdc/pkg/security" @@ -75,8 +74,9 @@ func removeVAndHash(v string) string { } // CheckClusterVersion check TiKV and PD version. +// If checkAllPD is false, need only one PD alive and match the cdc version. func CheckClusterVersion( - ctx context.Context, client pd.Client, pdHTTP string, credential *security.Credential, errorTiKVIncompat bool, + ctx context.Context, client pd.Client, pdAddrs []string, credential *security.Credential, errorTiKVIncompat bool, checkAllPD bool, ) error { err := CheckStoreVersion(ctx, client, 0 /* check all TiKV */) if err != nil { @@ -86,17 +86,33 @@ func CheckClusterVersion( log.Warn("check TiKV version failed", zap.Error(err)) } - httpCli, err := httputil.NewClient(credential) - if err != nil { - return err + for _, pdAddr := range pdAddrs { + err = CheckPDVersion(ctx, pdAddr, credential) + if err == nil && !checkAllPD { + return nil + } + if err != nil && checkAllPD { + return err + } } + + return nil +} + +// CheckPDVersion check PD version. +func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Credential) error { // See more: https://github.com/pingcap/pd/blob/v4.0.0-rc.1/server/api/version.go pdVer := struct { Version string `json:"version"` }{} + httpCli, err := httputil.NewClient(credential) + if err != nil { + return err + } + req, err := http.NewRequestWithContext( - ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdHTTP), nil) + ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdAddr), nil) if err != nil { return cerror.WrapError(cerror.ErrCheckClusterVersionFromPD, err) } diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index 0515d1e6567..24afbf01e28 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -69,7 +69,8 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { Client: nil, } pdURL, _ := url.Parse(tempurl.Alloc()) - pdHTTP := fmt.Sprintf("http://%s", pdURL.Host) + pdAddr := fmt.Sprintf("http://%s", pdURL.Host) + pdAddrs := []string{pdAddr} srv := http.Server{Addr: pdURL.Host, Handler: &mock} go func() { //nolint:errcheck @@ -78,7 +79,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { defer srv.Close() for i := 0; i < 20; i++ { time.Sleep(100 * time.Millisecond) - _, err := http.Get(pdHTTP) + _, err := http.Get(pdAddr) if err == nil { break } @@ -95,7 +96,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) c.Assert(err, check.IsNil) } @@ -106,7 +107,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*") } @@ -118,7 +119,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*") } @@ -130,9 +131,9 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { // TiKV does not include 'v'. return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*") - err = CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false) + err = CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false, true) c.Assert(err, check.IsNil) } @@ -145,7 +146,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { // TiKV does not include 'v'. return []*metapb.Store{{Version: `10000.0.0`}} } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*") } @@ -154,7 +155,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { return http.StatusBadRequest } - err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false, true) c.Assert(err, check.ErrorMatches, ".*response status: .*") } } From 53cc8c5ee4629c3f588c4888ce47e3b997e84a14 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Fri, 3 Sep 2021 10:00:46 +0800 Subject: [PATCH 2/5] factory_impl: update --- pkg/cmd/factory/factory_impl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index 1abb4569782..f2d61f15222 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -161,7 +161,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) { // TODO: we need to check all pd endpoint and make sure they belong to the same cluster. // See also: https://github.com/pingcap/ticdc/pull/2341#discussion_r673021305. - err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints[0:1], credential, true, false) + err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true, false) if err != nil { return nil, err } From 3589fed8c1cf54100b5373c0cea7fcf03cb013eb Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Mon, 6 Sep 2021 13:49:09 +0800 Subject: [PATCH 3/5] check: update --- cdc/server.go | 3 +-- pkg/cmd/factory/factory_impl.go | 2 +- pkg/version/check.go | 14 +++++--------- pkg/version/check_test.go | 14 +++++++------- 4 files changed, 14 insertions(+), 19 deletions(-) diff --git a/cdc/server.go b/cdc/server.go index 5828690bbee..5217d0b8179 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -146,8 +146,7 @@ func (s *Server) Run(ctx context.Context) error { // To not block CDC server startup, we need to warn instead of error // when TiKV is incompatible. errorTiKVIncompatible := false - checkAllPD := false - err = version.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints, conf.Security, errorTiKVIncompatible, checkAllPD) + err = version.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints, conf.Security, errorTiKVIncompatible) if err != nil { return err } diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index f2d61f15222..e7e308d6fd4 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -161,7 +161,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) { // TODO: we need to check all pd endpoint and make sure they belong to the same cluster. // See also: https://github.com/pingcap/ticdc/pull/2341#discussion_r673021305. - err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true, false) + err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true) if err != nil { return nil, err } diff --git a/pkg/version/check.go b/pkg/version/check.go index 1f10da89dae..d516a702aea 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -74,10 +74,9 @@ func removeVAndHash(v string) string { } // CheckClusterVersion check TiKV and PD version. -// If checkAllPD is false, need only one PD alive and match the cdc version. +// need only one PD alive and match the cdc version. func CheckClusterVersion( - ctx context.Context, client pd.Client, pdAddrs []string, credential *security.Credential, errorTiKVIncompat bool, checkAllPD bool, -) error { + ctx context.Context, client pd.Client, pdAddrs []string, credential *security.Credential, errorTiKVIncompat bool) error { err := CheckStoreVersion(ctx, client, 0 /* check all TiKV */) if err != nil { if errorTiKVIncompat { @@ -88,15 +87,12 @@ func CheckClusterVersion( for _, pdAddr := range pdAddrs { err = CheckPDVersion(ctx, pdAddr, credential) - if err == nil && !checkAllPD { - return nil - } - if err != nil && checkAllPD { - return err + if err == nil { + break } } - return nil + return err } // CheckPDVersion check PD version. diff --git a/pkg/version/check_test.go b/pkg/version/check_test.go index 24afbf01e28..3781e5a1a5f 100644 --- a/pkg/version/check_test.go +++ b/pkg/version/check_test.go @@ -96,7 +96,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.IsNil) } @@ -107,7 +107,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*") } @@ -119,7 +119,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { mock.getAllStores = func() []*metapb.Store { return []*metapb.Store{{Version: MinTiKVVersion.String()}} } - err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*") } @@ -131,9 +131,9 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { // TiKV does not include 'v'. return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}} } - err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*") - err = CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false, true) + err = CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false) c.Assert(err, check.IsNil) } @@ -146,7 +146,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { // TiKV does not include 'v'. return []*metapb.Store{{Version: `10000.0.0`}} } - err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true) c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*") } @@ -155,7 +155,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) { return http.StatusBadRequest } - err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false, true) + err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false) c.Assert(err, check.ErrorMatches, ".*response status: .*") } } From 9524cc2ed56ad5311983c2ca208ea19878d1c5b2 Mon Sep 17 00:00:00 2001 From: asddongmen <20351731+asddongmen@users.noreply.github.com> Date: Mon, 13 Sep 2021 10:15:50 +0800 Subject: [PATCH 4/5] check: improvement a var name --- pkg/version/check.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/version/check.go b/pkg/version/check.go index d516a702aea..a16c506ae38 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -102,7 +102,7 @@ func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Cre Version string `json:"version"` }{} - httpCli, err := httputil.NewClient(credential) + httpClient, err := httputil.NewClient(credential) if err != nil { return err } @@ -113,7 +113,7 @@ func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Cre return cerror.WrapError(cerror.ErrCheckClusterVersionFromPD, err) } - resp, err := httpCli.Do(req) + resp, err := httpClient.Do(req) if err != nil { return cerror.WrapError(cerror.ErrCheckClusterVersionFromPD, err) } From c48b5e9577349bbe7bb3249cc2e71346c3dd180b Mon Sep 17 00:00:00 2001 From: asddongmen <20351731+asddongmen@users.noreply.github.com> Date: Sat, 18 Sep 2021 15:53:57 +0800 Subject: [PATCH 5/5] factory_impl: update --- pkg/cmd/factory/factory_impl.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index e7e308d6fd4..073823e1ed7 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -159,8 +159,6 @@ func (f factoryImpl) PdClient() (pd.Client, error) { return nil, errors.Annotatef(err, "fail to open PD client, pd=\"%s\"", pdAddr) } - // TODO: we need to check all pd endpoint and make sure they belong to the same cluster. - // See also: https://github.com/pingcap/ticdc/pull/2341#discussion_r673021305. err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true) if err != nil { return nil, err