Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: always get latest PD leader when access PD after initialized (#46726) #46758

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions br/pkg/lightning/importer/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)
preInfoGetter.dbInfosCache = rc.dbInfos
err = rc.checkCSVHeader(ctx)
Expand Down Expand Up @@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)

rc := &Controller{
Expand Down Expand Up @@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
nil,
preInfoGetter,
nil,
nil,
)
rc := &Controller{
cfg: cfg,
Expand Down
17 changes: 12 additions & 5 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func NewImportControllerWithPauser(
}

preCheckBuilder := NewPrecheckItemBuilder(
cfg, p.DBMetas, preInfoGetter, cpdb,
cfg, p.DBMetas, preInfoGetter, cpdb, pdCli,
)

rc := &Controller{
Expand Down Expand Up @@ -488,6 +488,8 @@ func (rc *Controller) Close() {

// Run starts the restore task.
func (rc *Controller) Run(ctx context.Context) error {
failpoint.Inject("beforeRun", func() {})

opts := []func(context.Context) error{
rc.setGlobalVariables,
rc.restoreSchema,
Expand Down Expand Up @@ -1395,7 +1397,7 @@ const (

func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1556,8 +1558,13 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

// Disable GC because TiDB enables GC already.

currentLeaderAddr := rc.pdCli.GetLeaderAddr()
// remove URL scheme
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://")
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://")
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", rc.cfg.TiDB.PdAddr, rc.keyspaceName),
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", currentLeaderAddr, rc.keyspaceName),
driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
)
if err != nil {
Expand Down Expand Up @@ -1769,7 +1776,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg)
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetLeaderAddr())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2181,7 +2188,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
}
if isLocalBackend(rc.cfg) {
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(),
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
if err != nil {
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestPreCheckFailed(t *testing.T) {
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
}
cpdb := panicCheckpointDB{}
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb)
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
ctl := &Controller{
cfg: cfg,
saveCpCh: make(chan saveCp),
Expand Down
37 changes: 26 additions & 11 deletions br/pkg/lightning/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte

// PrecheckItemBuilder is used to build precheck items
type PrecheckItemBuilder struct {
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
pdLeaderAddrGetter func() string
}

// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
// pdCli **must not** be nil for local backend
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
func NewPrecheckItemBuilderFromConfig(
ctx context.Context,
cfg *config.Config,
pdCli pd.Client,
opts ...ropts.PrecheckItemBuilderOption,
) (*PrecheckItemBuilder, error) {
var gerr error
builderCfg := new(ropts.PrecheckItemBuilderConfig)
for _, o := range opts {
Expand Down Expand Up @@ -71,7 +77,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p
if err != nil {
return nil, errors.Trace(err)
}
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr
}

// NewPrecheckItemBuilder creates a new PrecheckItemBuilder
Expand All @@ -80,12 +86,21 @@ func NewPrecheckItemBuilder(
dbMetas []*mydump.MDDatabaseMeta,
preInfoGetter PreImportInfoGetter,
checkpointsDB checkpoints.DB,
pdCli pd.Client,
) *PrecheckItemBuilder {
leaderAddrGetter := func() string {
return cfg.TiDB.PdAddr
}
// in tests we may not have a pdCli
if pdCli != nil {
leaderAddrGetter = pdCli.GetLeaderAddr
}
return &PrecheckItemBuilder{
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
pdLeaderAddrGetter: leaderAddrGetter,
}
}

Expand Down Expand Up @@ -117,7 +132,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p
case precheck.CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case precheck.CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
22 changes: 14 additions & 8 deletions br/pkg/lightning/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,17 +746,19 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
cfg *config.Config
Instruction string
cfg *config.Config
Instruction string
leaderAddrGetter func() string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config) precheck.Checker {
func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker {
return &CDCPITRCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
leaderAddrGetter: leaderAddrGetter,
}
}

Expand All @@ -765,7 +767,11 @@ func (*CDCPITRCheckItem) GetCheckItemID() precheck.CheckItemID {
return precheck.CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
func dialEtcdWithCfg(
ctx context.Context,
cfg *config.Config,
leaderAddr string,
) (*clientv3.Client, error) {
cfg2, err := cfg.ToTLS()
if err != nil {
return nil, err
Expand All @@ -774,7 +780,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
Endpoints: []string{leaderAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
Expand All @@ -801,7 +807,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
ci := NewCDCPITRCheckItem(cfg, nil)
checker := ci.(*CDCPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {

preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
require.NoError(t, err)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
for _, checkItemID := range []precheck.CheckItemID{
precheck.CheckLargeDataFile,
precheck.CheckSourcePermission,
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
targetInfoGetter: targetInfoGetter,
srcStorage: mockStore,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1352,7 +1352,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
targetInfoGetter: targetInfoGetter,
dbMetas: dbMetas,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB())
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1448,7 +1448,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
for _, ca := range cases {
template := NewSimpleTemplate()
cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
rc := &Controller{
cfg: cfg,
checkTemplate: template,
Expand Down
Loading