Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check: refine CheckClusterVersion #2713

Merged
merged 13 commits into from
Sep 23, 2021
Merged
2 changes: 1 addition & 1 deletion cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type TaskPosition struct {
ResolvedTs uint64 `json:"resolved-ts"`
// The count of events were synchronized. This is updated by corresponding processor.
Count uint64 `json:"count"`
// Error code when error happens
// Error when error happens
Error *RunningError `json:"error"`
}

Expand Down
7 changes: 1 addition & 6 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,7 @@ func (s *Server) Run(ctx context.Context) error {
// To not block CDC server startup, we need to warn instead of error
// when TiKV is incompatible.
errorTiKVIncompatible := false
for _, pdEndpoint := range s.pdEndpoints {
err = version.CheckClusterVersion(ctx, s.pdClient, pdEndpoint, conf.Security, errorTiKVIncompatible)
if err == nil {
break
}
}
err = version.CheckClusterVersion(ctx, s.pdClient, s.pdEndpoints, conf.Security, errorTiKVIncompatible)
if err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/cmd/factory/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) {
return nil, errors.Annotatef(err, "fail to open PD client, pd=\"%s\"", pdAddr)
}

// TODO: we need to check all pd endpoint and make sure they belong to the same cluster.
// See also: https://github.com/pingcap/ticdc/pull/2341#discussion_r673021305.
err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints[0], credential, true)
err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true)
if err != nil {
return nil, err
}
Expand Down
30 changes: 21 additions & 9 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"regexp"
"strings"

"github.com/pingcap/ticdc/cdc/model"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/httputil"
"github.com/pingcap/ticdc/pkg/security"
Expand Down Expand Up @@ -75,9 +74,9 @@ func removeVAndHash(v string) string {
}

// CheckClusterVersion check TiKV and PD version.
// need only one PD alive and match the cdc version.
func CheckClusterVersion(
ctx context.Context, client pd.Client, pdHTTP string, credential *security.Credential, errorTiKVIncompat bool,
) error {
ctx context.Context, client pd.Client, pdAddrs []string, credential *security.Credential, errorTiKVIncompat bool) error {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
err := CheckStoreVersion(ctx, client, 0 /* check all TiKV */)
if err != nil {
if errorTiKVIncompat {
Expand All @@ -86,22 +85,35 @@ func CheckClusterVersion(
log.Warn("check TiKV version failed", zap.Error(err))
}

httpCli, err := httputil.NewClient(credential)
if err != nil {
return err
for _, pdAddr := range pdAddrs {
err = CheckPDVersion(ctx, pdAddr, credential)
if err == nil {
break
}
}

return err
}

// CheckPDVersion check PD version.
func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Credential) error {
// See more: https://github.com/pingcap/pd/blob/v4.0.0-rc.1/server/api/version.go
pdVer := struct {
Version string `json:"version"`
}{}

httpClient, err := httputil.NewClient(credential)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to name it cli, we know that it's an HTTP client by the right-hand side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cli is a very confusing acronym (first thought is command line interface), and it is not a recognized acronym for the client, it does not have a consensus acronym. So I prefer to change it to client to reduce confusion.

if err != nil {
return err
}

req, err := http.NewRequestWithContext(
ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdHTTP), nil)
ctx, http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", pdAddr), nil)
if err != nil {
return cerror.WrapError(cerror.ErrCheckClusterVersionFromPD, err)
}

resp, err := httpCli.Do(req)
resp, err := httpClient.Do(req)
if err != nil {
return cerror.WrapError(cerror.ErrCheckClusterVersionFromPD, err)
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/version/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
Client: nil,
}
pdURL, _ := url.Parse(tempurl.Alloc())
pdHTTP := fmt.Sprintf("http://%s", pdURL.Host)
pdAddr := fmt.Sprintf("http://%s", pdURL.Host)
pdAddrs := []string{pdAddr}
srv := http.Server{Addr: pdURL.Host, Handler: &mock}
go func() {
//nolint:errcheck
Expand All @@ -78,7 +79,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
defer srv.Close()
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
_, err := http.Get(pdHTTP)
_, err := http.Get(pdAddr)
if err == nil {
break
}
Expand All @@ -95,7 +96,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true)
c.Assert(err, check.IsNil)
}

Expand All @@ -106,7 +107,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true)
c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*")
}

Expand All @@ -118,7 +119,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
mock.getAllStores = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true)
c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*")
}

Expand All @@ -130,9 +131,9 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true)
c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*")
err = CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false)
err = CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false)
c.Assert(err, check.IsNil)
}

Expand All @@ -145,7 +146,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `10000.0.0`}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, true)
c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*")
}

Expand All @@ -154,7 +155,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
return http.StatusBadRequest
}

err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false)
err := CheckClusterVersion(context.Background(), &mock, pdAddrs, nil, false)
c.Assert(err, check.ErrorMatches, ".*response status: .*")
}
}
Expand Down