Merge remote-tracking branch 'origin/master' into fix-client-leak
D3Hunter committed Apr 4, 2023
2 parents e83b723 + 51a25c7 commit 45e255f
Showing 36 changed files with 449 additions and 294 deletions.
12 changes: 10 additions & 2 deletions disttask/framework/dispatcher/dispatcher.go
@@ -150,6 +150,8 @@ func (d *dispatcher) DispatchTaskLoop() {
case <-ticker.C:
cnt := d.getRunningGTaskCnt()
if cnt >= DefaultDispatchConcurrency {
logutil.BgLogger().Info("dispatch task loop, running GTask cnt is more than concurrency",
zap.Int("running cnt", cnt), zap.Int("concurrency", DefaultDispatchConcurrency))
break
}

@@ -175,12 +177,14 @@ func (d *dispatcher) DispatchTaskLoop() {
continue
}
if cnt >= DefaultDispatchConcurrency {
logutil.BgLogger().Info("dispatch task loop, running GTask cnt is more than concurrency", zap.Int64("current task ID", gTask.ID),
zap.Int("running cnt", cnt), zap.Int("concurrency", DefaultDispatchConcurrency))
break
}

err = d.processNormalFlow(gTask)
logutil.BgLogger().Info("dispatch task loop", zap.Int64("task ID", gTask.ID),
zap.String("state", gTask.State), zap.Uint64("con", gTask.Concurrency), zap.Error(err))
zap.String("state", gTask.State), zap.Uint64("concurrency", gTask.Concurrency), zap.Error(err))
if err != nil || gTask.IsFinished() {
continue
}
@@ -266,6 +270,10 @@ func (d *dispatcher) detectTask(gTask *proto.Task) {
zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State))
return
}
+ if !d.isRunningGTask(gTask.ID) {
+ logutil.BgLogger().Info("detect task, this task can't run",
+ zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State))
+ }
}
}
}
@@ -371,7 +379,7 @@ func (d *dispatcher) processNormalFlow(gTask *proto.Task) (err error) {
return err
}
logutil.BgLogger().Info("process normal flow", zap.Int64("task ID", gTask.ID),
zap.String("state", gTask.State), zap.Uint64("con", gTask.Concurrency), zap.Int("subtasks", len(metas)))
zap.String("state", gTask.State), zap.Uint64("concurrency", gTask.Concurrency), zap.Int("subtasks", len(metas)))

// Adjust the global task's concurrency.
if gTask.Concurrency == 0 {
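
The two log lines added above fire when the loop declines to pick up work because the running global-task count has reached DefaultDispatchConcurrency. A minimal sketch of that ticker-plus-cap shape, standard library only (miniDispatcher, its fields, and the tick interval are invented for illustration, not TiDB's types):

package main

import (
	"log"
	"sync"
	"time"
)

const defaultDispatchConcurrency = 4 // stand-in for dispatcher.DefaultDispatchConcurrency

type miniDispatcher struct {
	mu      sync.Mutex
	running map[int64]struct{} // IDs of running global tasks
}

func (d *miniDispatcher) runningCnt() int {
	d.mu.Lock()
	defer d.mu.Unlock()
	return len(d.running)
}

// dispatchLoop mirrors DispatchTaskLoop's shape: on each tick, skip
// dispatching entirely while the running count is at the cap.
func (d *miniDispatcher) dispatchLoop(done <-chan struct{}) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			if cnt := d.runningCnt(); cnt >= defaultDispatchConcurrency {
				log.Printf("running cnt %d >= concurrency %d, skipping tick", cnt, defaultDispatchConcurrency)
				continue
			}
			// ... fetch pending global tasks and dispatch them here,
			// re-checking the cap before each one, as the diff does ...
		}
	}
}

func main() {
	d := &miniDispatcher{running: make(map[int64]struct{})}
	done := make(chan struct{})
	go d.dispatchLoop(done)
	time.Sleep(250 * time.Millisecond)
	close(done)
}
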
24 changes: 16 additions & 8 deletions disttask/framework/dispatcher/dispatcher_test.go
@@ -130,6 +130,10 @@ const (
)

func checkDispatch(t *testing.T, taskCnt int, isSucc bool) {
+ failpoint.Enable("github.com/pingcap/tidb/domain/MockDisableDistTask", "return(true)")
+ defer func() {
+ require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/MockDisableDistTask"))
+ }()
// test DispatchTaskLoop
// test parallelism control
var originalConcurrency int
@@ -150,15 +154,16 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool) {

dispatcher.RegisterTaskFlowHandle(taskTypeExample, NumberExampleHandle{})

- cnt := 20
+ // 2s
+ cnt := 40
checkGetRunningGTaskCnt := func() {
var retCnt int
for i := 0; i < cnt; i++ {
retCnt = dsp.(dispatcher.DispatcherForTest).GetRunningGTaskCnt()
if retCnt == taskCnt {
break
}
- time.Sleep(time.Millisecond * 30)
+ time.Sleep(time.Millisecond * 50)
}
require.Equal(t, retCnt, taskCnt)
}
@@ -179,14 +184,17 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool) {
require.Equal(t, int64(i+1), tasks[i].ID)
subtasks, err := subTaskMgr.GetSubtaskInStatesCnt(taskID, proto.TaskStatePending)
require.NoError(t, err)
- require.Equal(t, subtasks, int64(subtaskCnt))
+ require.Equal(t, int64(subtaskCnt), subtasks, fmt.Sprintf("num:%d", i))
}
// test parallelism control
- taskID, err := gTaskMgr.AddNewTask(fmt.Sprintf("%d", taskCnt), taskTypeExample, 0, nil)
- require.NoError(t, err)
- checkGetRunningGTaskCnt()
- // Clean the task.
- deleteTasks(t, store, taskID)
+ if taskCnt == 1 {
+ taskID, err := gTaskMgr.AddNewTask(fmt.Sprintf("%d", taskCnt), taskTypeExample, 0, nil)
+ require.NoError(t, err)
+ checkGetRunningGTaskCnt()
+ // Clean the task.
+ deleteTasks(t, store, taskID)
+ dsp.(dispatcher.DispatcherForTest).DelRunningGTask(taskID)
+ }

// test DetectTaskLoop
checkGetGTaskState := func(expectedState string) {
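
checkGetRunningGTaskCnt above now polls up to 40 times at 50ms, a 2s budget (the `// 2s` comment), instead of 20 times at 30ms, which makes the assertion less flaky on slow runners. The same bounded-polling idiom as a standalone helper, standard library only (waitUntil is a made-up name):

package pollsketch

import "time"

// waitUntil polls pred every interval, at most attempts times, and reports
// whether it ever returned true; 40 attempts x 50ms gives the 2s budget
// used by checkGetRunningGTaskCnt.
func waitUntil(pred func() bool, attempts int, interval time.Duration) bool {
	for i := 0; i < attempts; i++ {
		if pred() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

The test then asserts on the last observed value, so a run that exhausts the budget still fails with the actual count in the message.
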
6 changes: 6 additions & 0 deletions disttask/framework/dispatcher/main_test.go
@@ -24,13 +24,19 @@ import (
// DispatcherForTest exports for testing.
type DispatcherForTest interface {
GetRunningGTaskCnt() int
+ DelRunningGTask(globalTaskID int64)
}

// GetRunningGTaskCnt implements Dispatcher.GetRunningGTaskCnt interface.
func (d *dispatcher) GetRunningGTaskCnt() int {
return d.getRunningGTaskCnt()
}

+ // DelRunningGTask implements Dispatcher.DelRunningGTask interface.
+ func (d *dispatcher) DelRunningGTask(globalTaskID int64) {
+ d.delRunningGTask(globalTaskID)
+ }

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()

7 changes: 6 additions & 1 deletion domain/domain.go
@@ -282,7 +282,7 @@ func (do *Domain) getTimestampForSchemaVersionWithNonEmptyDiff(m *meta.Meta, ver
if err != nil {
return 0, err
}
- if len(data.Info.Writes) == 0 {
+ if data == nil || data.Info == nil || len(data.Info.Writes) == 0 {
return 0, errors.Errorf("There is no Write MVCC info for the schema version")
}
return int64(data.Info.Writes[0].CommitTs), nil
@@ -1346,6 +1346,11 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) error {
}

func (do *Domain) initDistTaskLoop(ctx context.Context) error {
failpoint.Inject("MockDisableDistTask", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil)
}
})
se1, err := do.sysExecutorFactory(do)
if err != nil {
return err
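
Two independent fixes in domain.go: getTimestampForSchemaVersionWithNonEmptyDiff now guards data and data.Info against nil before touching Writes, and initDistTaskLoop gains a failpoint so tests can skip starting the dist-task loop. A sketch of the enable/disable lifecycle that dispatcher_test.go uses to drive it, assuming the github.com/pingcap/failpoint API as shown in the diff (the helper name is invented):

package domsketch

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

const fpName = "github.com/pingcap/tidb/domain/MockDisableDistTask"

// withDistTaskDisabled runs body while the failpoint makes
// initDistTaskLoop return nil immediately, then restores it.
func withDistTaskDisabled(t *testing.T, body func()) {
	require.NoError(t, failpoint.Enable(fpName, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fpName))
	}()
	body()
}
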
4 changes: 2 additions & 2 deletions executor/analyze_col_v2.go
@@ -73,10 +73,10 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.A
} else {
statsTbl = statsHandle.GetPartitionStats(e.tableInfo, tid)
}
- if statsTbl == nil || statsTbl.Count <= 0 {
+ if statsTbl == nil || statsTbl.RealtimeCount <= 0 {
return analyzeResult
}
- newSampleRate := math.Min(1, float64(config.DefRowsForSampleRate)/float64(statsTbl.Count))
+ newSampleRate := math.Min(1, float64(config.DefRowsForSampleRate)/float64(statsTbl.RealtimeCount))
if newSampleRate >= *e.analyzePB.ColReq.SampleRate {
return analyzeResult
}
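
Both hunks here (and the analyze_fast.go ones below) are the statistics.Table field rename from Count to RealtimeCount; the sample-rate math is unchanged: min(1, DefRowsForSampleRate/count). As a standalone sketch (the function is illustrative; the real constant lives in the config package):

package samplesketch

import "math"

// sampleRate targets roughly wantRows sampled rows from a table with
// realtimeCount rows, clamped to at most 1 (sample everything).
func sampleRate(realtimeCount int64, wantRows float64) float64 {
	if realtimeCount <= 0 {
		return 1 // empty or unknown table: scan all rows
	}
	return math.Min(1, wantRows/float64(realtimeCount))
}

// e.g. sampleRate(1_000_000, 110_000) is about 0.11, i.e. 11% of rows.
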
4 changes: 2 additions & 2 deletions executor/analyze_fast.go
@@ -116,7 +116,7 @@ func (e *AnalyzeFastExec) calculateEstimateSampleStep() (err error) {
var historyRowCount uint64
hasBeenAnalyzed := len(rows) != 0 && rows[0].GetInt64(0) == statistics.AnalyzeFlag
if hasBeenAnalyzed {
- historyRowCount = uint64(domain.GetDomain(e.ctx).StatsHandle().GetPartitionStats(e.tblInfo, e.tableID.GetStatisticsID()).Count)
+ historyRowCount = uint64(domain.GetDomain(e.ctx).StatsHandle().GetPartitionStats(e.tblInfo, e.tableID.GetStatisticsID()).RealtimeCount)
} else {
dbInfo, ok := domain.GetDomain(e.ctx).InfoSchema().SchemaByTable(e.tblInfo)
if !ok {
@@ -564,7 +564,7 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS
var rowCount int64 = 0
if stats.Lease() > 0 {
if t := stats.GetPartitionStats(e.tblInfo, e.tableID.GetStatisticsID()); !t.Pseudo {
- rowCount = t.Count
+ rowCount = t.RealtimeCount
}
}
hists, cms, topNs, fms := make([]*statistics.Histogram, length), make([]*statistics.CMSketch, length), make([]*statistics.TopN, length), make([]*statistics.FMSketch, length)
1 change: 1 addition & 0 deletions executor/asyncloaddata/BUILD.bazel
@@ -38,6 +38,7 @@ go_test(
shard_count = 6,
deps = [
"//br/pkg/lightning/config",
"//executor",
"//executor/importer",
"//kv",
"//parser/auth",
73 changes: 45 additions & 28 deletions executor/asyncloaddata/show_test.go
@@ -25,6 +25,7 @@ import (
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/executor"
. "github.com/pingcap/tidb/executor/asyncloaddata"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/kv"
@@ -269,15 +270,14 @@ func (s *mockGCSSuite) TestInternalStatus() {
wg.Add(1)
go func() {
defer wg.Done()
+ defer close(executor.TestSyncCh)

tk2 := testkit.NewTestKit(s.T(), s.store)
tk2.Session().GetSessionVars().User = user
userStr := tk2.Session().GetSessionVars().User.String()

- // tk @ 0:00
- // create load data job record in the system table and sleep 3 seconds

- time.Sleep(2 * time.Second)
- // tk2 @ 0:02
+ // wait for the load data job to be created
+ <-executor.TestSyncCh

jobInfos, err := GetAllJobInfo(ctx, tk2.Session(), userStr)
require.NoError(s.T(), err)
@@ -318,11 +318,11 @@ }
}
r.checkIgnoreTimes(s.T(), row)

- // tk @ 0:03
- // start job and sleep 3 seconds
+ // resume the load data job
+ executor.TestSyncCh <- struct{}{}

- time.Sleep(3 * time.Second)
- // tk2 @ 0:05
+ // wait for the load data job to be started
+ <-executor.TestSyncCh

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
require.NoError(s.T(), err)
@@ -336,12 +336,20 @@ func (s *mockGCSSuite) TestInternalStatus() {
r.jobStatus = "running"
r.checkIgnoreTimes(s.T(), row)

- // tk @ 0:06
- // commit one task and sleep 3 seconds
+ // resume the load data job
+ executor.TestSyncCh <- struct{}{}

- time.Sleep(3 * time.Second)
- // tk2 @ 0:08
+ // wait for the first task to be committed
+ <-executor.TestSyncCh

+ // wait for UpdateJobProgress
+ require.Eventually(s.T(), func() bool {
+ info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
+ if err != nil {
+ return false
+ }
+ return info.Progress == `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":1}`
+ }, 6*time.Second, time.Millisecond*100)
info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":1}`
@@ -354,28 +362,37 @@ func (s *mockGCSSuite) TestInternalStatus() {
r.loadedFileSize = "1B"
r.checkIgnoreTimes(s.T(), row)

- // tk @ 0:09
- // commit one task and sleep 3 seconds
+ // resume the load data job
+ executor.TestSyncCh <- struct{}{}

- time.Sleep(3 * time.Second)
- // tk2 @ 0:11
+ // wait for the second task to be committed
+ <-executor.TestSyncCh

- info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
- require.NoError(s.T(), err)
- expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":2,"LoadedRowCnt":2}`
- require.Equal(s.T(), expected, info)
+ // wait for UpdateJobProgress
+ require.Eventually(s.T(), func() bool {
+ info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
+ if err != nil {
+ return false
+ }
+ return info.Progress == `{"SourceFileSize":2,"LoadedFileSize":2,"LoadedRowCnt":2}`
+ }, 6*time.Second, time.Millisecond*100)

rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows()
require.Len(s.T(), rows, 1)
row = rows[0]
r.loadedFileSize = "2B"
r.checkIgnoreTimes(s.T(), row)

- // tk @ 0:12
- // finish job
+ // resume the load data job
+ executor.TestSyncCh <- struct{}{}

- time.Sleep(3 * time.Second)
- // tk2 @ 0:14
+ require.Eventually(s.T(), func() bool {
+ info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
+ if err != nil {
+ return false
+ }
+ return info.Status == JobFinished
+ }, 6*time.Second, 100*time.Millisecond)

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
require.NoError(s.T(), err)
@@ -410,9 +427,9 @@ func (s *mockGCSSuite) TestInternalStatus() {
config.BufferSizeScale = backup3
})

- s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCreateLoadDataJob", `sleep(3000)`)
- s.enableFailpoint("github.com/pingcap/tidb/executor/AfterStartJob", `sleep(3000)`)
- s.enableFailpoint("github.com/pingcap/tidb/executor/AfterCommitOneTask", `sleep(3000)`)
+ s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterCreateLoadDataJob", `return`)
+ s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterStartJob", `return`)
+ s.enableFailpoint("github.com/pingcap/tidb/executor/SyncAfterCommitOneTask", `return`)
sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t*.tsv?endpoint=%s'
INTO TABLE load_tsv.t WITH batch_size = 1;`, gcsEndpoint)
s.tk.MustExec(sql)
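
The fixed sleeps in this test are gone: cross-goroutine ordering now runs through the executor.TestSyncCh handshake (see the load_data.go hunks below), and state that only needs to become true eventually is checked with require.Eventually, polling every 100ms for up to 6s. The polling half in isolation, assuming only testify and the standard library:

package eventuallysketch

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestEventuallyInsteadOfSleep(t *testing.T) {
	var progress atomic.Int64
	go func() {
		time.Sleep(200 * time.Millisecond) // background work of unknown duration
		progress.Store(2)
	}()
	// Pass as soon as the condition holds; fail only after the whole
	// 6s budget is exhausted, instead of sleeping a fixed 3s and hoping.
	require.Eventually(t, func() bool {
		return progress.Load() == 2
	}, 6*time.Second, 100*time.Millisecond)
}
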
8 changes: 4 additions & 4 deletions executor/builder.go
@@ -2657,25 +2657,25 @@ func (b *executorBuilder) getAdjustedSampleRate(sctx sessionctx.Context, task pl
return defaultRate
}
// If the count in stats_meta is still 0 and there's no information from pd side, we scan all rows.
- if statsTbl.Count == 0 && !hasPD {
+ if statsTbl.RealtimeCount == 0 && !hasPD {
return 1
}
// we have issue https://github.com/pingcap/tidb/issues/29216.
// To do a workaround for this issue, we check the approxiCount from the pd side to do a comparison.
// If the count from the stats_meta is extremely smaller than the approximate count from the pd,
// we think that we meet this issue and use the approximate count to calculate the sample rate.
- if float64(statsTbl.Count*5) < approxiCount {
+ if float64(statsTbl.RealtimeCount*5) < approxiCount {
// Confirmed by TiKV side, the experience error rate of the approximate count is about 20%.
// So we increase the number to 150000 to reduce this error rate.
return math.Min(1, 150000/approxiCount)
}
// If we don't go into the above if branch and we still detect the count is zero. Return 1 to prevent the dividing zero.
- if statsTbl.Count == 0 {
+ if statsTbl.RealtimeCount == 0 {
return 1
}
// We are expected to scan about 100000 rows or so.
// Since there's tiny error rate around the count from the stats meta, we use 110000 to get a little big result
- return math.Min(1, config.DefRowsForSampleRate/float64(statsTbl.Count))
+ return math.Min(1, config.DefRowsForSampleRate/float64(statsTbl.RealtimeCount))
}

func (b *executorBuilder) getApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, task plannercore.AnalyzeColumnsTask) (float64, bool) {
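
getAdjustedSampleRate cross-checks RealtimeCount against an approximate count from the PD side: when the stats-meta count is more than five times smaller, the meta is treated as stale (issue #29216) and the PD figure is used with a 150000-row target to absorb its roughly 20% error. The decision tree condensed into one hypothetical function (names and shape are mine; the constants follow the comments in the diff):

package ratesketch

import "math"

func adjustedSampleRate(realtimeCount int64, approxCount float64, hasPD bool, defRowsForSampleRate float64) float64 {
	if realtimeCount == 0 && !hasPD {
		return 1 // no stats and no PD estimate: scan all rows
	}
	// Workaround for https://github.com/pingcap/tidb/issues/29216: if the
	// stats meta lags far behind PD's estimate, trust PD and target 150000
	// rows to absorb the approximation's roughly 20% error rate.
	if float64(realtimeCount*5) < approxCount {
		return math.Min(1, 150000/approxCount)
	}
	if realtimeCount == 0 {
		return 1 // avoid dividing by zero
	}
	return math.Min(1, defRowsForSampleRate/float64(realtimeCount))
}
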
15 changes: 15 additions & 0 deletions executor/load_data.go
@@ -324,6 +324,9 @@ func (e *LoadDataWorker) Load(
return jobID, e.doLoad(ctx, readerInfos, jobID)
}

+ // TestSyncCh is used in unit test to synchronize the execution of LOAD DATA.
+ var TestSyncCh = make(chan struct{})

func (e *LoadDataWorker) doLoad(
ctx context.Context,
readerInfos []importer.LoadDataReaderInfo,
@@ -375,6 +378,10 @@ func (e *LoadDataWorker) doLoad(
}()

failpoint.Inject("AfterCreateLoadDataJob", nil)
failpoint.Inject("SyncAfterCreateLoadDataJob", func() {
TestSyncCh <- struct{}{}
<-TestSyncCh
})

totalFilesize := int64(0)
hasErr := false
@@ -396,6 +403,10 @@ func (e *LoadDataWorker) doLoad(
}

failpoint.Inject("AfterStartJob", nil)
failpoint.Inject("SyncAfterStartJob", func() {
TestSyncCh <- struct{}{}
<-TestSyncCh
})

group, groupCtx := errgroup.WithContext(ctx)
// main goroutine -> readerInfoCh -> processOneStream goroutines
@@ -759,6 +770,10 @@ func (w *commitWorker) commitWork(ctx context.Context, inCh <-chan commitTask) (
zap.Uint64("taskCnt processed", taskCnt),
)
failpoint.Inject("AfterCommitOneTask", nil)
failpoint.Inject("SyncAfterCommitOneTask", func() {
TestSyncCh <- struct{}{}
<-TestSyncCh
})
}
}
}
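
TestSyncCh is unbuffered, so each Sync* failpoint above is a two-step rendezvous: the worker sends to announce the checkpoint and blocks until the test has observed it, then blocks again until the test sends permission to resume; show_test.go's defer close(executor.TestSyncCh) releases any receive still pending at shutdown. The protocol in miniature, standard library only (all names here are stand-ins):

package main

import "fmt"

var syncCh = make(chan struct{}) // unbuffered, like executor.TestSyncCh

func worker(done chan<- struct{}) {
	for step := 1; step <= 2; step++ {
		// ... commit one task ...
		syncCh <- struct{}{} // checkpoint reached; blocks until observed
		<-syncCh             // wait for permission to resume
	}
	close(done)
}

func main() {
	done := make(chan struct{})
	go worker(done)
	for step := 1; step <= 2; step++ {
		<-syncCh // worker is parked here; its state is stable to inspect
		fmt.Println("observed checkpoint", step)
		syncCh <- struct{}{} // resume the worker
	}
	<-done
}
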