Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server, version: check and wait if cluster is incompatible #2695

Closed
wants to merge 7 commits into from
36 changes: 24 additions & 12 deletions cdc/server.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/httputil"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -137,18 +138,7 @@ func (s *Server) Run(ctx context.Context) error {
return errors.Trace(err)
}

// To not block CDC server startup, we need to warn instead of error
// when TiKV is incompatible.
errorTiKVIncompatible := false
for _, pdEndpoint := range s.pdEndpoints {
err = version.CheckClusterVersion(ctx, s.pdClient, pdEndpoint, conf.Security, errorTiKVIncompatible)
if err == nil {
break
}
}
if err != nil {
return err
}
checkAndWaitClusterVersion(ctx, s.pdClient, s.pdEndpoints, conf.Security)

kv.InitWorkerPool()
kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), conf.Security)
Expand Down Expand Up @@ -367,3 +357,25 @@ func findBestDataDir(candidates []string) (result string, ok bool) {

return result, ok
}

// checkAndWaitClusterVersion repeatedly runs version.CheckClusterVersion
// through retry.Do, configured with infinite tries and a fixed backoff
// (base delay and max delay are both backoffOneSecond).
//
// Within one attempt the PD endpoints are probed in order; the attempt
// succeeds as soon as one endpoint passes the check, and every failing
// endpoint is logged as a warning. If all endpoints fail, the error of the
// last one is handed back to retry.Do.
//
// NOTE(review): the value returned by retry.Do is discarded and this function
// returns nothing, so the caller cannot distinguish "check passed" from any
// other reason retry.Do may stop (presumably ctx cancellation — confirm
// against pkg/retry).
func checkAndWaitClusterVersion(
ctx context.Context, pdClient pd.Client, pdEndpoints []string, security *config.SecurityConfig,
) {
// Presumably milliseconds, given the name — TODO confirm the unit expected
// by retry.WithBackoffBaseDelay / retry.WithBackoffMaxDelay.
const backoffOneSecond = 1000
// Passed straight through to version.CheckClusterVersion.
const errorTiKVIncompatible = true

// The result of retry.Do is intentionally not inspected here (see NOTE above).
retry.Do(ctx, func() error {
var err error
for _, pdEndpoint := range pdEndpoints {
err = version.CheckClusterVersion(
ctx, pdClient, pdEndpoint, security, errorTiKVIncompatible)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the parameter errorTiKVIncompatible can be removed from function CheckClusterVersion

// One passing endpoint is enough; the remaining endpoints are not checked.
if err == nil {
break
}
Comment on lines +360 to +362
Copy link
Contributor

@amyangfei amyangfei Sep 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two scenarios have logic conflicts here.

  • More than 3 PDs in upstream, one of them is down, TiCDC can start up, which is the HA feature, in this scenario TiCDC only checks one PD.
  • Upstream upgrade, should TiCDC wait all PDs upgrade to the required version, then TiCDC should check all PDs.

log.Warn("check version", zap.Error(err))
}
// nil if some endpoint passed, otherwise the last endpoint's error.
return err
}, retry.WithInfiniteTries(),
retry.WithBackoffBaseDelay(backoffOneSecond), retry.WithBackoffMaxDelay(backoffOneSecond))
return
}
77 changes: 77 additions & 0 deletions cdc/server_test.go
Expand Up @@ -15,15 +15,21 @@ package cdc

import (
"context"
"fmt"
"net/http"
"net/url"
"path/filepath"
"sync/atomic"
"time"

"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/embed"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -109,3 +115,74 @@ func (s *serverSuite) TestInitDataDir(c *check.C) {

cancel()
}

// checkSuite is the receiver for the TestCheckAndWaitClusterVersion test.
type checkSuite struct{}

// Register checkSuite through check.Suite.
var _ = check.Suite(&checkSuite{})

// TestCheckAndWaitClusterVersion exercises checkAndWaitClusterVersion against
// a mock PD served over HTTP:
//
//  1. With compatible PD and TiKV versions the call must return within 5s.
//  2. With an incompatible PD version the call must still be waiting after 2s,
//     and must return within 2s once the mock switches to a compatible version.
func (s *checkSuite) TestCheckAndWaitClusterVersion(c *check.C) {
	defer testleak.AfterTest(c)()
	mock := version.MockPDClient{
		Client: nil,
	}
	// Fail loudly on a bad URL instead of dereferencing a nil *url.URL below.
	pdURL, err := url.Parse(tempurl.Alloc())
	c.Assert(err, check.IsNil)
	pdHTTP := fmt.Sprintf("http://%s", pdURL.Host)
	srv := http.Server{Addr: pdURL.Host, Handler: &mock}
	go func() {
		_ = srv.ListenAndServe()
	}()
	defer srv.Close()

	// Scenario 1: the cluster is compatible from the start.
	{
		mock.GetVersionFunc = func() string {
			return version.MinPDVersion.String()
		}
		mock.GetAllStoresFunc = func() []*metapb.Store {
			return []*metapb.Store{{Version: version.MinTiKVVersion.String()}}
		}
		ctx, cancel := context.WithCancel(context.Background())
		// Deferred so that the context is also cancelled when c.Fatal aborts
		// the test; otherwise the infinite retry loop would keep running.
		defer cancel()
		// Buffered so the helper goroutine can always deliver its signal and
		// exit, even if the test has already given up waiting.
		done := make(chan struct{}, 1)
		go func() {
			checkAndWaitClusterVersion(ctx, &mock, []string{pdHTTP}, nil)
			done <- struct{}{}
		}()
		select {
		case <-time.After(5 * time.Second):
			c.Fatal("check timeout")
		case <-done:
		}
	}

	// Scenario 2: PD starts incompatible and becomes compatible later.
	{
		pdVersionSwitch := int64(0)
		mock.GetVersionFunc = func() string {
			if atomic.LoadInt64(&pdVersionSwitch) == 0 {
				const incompatiblePD = "0.0.1"
				return incompatiblePD
			}
			return version.MinPDVersion.String()
		}
		mock.GetAllStoresFunc = func() []*metapb.Store {
			return []*metapb.Store{{Version: version.MinTiKVVersion.String()}}
		}
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		done := make(chan struct{}, 1)
		go func() {
			checkAndWaitClusterVersion(ctx, &mock, []string{pdHTTP}, nil)
			done <- struct{}{}
		}()
		// While PD is incompatible the call must not return.
		select {
		case <-time.After(2 * time.Second):
		case <-done:
			c.Fatal("must timeout")
		}
		// Flip the mock to a compatible version; the call must now finish.
		atomic.StoreInt64(&pdVersionSwitch, 1)
		select {
		case <-time.After(2 * time.Second):
			c.Fatal("check timeout")
		case <-done:
		}
	}
}
8 changes: 4 additions & 4 deletions pkg/version/check.go
Expand Up @@ -35,9 +35,9 @@ import (
)

var (
// minPDVersion is the version of the minimal compatible PD.
// MinPDVersion is the version of the minimal compatible PD.
// TODO bump 5.2.0-alpha once PD releases.
minPDVersion *semver.Version = semver.New("5.1.0-alpha")
MinPDVersion *semver.Version = semver.New("5.1.0-alpha")
Copy link
Contributor

@amyangfei amyangfei Aug 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this version be updated to 5.2.0-alpha? Ditto for MinTiKVVersion.

// maxPDVersion is the version of the maximum compatible PD.
// Compatible versions are in [minPDVersion, maxPDVersion)
// 9999.0.0 disables the check effectively in the master branch.
Expand Down Expand Up @@ -127,10 +127,10 @@ func CheckClusterVersion(
return cerror.WrapError(cerror.ErrNewSemVersion, err)
}

minOrd := ver.Compare(*minPDVersion)
minOrd := ver.Compare(*MinPDVersion)
if minOrd < 0 {
arg := fmt.Sprintf("PD %s is not supported, the minimal compatible version is %s",
removeVAndHash(pdVer.Version), minPDVersion)
removeVAndHash(pdVer.Version), MinPDVersion)
return cerror.ErrVersionIncompatible.GenWithStackByArgs(arg)
}
maxOrd := ver.Compare(*maxPDVersion)
Expand Down
50 changes: 12 additions & 38 deletions pkg/version/check_test.go
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/tempurl"
)

Expand All @@ -38,34 +37,9 @@ type checkSuite struct{}

var _ = check.Suite(&checkSuite{})

// mockPDClient is a test double that embeds pd.Client, overrides GetAllStores,
// and serves HTTP through ServeHTTP; each behavior is driven by an optional hook.
type mockPDClient struct {
pd.Client
// getAllStores, when non-nil, supplies the stores returned by GetAllStores.
getAllStores func() []*metapb.Store
// getVersion, when non-nil, supplies the version string written by ServeHTTP.
getVersion func() string
// getStatusCode, when non-nil, supplies the HTTP status code written by ServeHTTP.
getStatusCode func() int
}

// GetAllStores overrides the embedded pd.Client method. It yields whatever the
// getAllStores hook returns when the hook is set, and an empty, non-nil slice
// otherwise; the returned error is always nil.
func (m *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
	stores := []*metapb.Store{}
	if hook := m.getAllStores; hook != nil {
		stores = hook()
	}
	return stores, nil
}

// ServeHTTP answers every request with an optional status code (from the
// getStatusCode hook) followed by an optional JSON body of the form
// {"version":"<getVersion()>"}. Unset hooks are skipped.
func (m *mockPDClient) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
	// The status code must go out before any body bytes, or it is not applied.
	if statusCode := m.getStatusCode; statusCode != nil {
		resp.WriteHeader(statusCode())
	}

	versionOf := m.getVersion
	if versionOf == nil {
		return
	}
	_, _ = fmt.Fprintf(resp, `{"version":"%s"}`, versionOf())
}

func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
defer testleak.AfterTest(c)()
mock := mockPDClient{
mock := MockPDClient{
Client: nil,
}
pdURL, _ := url.Parse(tempurl.Alloc())
Expand All @@ -89,21 +63,21 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
}

{
mock.getVersion = func() string {
mock.GetVersionFunc = func() string {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
c.Assert(err, check.IsNil)
}

{
mock.getVersion = func() string {
mock.GetVersionFunc = func() string {
return `v1.0.0-alpha-271-g824ae7fd`
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
Expand All @@ -112,21 +86,21 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {

// Check maximum compatible PD.
{
mock.getVersion = func() string {
mock.GetVersionFunc = func() string {
return `v10000.0.0`
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
return []*metapb.Store{{Version: MinTiKVVersion.String()}}
}
err := CheckClusterVersion(context.Background(), &mock, pdHTTP, nil, true)
c.Assert(err, check.ErrorMatches, ".*PD .* is not supported.*")
}

{
mock.getVersion = func() string {
mock.GetVersionFunc = func() string {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `1.0.0-alpha-271-g824ae7fd`}}
}
Expand All @@ -138,10 +112,10 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {

// Check maximum compatible TiKV.
{
mock.getVersion = func() string {
mock.GetVersionFunc = func() string {
return minPDVersion.String()
}
mock.getAllStores = func() []*metapb.Store {
mock.GetAllStoresFunc = func() []*metapb.Store {
// TiKV does not include 'v'.
return []*metapb.Store{{Version: `10000.0.0`}}
}
Expand All @@ -150,7 +124,7 @@ func (s *checkSuite) TestCheckClusterVersion(c *check.C) {
}

{
mock.getStatusCode = func() int {
mock.GetStatusCodeFunc = func() int {
return http.StatusBadRequest
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/version/testing.go
@@ -0,0 +1,50 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package version

import (
"context"
"fmt"
"net/http"

"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/tikv/pd/client"
)

// MockPDClient mocks a pd client for tests.
// It embeds pd.Client, overrides GetAllStores, and also serves HTTP through
// ServeHTTP; each behavior is driven by an optional hook field below.
type MockPDClient struct {
pd.Client
// GetAllStoresFunc, when non-nil, supplies the stores returned by GetAllStores.
GetAllStoresFunc func() []*metapb.Store
// GetVersionFunc, when non-nil, supplies the version string written by ServeHTTP.
GetVersionFunc func() string
// GetStatusCodeFunc, when non-nil, supplies the HTTP status code written by ServeHTTP.
GetStatusCodeFunc func() int
}

// GetAllStores implements pd.Client.GetAllStores.
// It yields whatever GetAllStoresFunc returns when that hook is set, and an
// empty, non-nil slice otherwise; the returned error is always nil.
func (m *MockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
	stores := []*metapb.Store{}
	if hook := m.GetAllStoresFunc; hook != nil {
		stores = hook()
	}
	return stores, nil
}

// ServeHTTP answers every request with an optional status code (from
// GetStatusCodeFunc) followed by an optional JSON body of the form
// {"version":"<GetVersionFunc()>"}. Unset hooks are skipped.
func (m *MockPDClient) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
	// The status code must go out before any body bytes, or it is not applied.
	if statusCode := m.GetStatusCodeFunc; statusCode != nil {
		resp.WriteHeader(statusCode())
	}

	versionOf := m.GetVersionFunc
	if versionOf == nil {
		return
	}
	_, _ = fmt.Fprintf(resp, `{"version":"%s"}`, versionOf())
}