Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Fix a bug when there are a lot of jobs in upgrading state #44162

Merged
merged 4 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ go_test(
"//ddl/internal/session",
"//ddl/placement",
"//ddl/schematracker",
"//ddl/syncer",
"//ddl/testutil",
"//ddl/util",
"//disttask/framework/proto",
Expand Down
8 changes: 7 additions & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
tidb_util "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -186,7 +187,12 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
}

if err != nil {
logutil.BgLogger().Warn("[ddl-upgrading] pause the job failed", zap.Stringer("job", job), zap.Bool("has job err", len(errs) > 0), zap.Error(err))
isCannotPauseDDLJobErr := dbterror.ErrCannotPauseDDLJob.Equal(err)
logutil.BgLogger().Warn("[ddl-upgrading] pause the job failed", zap.Stringer("job", job),
zap.Bool("isRunnable", isCannotPauseDDLJobErr), zap.Error(err))
if isCannotPauseDDLJobErr {
return true, nil
}
} else {
logutil.BgLogger().Warn("[ddl-upgrading] pause the job successfully", zap.Stringer("job", job))
}
Expand Down
68 changes: 68 additions & 0 deletions ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
package ddl_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/internal/callback"
"github.com/pingcap/tidb/ddl/syncer"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -174,3 +178,67 @@ func check(t *testing.T, record []int64, ids ...int64) {
}
}
}

// TestUpgradingRelatedJobState runs several ADD INDEX statements while the
// mocked cluster state is flipped to "upgrading" from inside the DDL hooks.
//
// For every test case the hook waits until the job of the current statement
// shows up, optionally cancels it, and switches the global state to
// syncer.StateUpgrading once the job is in the state listed in the test case.
// After the job has been fetched in that state, the global state is switched
// back to syncer.StateNormalRunning. Each statement must then either succeed
// or fail with exactly the error text given in the test case.
func TestUpgradingRelatedJobState(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("CREATE TABLE e2 (id INT NOT NULL);")

	d := dom.DDL()

	testCases := []struct {
		sql      string         // statement to execute
		jobState model.JobState // job state at which the cluster is switched to upgrading
		err      error          // expected statement error; nil means the statement must succeed
	}{
		{"alter table e2 add index idx(id)", model.JobStateDone, nil},
		{"alter table e2 add index idx1(id)", model.JobStateCancelling, errors.New("[ddl:8214]Cancelled DDL job")},
		{"alter table e2 add index idx2(id)", model.JobStateRollingback, errors.New("[ddl:8214]Cancelled DDL job")},
		{"alter table e2 add index idx3(id)", model.JobStateRollbackDone, errors.New("[ddl:8214]Cancelled DDL job")},
	}

	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockUpgradingState", `return(true)`))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockUpgradingState"))
	}()

	// num is the index of the test case currently being executed; the hooks
	// below read it to know which statement and job state to look for.
	num := 0
	hook := &callback.TestDDLCallback{}
	hook.OnGetJobBeforeExported = func(jobType string) {
		// Poll (at most 100 times, 100ms apart) until the first listed job is
		// the one created by the current test case's statement.
		for i := 0; i < 100; i++ {
			time.Sleep(time.Millisecond * 100)
			jobs, err := ddl.GetAllDDLJobs(testkit.NewTestKit(t, store).Session(), nil)
			require.NoError(t, err)
			if len(jobs) < 1 || jobs[0].Query != testCases[num].sql {
				continue
			}

			// Test cases that expect an error cancel the job once its schema
			// state is write-only.
			if testCases[num].err != nil && jobs[0].SchemaState == model.StateWriteOnly {
				tk.MustExec("use test")
				tk.MustExec(fmt.Sprintf("admin cancel ddl jobs %d", jobs[0].ID))
			}
			if jobs[0].State == testCases[num].jobState {
				// Do not drop the error: a failed state switch would make the
				// rest of the test meaningless.
				require.NoError(t, dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), &syncer.StateInfo{State: syncer.StateUpgrading}))
			}
			break
		}
	}
	hook.OnGetJobAfterExported = func(jobType string, getJob *model.Job) {
		// Once the job has been fetched in the target state, restore the
		// normal running state.
		if getJob.Query == testCases[num].sql && getJob.State == testCases[num].jobState {
			require.NoError(t, dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), &syncer.StateInfo{State: syncer.StateNormalRunning}))
		}
	}
	d.SetHook(hook)

	for i, tc := range testCases {
		num = i
		if tc.err == nil {
			tk.MustExec(tc.sql)
		} else {
			_, err := tk.Exec(tc.sql)
			// EqualError also fails cleanly when err is nil, instead of
			// panicking on err.Error().
			require.EqualError(t, err, tc.err.Error())
		}
	}
}
6 changes: 6 additions & 0 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ func (s *MockStateSyncer) Init(context.Context) error {

// UpdateGlobalState implements StateSyncer.UpdateGlobalState interface.
func (s *MockStateSyncer) UpdateGlobalState(_ context.Context, stateInfo *syncer.StateInfo) error {
failpoint.Inject("mockUpgradingState", func(val failpoint.Value) {
if val.(bool) {
s.clusterState.Store(stateInfo)
failpoint.Return(nil)
}
})
s.globalVerCh <- clientv3.WatchResponse{}
s.clusterState.Store(stateInfo)
return nil
Expand Down
10 changes: 5 additions & 5 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,21 +1188,21 @@ func syncUpgradeState(s Session) {
if err == nil && len(jobErrs) == 0 {
break
}
isAllFinished := true
jobErrStrs := make([]string, 0, len(jobErrs))
for _, jobErr := range jobErrs {
if dbterror.ErrPausedDDLJob.Equal(jobErr) {
continue
}
isAllFinished = false
jobErrStrs = append(jobErrStrs, jobErr.Error())
zimulala marked this conversation as resolved.
Show resolved Hide resolved
}
if isAllFinished {
if err == nil && len(jobErrs) == 0 {
break
}

if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Error(err))
logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
}
logutil.BgLogger().Warn("[upgrading] pause all jobs failed", zap.Error(err))
logutil.BgLogger().Warn("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
time.Sleep(interval)
}
logutil.BgLogger().Info("[upgrading] update global state to upgrading", zap.String("state", syncer.StateUpgrading))
Expand Down