Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server, version: check and wait if cluster is incompatible #2695

Closed
wants to merge 7 commits into from
37 changes: 24 additions & 13 deletions cdc/server.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/httputil"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -143,19 +144,6 @@ func (s *Server) Run(ctx context.Context) error {
return errors.Trace(err)
}

// To not block CDC server startup, we need to warn instead of error
// when TiKV is incompatible.
errorTiKVIncompatible := false
for _, pdEndpoint := range s.pdEndpoints {
err = version.CheckClusterVersion(ctx, s.pdClient, pdEndpoint, conf.Security, errorTiKVIncompatible)
if err == nil {
break
}
}
if err != nil {
return err
}

kv.InitWorkerPool()
kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), conf.Security)
if err != nil {
Expand All @@ -177,6 +165,10 @@ func (s *Server) Run(ctx context.Context) error {
return err
}

// Check cluster version and wait if it's incompatible.
// We start status server first to not block tiup cluster upgrading.
checkAndWaitClusterVersion(ctx, s.pdClient, s.pdEndpoints, conf.Security)

return s.run(ctx)
Comment on lines +168 to 172
Copy link
Contributor

@amyangfei amyangfei Sep 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently /status API is available after status server starts, however #2691 is considering to change this behavior, which means the /status API will be available after capture info is persisted to etcd.

}

Expand Down Expand Up @@ -356,3 +348,22 @@ func findBestDataDir(candidates []string) (result string, ok bool) {

return result, ok
}

func checkAndWaitClusterVersion(
ctx context.Context, pdClient pd.Client, pdEndpoints []string, security *config.SecurityConfig,
) {
const backoffOneSecond = 1000
retry.Do(ctx, func() error {
var err error
for _, pdEndpoint := range pdEndpoints {
err = version.CheckClusterVersion(ctx, pdClient, pdEndpoint, security)
if err == nil {
break
}
Comment on lines +360 to +362
Copy link
Contributor

@amyangfei amyangfei Sep 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two scenarios have logic conflicts here.

  • More than 3 PDs in upstream, one of them is down, TiCDC can start up, which is the HA feature, in this scenario TiCDC only checks one PD.
  • Upstream upgrade, should TiCDC wait all PDs upgrade to the required version, then TiCDC should check all PDs.

log.Warn("check version", zap.Error(err))
}
return err
}, retry.WithInfiniteTries(),
retry.WithBackoffBaseDelay(backoffOneSecond), retry.WithBackoffMaxDelay(backoffOneSecond))
return
}
77 changes: 77 additions & 0 deletions cdc/server_test.go
Expand Up @@ -15,16 +15,22 @@ package cdc

import (
"context"
"fmt"
"net/http"
"net/url"
"path/filepath"
"sync/atomic"
"time"

"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -119,3 +125,74 @@ func (s *serverSuite) TestInitDataDir(c *check.C) {

cancel()
}

type checkSuite struct{}

var _ = check.Suite(&checkSuite{})

func (s *checkSuite) TestCheckAndWaitClusterVersion(c *check.C) {
defer testleak.AfterTest(c)()
mock := version.MockPDClient{
Client: nil,
}
pdURL, _ := url.Parse(tempurl.Alloc())
pdHTTP := fmt.Sprintf("http://%s", pdURL.Host)
srv := http.Server{Addr: pdURL.Host, Handler: &mock}
go func() {
_ = srv.ListenAndServe()
}()
defer srv.Close()

{
mock.GetVersionFunc = func() string {
return version.MinPDVersion.String()
}
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: version.MinTiKVVersion.String()}}
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
checkAndWaitClusterVersion(ctx, &mock, []string{pdHTTP}, nil)
done <- struct{}{}
}()
select {
case <-time.After(5 * time.Second):
c.Fatal("check timeout")
case <-done:
}
cancel()
}

{
pdVersionSwitch := int64(0)
mock.GetVersionFunc = func() string {
if atomic.LoadInt64(&pdVersionSwitch) == 0 {
const incompatiblePD = "0.0.1"
return incompatiblePD
}
return version.MinPDVersion.String()
}
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: version.MinTiKVVersion.String()}}
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
checkAndWaitClusterVersion(ctx, &mock, []string{pdHTTP}, nil)
done <- struct{}{}
}()
select {
case <-time.After(2 * time.Second):
case <-done:
c.Fatal("must timeout")
}
atomic.StoreInt64(&pdVersionSwitch, 1)
select {
case <-time.After(2 * time.Second):
c.Fatal("check timeout")
case <-done:
}
cancel()
}
}
4 changes: 1 addition & 3 deletions cmd/client.go
Expand Up @@ -204,10 +204,8 @@ func newCliCommand() *cobra.Command {
if err != nil {
return err
}
errorTiKVIncompatible := true // Error if TiKV is incompatible.
for _, pdEndpoint := range pdEndpoints {
err = version.CheckClusterVersion(
ctx, pdCli, pdEndpoint, credential, errorTiKVIncompatible)
err = version.CheckClusterVersion(ctx, pdCli, pdEndpoint, credential)
if err == nil {
break
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/factory/factory_impl.go
Expand Up @@ -161,7 +161,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) {

// TODO: we need to check all pd endpoint and make sure they belong to the same cluster.
// See also: https://github.com/pingcap/ticdc/pull/2341#discussion_r673021305.
err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints[0], credential, true)
err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints[0], credential)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to check all PD endpoints to avoid failure when the first PD endpoint is unreachable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2713 implements your suggestion.

if err != nil {
return nil, err
}
Expand Down
24 changes: 8 additions & 16 deletions pkg/version/check.go
Expand Up @@ -26,34 +26,29 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/httputil"
"github.com/pingcap/ticdc/pkg/security"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

var (
// minPDVersion is the version of the minimal compatible PD.
// TODO bump 5.2.0-alpha once PD releases.
minPDVersion *semver.Version = semver.New("5.1.0-alpha")
// MinPDVersion is the version of the minimal compatible PD.
MinPDVersion *semver.Version = semver.New("5.2.0-alpha")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove all the version-related global variables? I think we can check whether the major/minor version matched TiKV/PD/TiCDC.

// maxPDVersion is the version of the maximum compatible PD.
// Compatible versions are in [minPDVersion, maxPDVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxPDVersion *semver.Version = semver.New("9999.0.0")

// MinTiKVVersion is the version of the minimal compatible TiKV.
// TODO bump 5.2.0-alpha once TiKV releases.
MinTiKVVersion *semver.Version = semver.New("5.1.0-alpha")
MinTiKVVersion *semver.Version = semver.New("5.2.0-alpha")
// maxTiKVVersion is the version of the maximum compatible TiKV.
// Compatible versions are in [MinTiKVVersion, maxTiKVVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxTiKVVersion *semver.Version = semver.New("9999.0.0")

// minTiCDCVersion is the version of the minimal compatible TiCDC.
// TODO bump 5.2.0-alpha once TiCDC releases.
minTiCDCVersion *semver.Version = semver.New("5.1.0-alpha")
minTiCDCVersion *semver.Version = semver.New("5.2.0-alpha")
// Compatible versions are in [MinTiCDCVersion, MaxTiCDCVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxTiCDCVersion *semver.Version = semver.New("9999.0.0")
Expand All @@ -76,14 +71,11 @@ func removeVAndHash(v string) string {

// CheckClusterVersion check TiKV and PD version.
func CheckClusterVersion(
ctx context.Context, client pd.Client, pdHTTP string, credential *security.Credential, errorTiKVIncompat bool,
ctx context.Context, client pd.Client, pdHTTP string, credential *security.Credential,
) error {
err := CheckStoreVersion(ctx, client, 0 /* check all TiKV */)
if err != nil {
if errorTiKVIncompat {
return err
}
log.Warn("check TiKV version failed", zap.Error(err))
return err
}

httpCli, err := httputil.NewClient(credential)
Expand Down Expand Up @@ -127,10 +119,10 @@ func CheckClusterVersion(
return cerror.WrapError(cerror.ErrNewSemVersion, err)
}

minOrd := ver.Compare(*minPDVersion)
minOrd := ver.Compare(*MinPDVersion)
if minOrd < 0 {
arg := fmt.Sprintf("PD %s is not supported, the minimal compatible version is %s",
removeVAndHash(pdVer.Version), minPDVersion)
removeVAndHash(pdVer.Version), MinPDVersion)
return cerror.ErrVersionIncompatible.GenWithStackByArgs(arg)
}
maxOrd := ver.Compare(*maxPDVersion)
Expand Down
73 changes: 24 additions & 49 deletions pkg/version/check_test.go
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/tempurl"
)

Expand All @@ -38,34 +37,9 @@ type checkSuite struct{}

var _ = check.Suite(&checkSuite{})

type mockPDClient struct {
pd.Client
getAllStores func() []*metapb.Store
getVersion func() string
getStatusCode func() int
}

func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
if m.getAllStores != nil {
return m.getAllStores(), nil
}
return []*metapb.Store{}, nil
}

func (m *mockPDClient) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
// set status code at first, else will not work
if m.getStatusCode != nil {
resp.WriteHeader(m.getStatusCode())
}

if m.getVersion != nil {
_, _ = resp.Write([]byte(fmt.Sprintf(`{"version":"%s"}`, m.getVersion())))
}
}

func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
defer testleak.AfterTest(c)()
mock := mockPDClient{
mock := MockPDClient{
Client: nil,
}
pdURL, _ := url.Parse(tempurl.Alloc())
Expand All @@ -89,72 +63,73 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
}

{
mock.getVersion = func() string {
return minPDVersion.String()
mock.GetVersionFunc = func() string {
return MinPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil)
c.Assert(err, check.IsNil)
}

{
mock.getVersion = func() string {
mock.GetVersionFunc = func() string {
return `v1.0.0-alpha-271-g824ae7fd`
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil)
c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*")
}

// Check maximum compatible PD.
{
mock.getVersion = func() string {
mock.GetVersionFunc = func() string {
return `v10000.0.0`
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil)
c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*")
}

{
mock.getVersion = func() string {
return minPDVersion.String()
mock.GetVersionFunc = func() string {
return MinPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil)
c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*")
err = CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false)
c.Assert(err, check.IsNil)
}

// Check maximum compatible TiKV.
{
mock.getVersion = func() string {
return minPDVersion.String()
mock.GetVersionFunc = func() string {
return MinPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `10000.0.0`}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil)
c.Assert(err, check.ErrorMatches, ".*TiKV .* is not supported.*")
}

{
mock.getStatusCode = func() int {
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
mock.GetStatusCodeFunc = func() int {
return http.StatusBadRequest
}

err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, false)
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil)
c.Assert(err, check.ErrorMatches, ".*response status: .*")
}
}
Expand Down