Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix pd http request using old address #45680

Merged
merged 13 commits into from Aug 1, 2023
12 changes: 6 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Expand Up @@ -268,15 +268,15 @@
type targetInfoGetter struct {
tls *common.TLS
targetDB *sql.DB
pdAddr string
pdCli pd.Client
}

// NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation.
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdAddr string) backend.TargetInfoGetter {
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter {

Check warning on line 275 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L275

Added line #L275 was not covered by tests
return &targetInfoGetter{
tls: tls,
targetDB: db,
pdAddr: pdAddr,
pdCli: pdCli,

Check warning on line 279 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L279

Added line #L279 was not covered by tests
}
}

Expand All @@ -297,10 +297,10 @@
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
return err
}
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinPDVersion, localMaxPDVersion); err != nil {

Check warning on line 300 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L300

Added line #L300 was not covered by tests
return err
}
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinTiKVVersion, localMaxTiKVVersion); err != nil {

Check warning on line 303 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L303

Added line #L303 was not covered by tests
return err
}

Expand Down Expand Up @@ -1719,7 +1719,7 @@
// This function will spawn a goroutine to keep switch mode periodically until the context is done.
// The return done channel is used to notify the caller that the background goroutine is exited.
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []Range) (<-chan struct{}, error) {
switcher := NewTiKVModeSwitcher(local.tls, local.PDAddr, log.FromContext(ctx).Logger)
switcher := NewTiKVModeSwitcher(local.tls, local.pdCtl.GetPDClient(), log.FromContext(ctx).Logger)

Check warning on line 1722 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L1722

Added line #L1722 was not covered by tests
done := make(chan struct{})

keyRanges := make([]*sst.Range, 0, len(ranges))
Expand Down
9 changes: 5 additions & 4 deletions br/pkg/lightning/backend/local/tikv_mode.go
Expand Up @@ -20,6 +20,7 @@
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -34,15 +35,15 @@
// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type switcher struct {
tls *common.TLS
pdAddr string
pdCli pd.Client
logger *zap.Logger
}

// NewTiKVModeSwitcher creates a new TiKVModeSwitcher.
func NewTiKVModeSwitcher(tls *common.TLS, pdAddr string, logger *zap.Logger) TiKVModeSwitcher {
func NewTiKVModeSwitcher(tls *common.TLS, pdCli pd.Client, logger *zap.Logger) TiKVModeSwitcher {

Check warning on line 43 in br/pkg/lightning/backend/local/tikv_mode.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/tikv_mode.go#L43

Added line #L43 was not covered by tests
return &switcher{
tls: tls,
pdAddr: pdAddr,
pdCli: pdCli,

Check warning on line 46 in br/pkg/lightning/backend/local/tikv_mode.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/tikv_mode.go#L46

Added line #L46 was not covered by tests
logger: logger,
}
}
Expand All @@ -68,7 +69,7 @@
} else {
minState = tikv.StoreStateDisconnected
}
tls := rc.tls.WithHost(rc.pdAddr)
tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr())

Check warning on line 72 in br/pkg/lightning/backend/local/tikv_mode.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/tikv_mode.go#L72

Added line #L72 was not covered by tests
// we ignore switch mode failure since it is not fatal.
// no need log the error, it is done in kv.SwitchMode already.
_ = tikv.ForAllStores(
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Expand Up @@ -175,6 +175,8 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_pd_client//:client",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
12 changes: 2 additions & 10 deletions br/pkg/lightning/importer/checksum_helper.go
Expand Up @@ -26,7 +26,6 @@
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -37,21 +36,14 @@
return nil, nil
}

pdAddr := rc.cfg.TiDB.PdAddr
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, pdAddr)
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, rc.pdCli.GetLeaderAddr())

Check warning on line 39 in br/pkg/lightning/importer/checksum_helper.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/checksum_helper.go#L39

Added line #L39 was not covered by tests
if err != nil {
return nil, errors.Trace(err)
}

// for v4.0.0 or upper, we can use the gc ttl api
var manager local.ChecksumManager
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}

backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
// only set backoff weight when it's smaller than default value
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
Expand All @@ -66,7 +58,7 @@
log.FromContext(ctx).Warn("get tidb_request_source_type failed", zap.Error(err), zap.String("tidb_request_source_type", explicitRequestSourceType))
return nil, errors.Trace(err)
}
manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)

Check warning on line 61 in br/pkg/lightning/importer/checksum_helper.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/checksum_helper.go#L61

Added line #L61 was not covered by tests
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
15 changes: 11 additions & 4 deletions br/pkg/lightning/importer/get_pre_info.go
Expand Up @@ -50,6 +50,7 @@
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/mock"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -123,12 +124,14 @@
db *sql.DB
tls *common.TLS
backend backend.TargetInfoGetter
pdCli pd.Client
}

// NewTargetInfoGetterImpl creates a TargetInfoGetterImpl object.
func NewTargetInfoGetterImpl(
cfg *config.Config,
targetDB *sql.DB,
pdCli pd.Client,
) (*TargetInfoGetterImpl, error) {
tls, err := cfg.ToTLS()
if err != nil {
Expand All @@ -139,7 +142,10 @@
case config.BackendTiDB:
backendTargetInfoGetter = tidb.NewTargetInfoGetter(targetDB)
case config.BackendLocal:
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, cfg.TiDB.PdAddr)
if pdCli == nil {
return nil, common.ErrUnknown.GenWithStack("pd client is required when using local backend")
}
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, pdCli)

Check warning on line 148 in br/pkg/lightning/importer/get_pre_info.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/get_pre_info.go#L148

Added line #L148 was not covered by tests
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
Expand All @@ -148,6 +154,7 @@
tls: tls,
db: targetDB,
backend: backendTargetInfoGetter,
pdCli: pdCli,
}, nil
}

Expand Down Expand Up @@ -229,7 +236,7 @@
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error) {
result := new(pdtypes.ReplicationConfig)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdReplicate, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -240,7 +247,7 @@
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.StoresInfo, error) {
result := new(pdtypes.StoresInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdStores, result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand All @@ -251,7 +258,7 @@
// It uses the PD interface through TLS to get the information.
func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtypes.RegionsInfo, error) {
result := new(pdtypes.RegionsInfo)
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
return nil, errors.Trace(err)
}
return result, nil
Expand Down
5 changes: 4 additions & 1 deletion br/pkg/lightning/importer/get_pre_info_test.go
Expand Up @@ -757,7 +757,10 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
require.NoError(t, err)
lnConfig := config.NewConfig()
lnConfig.TikvImporter.Backend = config.BackendLocal
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db)
_, err = NewTargetInfoGetterImpl(lnConfig, db, nil)
require.ErrorContains(t, err, "pd client is required when using local backend")
lnConfig.TikvImporter.Backend = config.BackendTiDB
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db, nil)
require.NoError(t, err)
require.Equal(t, lnConfig, targetGetter.cfg)

Expand Down
19 changes: 15 additions & 4 deletions br/pkg/lightning/importer/import.go
Expand Up @@ -200,6 +200,7 @@
engineMgr backend.EngineManager
backend backend.Backend
db *sql.DB
pdCli pd.Client

alterTableLock sync.Mutex
sysVars map[string]string
Expand Down Expand Up @@ -333,6 +334,7 @@

var encodingBuilder encode.EncodingBuilder
var backendObj backend.Backend
var pdCli pd.Client
switch cfg.TikvImporter.Backend {
case config.BackendTiDB:
encodingBuilder = tidb.NewEncodingBuilder()
Expand All @@ -348,9 +350,13 @@
if maxOpenFiles < 0 {
maxOpenFiles = math.MaxInt32
}
pdCli, err = pd.NewClientWithContext(ctx, []string{cfg.TiDB.PdAddr}, tls.ToPDSecurityOption())
if err != nil {
return nil, errors.Trace(err)
}

Check warning on line 356 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L353-L356

Added lines #L353 - L356 were not covered by tests

if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
if err := tikv.CheckTiKVVersion(ctx, tls, cfg.TiDB.PdAddr, minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
if err := tikv.CheckTiKVVersion(ctx, tls, pdCli.GetLeaderAddr(), minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {

Check warning on line 359 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L359

Added line #L359 was not covered by tests
if !berrors.Is(err, berrors.ErrVersionMismatch) {
return nil, common.ErrCheckKVVersion.Wrap(err).GenWithStackByArgs()
}
Expand Down Expand Up @@ -419,7 +425,7 @@

var wrapper backend.TargetInfoGetter
if cfg.TikvImporter.Backend == config.BackendLocal {
wrapper = local.NewTargetInfoGetter(tls, db, cfg.TiDB.PdAddr)
wrapper = local.NewTargetInfoGetter(tls, db, pdCli)

Check warning on line 428 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L428

Added line #L428 was not covered by tests
} else {
wrapper = tidb.NewTargetInfoGetter(db)
}
Expand All @@ -429,6 +435,7 @@
db: db,
tls: tls,
backend: wrapper,
pdCli: pdCli,

Check warning on line 438 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L438

Added line #L438 was not covered by tests
}
preInfoGetter, err := NewPreImportInfoGetter(
cfg,
Expand Down Expand Up @@ -458,6 +465,7 @@
pauser: p.Pauser,
engineMgr: backend.MakeEngineManager(backendObj),
backend: backendObj,
pdCli: pdCli,

Check warning on line 468 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L468

Added line #L468 was not covered by tests
db: db,
sysVars: common.DefaultImportantVariables,
tls: tls,
Expand All @@ -482,7 +490,7 @@
preInfoGetter: preInfoGetter,
precheckItemBuilder: preCheckBuilder,
encBuilder: encodingBuilder,
tikvModeSwitcher: local.NewTiKVModeSwitcher(tls, cfg.TiDB.PdAddr, log.FromContext(ctx).Logger),
tikvModeSwitcher: local.NewTiKVModeSwitcher(tls, pdCli, log.FromContext(ctx).Logger),

Check warning on line 493 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L493

Added line #L493 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init another client for it? odd to have an object with close but not called

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think creating two pd client is not effient and a bit confusing. pd has a background goroutine to update members.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add a comment here, easy to be taken as a bug. and i think it's a bad practice to rely on tikvModeSwitcher only closes pd-client in it's Close method, and does nothing else. it's easy to forget in later change.


keyspaceName: p.KeyspaceName,
resourceGroupName: p.ResourceGroupName,
Expand All @@ -495,6 +503,9 @@
func (rc *Controller) Close() {
rc.backend.Close()
_ = rc.db.Close()
if rc.pdCli != nil {
rc.pdCli.Close()
}

Check warning on line 508 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L507-L508

Added lines #L507 - L508 were not covered by tests
}

// Run starts the restore task.
Expand Down Expand Up @@ -1870,7 +1881,7 @@
}

func (rc *Controller) doCompact(ctx context.Context, level int32) error {
tls := rc.tls.WithHost(rc.cfg.TiDB.PdAddr)
tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr())

Check warning on line 1884 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L1884

Added line #L1884 was not covered by tests
return tikv.ForAllStores(
ctx,
tls,
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/importer/precheck.go
Expand Up @@ -9,6 +9,7 @@
ropts "github.com/pingcap/tidb/br/pkg/lightning/importer/opts"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/precheck"
pd "github.com/tikv/pd/client"
)

type precheckContextKey string
Expand All @@ -29,7 +30,8 @@
}

// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
// pdCli **must not** be nil for local backend
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {

Check warning on line 34 in br/pkg/lightning/importer/precheck.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/precheck.go#L34

Added line #L34 was not covered by tests
var gerr error
builderCfg := new(ropts.PrecheckItemBuilderConfig)
for _, o := range opts {
Expand All @@ -39,7 +41,7 @@
if err != nil {
return nil, errors.Trace(err)
}
targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB)
targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB, pdCli)

Check warning on line 44 in br/pkg/lightning/importer/precheck.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/precheck.go#L44

Added line #L44 was not covered by tests
if err != nil {
return nil, errors.Trace(err)
}
Expand Down