Skip to content

Commit

Permalink
*: Fix a bug when have a lot of jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed May 25, 2023
1 parent 4d9a1f1 commit 097ae28
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 7 deletions.
10 changes: 8 additions & 2 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
tidb_util "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -186,15 +187,20 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
}

if err != nil {
logutil.BgLogger().Warn("[ddl-upgrading] pause the job failed", zap.Stringer("job", job), zap.Bool("has job err", len(errs) > 0), zap.Error(err))
isCannotPauseDDLJobErr := dbterror.ErrCannotPauseDDLJob.Equal(err)
logutil.BgLogger().Warn("[ddl-upgrading] pause the job failed", zap.Stringer("job", job),
zap.Bool("isRunnable", isCannotPauseDDLJobErr), zap.Error(err))
if isCannotPauseDDLJobErr {
return true, nil
}
} else {
logutil.BgLogger().Warn("[ddl-upgrading] pause the job successfully", zap.Stringer("job", job))
}

return false, nil
}

if job.IsPausedBySystem() && !hasSysDB(job) {
if job.IsPausedBySystem() {
var errs []error
errs, err = ResumeJobsBySystem(sess.Session(), []int64{job.ID})
if len(errs) > 0 {
Expand Down
73 changes: 73 additions & 0 deletions ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
package ddl_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/internal/callback"
"github.com/pingcap/tidb/ddl/syncer"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -174,3 +179,71 @@ func check(t *testing.T, record []int64, ids ...int64) {
}
}
}

func TestUpgradingRelatedJobState(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE e2 (id INT NOT NULL);")

d := dom.DDL()

testCases := []struct {
sql string
jobState model.JobState
err error
}{
{"alter table e2 add index idx(id)", model.JobStateDone, nil},
{"alter table e2 add index idx1(id)", model.JobStateCancelling, errors.New("[ddl:8214]Cancelled DDL job")},
{"alter table e2 add index idx2(id)", model.JobStateRollingback, errors.New("[ddl:8214]Cancelled DDL job")},
{"alter table e2 add index idx3(id)", model.JobStateRollbackDone, errors.New("[ddl:8214]Cancelled DDL job")},
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockUpgradingState", `return(true)`))
defer require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/syncer/mockUpgradingState"))

num := 0
hook := &callback.TestDDLCallback{}
hook.OnGetJobBeforeExported = func(jobType string) {
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
jobs, err := ddl.GetAllDDLJobs(testkit.NewTestKit(t, store).Session(), nil)
require.NoError(t, err)
if len(jobs) < 1 || jobs[0].Query != testCases[num].sql {
continue
}

if testCases[num].err != nil && jobs[0].SchemaState == model.StateWriteOnly {
tk.MustExec("use test")
tk.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobs[0].ID))
logutil.BgLogger().Warn(fmt.Sprintf("xxx------------------------------------------------cancel job:%s", jobs[0].String()))
}
if jobs[0].State == testCases[num].jobState {
logutil.BgLogger().Warn(fmt.Sprintf("xxx------------------------------------------------upgrading job:%s", jobs[0].String()))
dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), &syncer.StateInfo{State: syncer.StateUpgrading})
logutil.BgLogger().Warn(fmt.Sprintf("xxx------------------------------------------------upgrading 222 job:%s", jobs[0].String()))
}
break
}
}
hook.OnGetJobAfterExported = func(jobType string, getJob *model.Job) {
if getJob.Query == testCases[num].sql && getJob.State == testCases[num].jobState {
logutil.BgLogger().Warn(fmt.Sprintf("xxx------------------------------------------------normal job:%s", getJob.String()))
dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), &syncer.StateInfo{State: syncer.StateNormalRunning})
logutil.BgLogger().Warn(fmt.Sprintf("xxx------------------------------------------------normal 222 job:%s", getJob.String()))
}
}
d.SetHook(hook)

for i, tc := range testCases {
num = i
logutil.BgLogger().Warn(fmt.Sprintf("xxx------------------------------------------------sql:%s", tc.sql))
if tc.err == nil {
tk.MustExec(tc.sql)
} else {
_, err := tk.Exec(tc.sql)
require.Equal(t, tc.err.Error(), err.Error())
}
}
}
10 changes: 10 additions & 0 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
)
Expand Down Expand Up @@ -161,6 +163,13 @@ func (s *MockStateSyncer) Init(context.Context) error {

// UpdateGlobalState implements StateSyncer.UpdateGlobalState interface.
func (s *MockStateSyncer) UpdateGlobalState(_ context.Context, stateInfo *syncer.StateInfo) error {
failpoint.Inject("mockUpgradingState", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Warn(fmt.Sprintf("xxx------------------------------------------------mockUpgradingState *** job:%v", stateInfo))
s.clusterState.Store(stateInfo)
failpoint.Return(nil)
}
})
s.globalVerCh <- clientv3.WatchResponse{}
s.clusterState.Store(stateInfo)
return nil
Expand All @@ -173,6 +182,7 @@ func (s *MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, er

// IsUpgradingState implements StateSyncer.IsUpgradingState interface.
func (s *MockStateSyncer) IsUpgradingState() bool {

return s.clusterState.Load().State == syncer.StateUpgrading
}

Expand Down
10 changes: 5 additions & 5 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,21 +1188,21 @@ func syncUpgradeState(s Session) {
if err == nil && len(jobErrs) == 0 {
break
}
isAllFinished := true
jobErrStrs := make([]string, 0, len(jobErrs))
for _, jobErr := range jobErrs {
if dbterror.ErrPausedDDLJob.Equal(jobErr) {
continue
}
isAllFinished = false
jobErrStrs = append(jobErrStrs, jobErr.Error())
}
if isAllFinished {
if err == nil && len(jobErrs) == 0 {
break
}

if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Error(err))
logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
}
logutil.BgLogger().Warn("[upgrading] pause all jobs failed", zap.Error(err))
logutil.BgLogger().Warn("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
time.Sleep(interval)
}
logutil.BgLogger().Info("[upgrading] update global state to upgrading", zap.String("state", syncer.StateUpgrading))
Expand Down

0 comments on commit 097ae28

Please sign in to comment.