Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45680
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lichunzhu authored and ti-chi-bot committed Aug 1, 2023
1 parent c3f04c1 commit d57d450
Show file tree
Hide file tree
Showing 14 changed files with 2,230 additions and 27 deletions.
58 changes: 53 additions & 5 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,15 @@ func (*encodingBuilder) MakeEmptyRows() encode.Rows {
type targetInfoGetter struct {
tls *common.TLS
targetDB *sql.DB
pdAddr string
pdCli pd.Client
}

// NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation.
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdAddr string) backend.TargetInfoGetter {
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter {
return &targetInfoGetter{
tls: tls,
targetDB: db,
pdAddr: pdAddr,
pdCli: pdCli,
}
}

Expand All @@ -296,10 +296,10 @@ func (g *targetInfoGetter) CheckRequirements(ctx context.Context, checkCtx *back
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
return err
}
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinPDVersion, localMaxPDVersion); err != nil {
return err
}
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinTiKVVersion, localMaxTiKVVersion); err != nil {
return err
}

Expand Down Expand Up @@ -1690,6 +1690,54 @@ func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterCon
return openLocalWriter(cfg, engine, local.tikvCodec, local.LocalWriterMemCacheSize, local.bufferPool.NewBuffer())
}

<<<<<<< HEAD
=======
// SwitchModeByKeyRanges will switch tikv mode for regions in the specific key range for multirocksdb.
// This function will spawn a goroutine to keep switch mode periodically until the context is done.
// The return done channel is used to notify the caller that the background goroutine is exited.
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []Range) (<-chan struct{}, error) {
switcher := NewTiKVModeSwitcher(local.tls, local.pdCtl.GetPDClient(), log.FromContext(ctx).Logger)
done := make(chan struct{})

keyRanges := make([]*sst.Range, 0, len(ranges))
for _, r := range ranges {
startKey := r.start
if len(r.start) > 0 {
startKey = codec.EncodeBytes(nil, r.start)
}
endKey := r.end
if len(r.end) > 0 {
endKey = codec.EncodeBytes(nil, r.end)
}
keyRanges = append(keyRanges, &sst.Range{
Start: startKey,
End: endKey,
})
}

go func() {
defer close(done)
ticker := time.NewTicker(local.BackendConfig.RaftKV2SwitchModeDuration)
defer ticker.Stop()
switcher.ToImportMode(ctx, keyRanges...)
loop:
for {
select {
case <-ctx.Done():
break loop
case <-ticker.C:
switcher.ToImportMode(ctx, keyRanges...)
}
}
// Use a new context to avoid the context is canceled by the caller.
recoverCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
switcher.ToNormalMode(recoverCtx, keyRanges...)
}()
return done, nil
}

>>>>>>> 9c213aac21d (lightning: fix pd http request using old address (#45680))
func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
Expand Down
83 changes: 83 additions & 0 deletions br/pkg/lightning/backend/local/tikv_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"context"

sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type TiKVModeSwitcher interface {
// ToImportMode switches all TiKV nodes to Import mode.
ToImportMode(ctx context.Context, ranges ...*sstpb.Range)
// ToNormalMode switches all TiKV nodes to Normal mode.
ToNormalMode(ctx context.Context, ranges ...*sstpb.Range)
}

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type switcher struct {
tls *common.TLS
pdCli pd.Client
logger *zap.Logger
}

// NewTiKVModeSwitcher creates a new TiKVModeSwitcher.
func NewTiKVModeSwitcher(tls *common.TLS, pdCli pd.Client, logger *zap.Logger) TiKVModeSwitcher {
return &switcher{
tls: tls,
pdCli: pdCli,
logger: logger,
}
}

func (rc *switcher) ToImportMode(ctx context.Context, ranges ...*sstpb.Range) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import, ranges...)
}

func (rc *switcher) ToNormalMode(ctx context.Context, ranges ...*sstpb.Range) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal, ranges...)
}

func (rc *switcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode, ranges ...*sstpb.Range) {
rc.logger.Info("switch tikv mode", zap.Stringer("mode", mode))

// It is fine if we miss some stores which did not switch to Import mode,
// since we're running it periodically, so we exclude disconnected stores.
// But it is essentially all stores be switched back to Normal mode to allow
// normal operation.
var minState tikv.StoreState
if mode == sstpb.SwitchMode_Import {
minState = tikv.StoreStateOffline
} else {
minState = tikv.StoreStateDisconnected
}
tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr())
// we ignore switch mode failure since it is not fatal.
// no need log the error, it is done in kv.SwitchMode already.
_ = tikv.ForAllStores(
ctx,
tls,
minState,
func(c context.Context, store *tikv.Store) error {
return tikv.SwitchMode(c, tls, store.Address, mode, ranges...)
},
)
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_pd_client//:client",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
20 changes: 11 additions & 9 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -37,21 +36,14 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
return nil, nil
}

pdAddr := rc.cfg.TiDB.PdAddr
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, pdAddr)
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, rc.pdCli.GetLeaderAddr())
if err != nil {
return nil, errors.Trace(err)
}

// for v4.0.0 or upper, we can use the gc ttl api
var manager local.ChecksumManager
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}

backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
// only set backoff weight when it's smaller than default value
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
Expand All @@ -60,7 +52,17 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
log.FromContext(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight))
backoffWeight = local.DefaultBackoffWeight
}
<<<<<<< HEAD
manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
=======

explicitRequestSourceType, err := common.GetExplicitRequestSourceTypeFromDB(ctx, rc.db)
if err != nil {
log.FromContext(ctx).Warn("get tidb_request_source_type failed", zap.Error(err), zap.String("tidb_request_source_type", explicitRequestSourceType))
return nil, errors.Trace(err)
}
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)
>>>>>>> 9c213aac21d (lightning: fix pd http request using old address (#45680))
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
15 changes: 11 additions & 4 deletions br/pkg/lightning/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/mock"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -123,12 +124,14 @@ type TargetInfoGetterImpl struct {
db *sql.DB
tls *common.TLS
backend backend.TargetInfoGetter
pdCli pd.Client
}

// NewTargetInfoGetterImpl creates a TargetInfoGetterImpl object.
func NewTargetInfoGetterImpl(
cfg *config.Config,
targetDB *sql.DB,
pdCli pd.Client,
) (*TargetInfoGetterImpl, error) {
tls, err := cfg.ToTLS()
if err != nil {
Expand All @@ -139,7 +142,10 @@ func NewTargetInfoGetterImpl(
case config.BackendTiDB:
backendTargetInfoGetter = tidb.NewTargetInfoGetter(targetDB)
case config.BackendLocal:
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, cfg.TiDB.PdAddr)
if pdCli == nil {
return nil, common.ErrUnknown.GenWithStack("pd client is required when using local backend")
}
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, pdCli)
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
Expand All @@ -148,6 +154,7 @@ func NewTargetInfoGetterImpl(
tls: tls,
db: targetDB,
backend: backendTargetInfoGetter,
pdCli: pdCli,
}, nil
}

Expand Down Expand Up @@ -229,7 +236,7 @@ func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Contex
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error) {
result := new(pdtypes.ReplicationConfig)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdReplicate, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -240,7 +247,7 @@ func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtyp
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.StoresInfo, error) {
result := new(pdtypes.StoresInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdStores, result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -251,7 +258,7 @@ func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.Sto
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtypes.RegionsInfo, error) {
result := new(pdtypes.RegionsInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/lightning/importer/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,10 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NoError(t, err)
lnConfig := config.NewConfig()
lnConfig.TikvImporter.Backend = config.BackendLocal
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db)
_, err = NewTargetInfoGetterImpl(lnConfig, db, nil)
require.ErrorContains(t, err, "pd client is required when using local backend")
lnConfig.TikvImporter.Backend = config.BackendTiDB
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db, nil)
require.NoError(t, err)
require.Equal(t, lnConfig, targetGetter.cfg)

Expand Down
Loading

0 comments on commit d57d450

Please sign in to comment.