Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

linter: support lll to avoid too long line #42718

Merged
merged 108 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
eca2b7f
linter: support lll to avoid too long line
hawkingrei Mar 31, 2023
e639060
linter: support lll to avoid too long line
hawkingrei Mar 31, 2023
c106d31
linter: support lll to avoid too long line
hawkingrei Mar 31, 2023
823812e
linter: support lll to avoid too long line
hawkingrei Mar 31, 2023
8a76e22
linter: support lll to avoid too long line
hawkingrei Mar 31, 2023
c763225
linter: support lll to avoid too long line
hawkingrei Mar 31, 2023
9e01ca9
fix
hawkingrei Mar 31, 2023
99e8359
fix
hawkingrei Mar 31, 2023
7d6fc1d
fix
hawkingrei Mar 31, 2023
1688ab6
fix
hawkingrei Mar 31, 2023
290297b
fix
hawkingrei Mar 31, 2023
36ded1d
fix
hawkingrei Mar 31, 2023
74031fc
fix
hawkingrei Mar 31, 2023
79ed61b
fix
hawkingrei Apr 1, 2023
f29e7a6
fix
hawkingrei Apr 1, 2023
fe9052d
update
hawkingrei Apr 1, 2023
82950f4
update
hawkingrei Apr 1, 2023
8f4646f
update
hawkingrei Apr 1, 2023
eb1e25b
update
hawkingrei Apr 1, 2023
55e3988
update
hawkingrei Apr 1, 2023
56900e9
update
hawkingrei Apr 1, 2023
662df1a
update
hawkingrei Apr 1, 2023
c6cd12e
update
hawkingrei Apr 2, 2023
788f745
update
hawkingrei Apr 3, 2023
a682b9a
*: fix unstable test TestPoolTuneScaleUpAndDown
hawkingrei Apr 3, 2023
a05edf9
update
hawkingrei Apr 3, 2023
e633064
update
hawkingrei Apr 3, 2023
894f1aa
update
hawkingrei Apr 3, 2023
e9e43db
update
hawkingrei Apr 3, 2023
6450625
update
hawkingrei Apr 3, 2023
7d8dbf4
bazel: update config
hawkingrei Apr 3, 2023
4c1cd07
update
hawkingrei Apr 3, 2023
0455a95
update
hawkingrei Apr 3, 2023
3a5cdeb
update
hawkingrei Apr 3, 2023
888744b
update
hawkingrei Apr 3, 2023
867550b
update
hawkingrei Apr 3, 2023
0d8eb8d
update
hawkingrei Apr 4, 2023
c19f5b1
update
hawkingrei Apr 6, 2023
1f14747
update
hawkingrei Apr 6, 2023
88cbffa
update bazel
hawkingrei Apr 6, 2023
b199039
update bazel
hawkingrei Apr 6, 2023
f469216
update
hawkingrei Apr 6, 2023
002a9f4
update
hawkingrei Apr 6, 2023
90460fe
update
hawkingrei Apr 6, 2023
c478098
update
hawkingrei Apr 6, 2023
27b9f2f
update
hawkingrei Apr 6, 2023
e181c9a
update
hawkingrei Apr 6, 2023
be19e38
update
hawkingrei Apr 6, 2023
9bccd4e
update
hawkingrei Apr 6, 2023
fa75bb1
update
hawkingrei Apr 6, 2023
c39a969
update
hawkingrei Apr 6, 2023
c178f94
update
hawkingrei Apr 6, 2023
d4c6da8
update
hawkingrei Apr 6, 2023
34b1559
update
hawkingrei Apr 6, 2023
de69bb8
update
hawkingrei Apr 6, 2023
ee7c725
update
hawkingrei Apr 6, 2023
c32a955
update
hawkingrei Apr 6, 2023
cb22341
update
hawkingrei Apr 7, 2023
2315d4e
update
hawkingrei Apr 7, 2023
863d4b0
update
hawkingrei Apr 7, 2023
0113ce0
update
hawkingrei Apr 7, 2023
4282a8d
update
hawkingrei Apr 7, 2023
66c3218
update
hawkingrei Apr 7, 2023
78d5b2e
update
hawkingrei Apr 7, 2023
49cd5b5
update
hawkingrei Apr 7, 2023
ceead3d
update
hawkingrei Apr 7, 2023
6504213
update
hawkingrei Apr 7, 2023
33159e3
update
hawkingrei Apr 7, 2023
d401a11
update
hawkingrei Apr 7, 2023
d31cd90
update
hawkingrei Apr 7, 2023
9ee0a3c
update
hawkingrei Apr 7, 2023
9b2ba50
update
hawkingrei Apr 7, 2023
e8e5dd8
update
hawkingrei Apr 7, 2023
ac205a5
update
hawkingrei Apr 7, 2023
d6a52b4
update
hawkingrei Apr 7, 2023
8bedce1
update
hawkingrei Apr 7, 2023
f9d4148
update
hawkingrei Apr 7, 2023
5a1f113
update
hawkingrei Apr 7, 2023
8886675
update
hawkingrei Apr 7, 2023
870f018
update
hawkingrei Apr 7, 2023
e6bc022
update
hawkingrei Apr 7, 2023
5638784
update
hawkingrei Apr 7, 2023
1738cae
update
hawkingrei Apr 9, 2023
8c17bd6
save
hawkingrei Apr 10, 2023
8f0ca03
save
hawkingrei Apr 10, 2023
a3785d8
save
hawkingrei Apr 10, 2023
2bf8c6d
save
hawkingrei Apr 10, 2023
dd7931d
save
hawkingrei Apr 10, 2023
7bd1481
save
hawkingrei Apr 10, 2023
02a4c7a
save
hawkingrei Apr 10, 2023
070e449
save
hawkingrei Apr 10, 2023
c68f73c
save
hawkingrei Apr 10, 2023
6a6aaa3
save
hawkingrei Apr 10, 2023
8de4a5c
save
hawkingrei Apr 10, 2023
be8ef85
save
hawkingrei Apr 10, 2023
e8779fa
save
hawkingrei Apr 10, 2023
be5d421
save
hawkingrei Apr 10, 2023
0e5beff
save
hawkingrei Apr 10, 2023
9cf8815
save
hawkingrei Apr 10, 2023
de2acc8
save
hawkingrei Apr 10, 2023
b689015
save
hawkingrei Apr 10, 2023
49a6ec1
save
hawkingrei Apr 10, 2023
643725b
save
hawkingrei Apr 10, 2023
b17a805
save
hawkingrei Apr 10, 2023
cc53961
save
hawkingrei Apr 10, 2023
0c65dce
*: fix data race in the baseTxnContextProvider.ActivateTxn
hawkingrei Apr 12, 2023
f890022
*: fix data race in the baseTxnContextProvider.ActivateTxn
hawkingrei Apr 12, 2023
d82d5f9
update
hawkingrei Apr 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ func setPDConfigCommand() *cobra.Command {
return errors.Trace(err)
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker)
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg),
cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 4 additions & 3 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,10 @@ func newStreamCheckCommand() *cobra.Command {

func newStreamAdvancerCommand() *cobra.Command {
command := &cobra.Command{
Use: "advancer",
Short: "Start a central worker for advancing the checkpoint. (only for debuging, this subcommand should be integrated to TiDB)",
Args: cobra.NoArgs,
Use: "advancer",
Short: "Start a central worker for advancing the checkpoint. " +
"(only for debuging, this subcommand should be integrated to TiDB)",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return streamCommand(cmd, task.StreamCtl)
},
Expand Down
27 changes: 18 additions & 9 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ type CheckpointRunner struct {
}

// only for test
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage,
cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

Expand All @@ -269,7 +270,8 @@ func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalS
return runner, nil
}

func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,
cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

Expand All @@ -293,7 +295,8 @@ func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,
return runner, nil
}

func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, totalBytes uint64, timeCost float64) error {
func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64,
totalKvs uint64, totalBytes uint64, timeCost float64) error {
return r.checksumRunner.FlushChecksum(ctx, r.storage, tableID, crc64xor, totalKvs, totalBytes, timeCost)
}

Expand Down Expand Up @@ -405,7 +408,8 @@ func (r *CheckpointRunner) sendError(err error) {
r.checksumRunner.RecordError(err)
}

func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush, tickDurationForLock time.Duration) {
func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush,
tickDurationForLock time.Duration) {
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
defer r.wg.Done()
Expand Down Expand Up @@ -557,7 +561,8 @@ func (r *CheckpointRunner) flushLock(ctx context.Context, p int64) error {
LockId: r.lockId,
ExpireAt: p + lockTimeToLive.Milliseconds(),
}
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p), zap.Int64("expire-at", lock.ExpireAt))
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p),
zap.Int64("expire-at", lock.ExpireAt))
data, err := json.Marshal(lock)
if err != nil {
return errors.Trace(err)
Expand All @@ -584,12 +589,15 @@ func (r *CheckpointRunner) checkLockFile(ctx context.Context, now int64) error {
"Please check whether the BR is running. If not, you can retry.", lock.LockId, r.lockId)
}
if lock.LockId == r.lockId {
log.Warn("The lock has expired.", zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
log.Warn("The lock has expired.",
zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
}
} else if lock.LockId != r.lockId {
return errors.Errorf("The existing lock will expire in %d seconds. "+
"There may be another BR(%d) running. If not, you can wait for the lock to expire, or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId, strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
"There may be another BR(%d) running. If not, you can wait for the lock to expire, "+
"or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId,
strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
}

return nil
Expand Down Expand Up @@ -635,7 +643,8 @@ func (r *CheckpointRunner) initialLock(ctx context.Context) error {

// walk the whole checkpoint range files and retrieve the metadatat of backed up ranges
// and return the total time cost in the past executions
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo,
fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
// records the total time cost in the past executions
var pastDureTime time.Duration = 0
err := s.WalkDir(ctx, &storage.WalkOption{SubDir: CheckpointDataDir}, func(path string, size int64) error {
Expand Down
9 changes: 6 additions & 3 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ func buildIndexRequest(
var rule *tipb.ChecksumRewriteRule
if oldIndexInfo != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
OldPrefix: append(append([]byte{}, oldKeyspace...),
tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...),
tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
}
}
checksum := &tipb.ChecksumRequest{
Expand Down Expand Up @@ -332,7 +334,8 @@ func (exec *Executor) Execute(
updateFn func(),
) (*tipb.ChecksumResponse, error) {
checksumResp := &tipb.ChecksumResponse{}
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime, utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime,
utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
for _, req := range exec.reqs {
// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(p, l), nil
}

// GetMergeRegionSizeAndCount returns the tikv config `coprocessor.region-split-size` and `coprocessor.region-split-key`.
// GetMergeRegionSizeAndCount returns the tikv config
// `coprocessor.region-split-size` and `coprocessor.region-split-key`.
// returns the default config when failed.
func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64) {
regionSplitSize := DefaultMergeRegionSizeBytes
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
Expand All @@ -52,7 +53,8 @@ type Session interface {

// BatchCreateTableSession is an interface to batch create table parallelly
type BatchCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
}

// Progress is an interface recording the current execution progress.
Expand Down
20 changes: 13 additions & 7 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ func (ops ConsoleOperations) StartProgressBar(title string, total int, extraFiel
return ops.startProgressBarOverTTY(title, total, extraFields...)
}

func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int, extraFields ...ExtraField) ProgressWaiter {
func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,
extraFields ...ExtraField) ProgressWaiter {
return noOPWaiter{utils.StartProgress(context.TODO(), title, int64(total), true, nil)}
}

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int,
extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

Expand All @@ -142,7 +144,8 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...
greenTitle := color.GreenString(title)
return pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").
Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
return mpb.BarFillerFunc(func(w io.Writer, reqWidth int, stat decor.Statistics) {
if stat.Aborted || stat.Completed {
Expand All @@ -151,19 +154,22 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...
bf.Fill(w, reqWidth, stat)
})
}),
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle),
fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"),
printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
spinnerDoneText = fmt.Sprintf("... %s", color.GreenString("DONE"))
)

func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText),
color.RedString("ABORTED"))),
)
}
26 changes: 16 additions & 10 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...
defer rs.Close()
c := rs.NewChunk(nil)
if err := rs.Next(ctx, c); err != nil {
log.Warn("Error during draining result of internal sql.", logutil.Redact(zap.String("sql", sql)), logutil.ShortError(err))
log.Warn("Error during draining result of internal sql.",
logutil.Redact(zap.String("sql", sql)), logutil.ShortError(err))
return nil
}
}
Expand Down Expand Up @@ -224,11 +225,12 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr,
infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()

if err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...)
if kv.ErrEntryTooLarge.Equal(err) {
log.Info("entry too large, split batch create table", zap.Int("num table", len(infos)))
if len(infos) == 1 {
return err
Expand All @@ -248,7 +250,8 @@ func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (gs *tidbSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var dbName model.CIStr

// Disable foreign key check when batch create tables.
Expand Down Expand Up @@ -289,7 +292,8 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr,
table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
Expand Down Expand Up @@ -385,25 +389,27 @@ func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...i
}

// CreateDatabase implements glue.Session.
func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
func (*mockSession) CreateDatabase(_ context.Context, _ *model.DBInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreatePlacementPolicy implements glue.Session.
func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
func (*mockSession) CreatePlacementPolicy(_ context.Context, _ *model.PolicyInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTables implements glue.BatchCreateTableSession.
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (*mockSession) CreateTables(_ context.Context, _ map[string][]*model.TableInfo,
_ ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
func (*mockSession) CreateTable(_ context.Context, _ model.CIStr,
_ *model.TableInfo, _ ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}
Expand Down
19 changes: 13 additions & 6 deletions br/pkg/gluetidb/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ func TestSplitBatchCreateTableWithTableId(t *testing.T) {
}))
require.NoError(t, err)

tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'").Check(testkit.Rows("124"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'").Check(testkit.Rows("125"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued1'").
Check(testkit.Rows("124"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_resued2'").
Check(testkit.Rows("125"))
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)

// allocate new table id verification
Expand All @@ -90,7 +92,9 @@ func TestSplitBatchCreateTableWithTableId(t *testing.T) {
}))
require.NoError(t, err)

idGen, ok := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").Rows()[0][0].(string)
idGen, ok := tk.MustQuery(
"select tidb_table_id from information_schema.tables where table_name = 'table_id_new'").
Rows()[0][0].(string)
require.True(t, ok)
idGenNum, err := strconv.ParseInt(idGen, 10, 64)
require.NoError(t, err)
Expand Down Expand Up @@ -166,9 +170,12 @@ func TestSplitBatchCreateTable(t *testing.T) {
require.Equal(t, "public", job3[4])

// check reused table id
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").Check(testkit.Rows("1234"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").Check(testkit.Rows("1235"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").Check(testkit.Rows("1236"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_1'").
Check(testkit.Rows("1234"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_2'").
Check(testkit.Rows("1235"))
tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tables_3'").
Check(testkit.Rows("1236"))

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/RestoreBatchCreateTableEntryTooLarge"))
}
Expand Down
13 changes: 9 additions & 4 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func MakeEngineManager(ab Backend) EngineManager {
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig,
tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

Expand All @@ -231,7 +232,9 @@ func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, ta

closedCount := metric.ReadCounter(closedCounter)
if injectValue := val.(int); openCount-closedCount > float64(injectValue) {
panic(fmt.Sprintf("forcing failure due to FailIfEngineCountExceeds: %v - %v >= %d", openCount, closedCount, injectValue))
panic(fmt.Sprintf(
"forcing failure due to FailIfEngineCountExceeds: %v - %v >= %d",
openCount, closedCount, injectValue))
}
}
})
Expand Down Expand Up @@ -274,7 +277,8 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig,
tableName string, engineID int32) (*ClosedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
}
Expand All @@ -284,7 +288,8 @@ func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string,
engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
return engine{
backend: be.backend,
logger: makeLogger(log.FromContext(ctx), tag, engineUUID),
Expand Down