Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tidb into issue41434
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu committed Feb 16, 2023
2 parents 306fa5d + d2d91b5 commit c7e324e
Show file tree
Hide file tree
Showing 131 changed files with 23,302 additions and 21,992 deletions.
1 change: 1 addition & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go_test(
],
embed = [":storage"],
flaky = True,
shard_count = 41,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand Down
15 changes: 14 additions & 1 deletion br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ func TestS3Storage(t *testing.T) {
sendCredential bool
}

require.NoError(t, os.Setenv("AWS_ACCESS_KEY_ID", "ab"))
require.NoError(t, os.Setenv("AWS_SECRET_ACCESS_KEY", "cd"))
require.NoError(t, os.Setenv("AWS_SESSION_TOKEN", "ef"))
s := createGetBucketRegionServer("us-west-2", 200, true)
defer s.Close()

Expand Down Expand Up @@ -411,7 +414,17 @@ func TestS3Storage(t *testing.T) {
}

func TestS3URI(t *testing.T) {
backend, err := ParseBackend("s3://bucket/prefix/", nil)
accessKey := "ab"
secretAccessKey := "cd"
sessionToken := "ef"
options := &BackendOptions{
S3: S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
SessionToken: sessionToken,
},
}
backend, err := ParseBackend("s3://bucket/prefix/", options)
require.NoError(t, err)
storage, err := New(context.Background(), backend, &ExternalStorageOptions{})
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@
"ineffassign": {
"exclude_files": {
"external/": "no need to vet third party code",
".*_generated\\.go$": "ignore generated code"
".*_generated\\.go$": "ignore generated code",
"/cgo/": "no need to vet cgo code"
}
},
"inspect": {
Expand Down
7 changes: 7 additions & 0 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Callback interface {
OnSchemaStateChanged(schemaVer int64)
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
// OnJobRunAfter is called after running job.
OnJobRunAfter(job *model.Job)
// OnJobUpdated is called after the running job is updated.
OnJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
Expand Down Expand Up @@ -80,6 +82,11 @@ func (*BaseCallback) OnJobRunBefore(_ *model.Job) {
// Nothing to do.
}

// OnJobRunAfter implements Callback.OnJobRunAfter interface.
func (*BaseCallback) OnJobRunAfter(_ *model.Job) {
// Nothing to do.
}

// OnJobUpdated implements Callback.OnJobUpdated interface.
func (*BaseCallback) OnJobUpdated(job *model.Job) {
// Nothing to do.
Expand Down
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
if err != nil || reorgInfo == nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
Expand Down
19 changes: 7 additions & 12 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,13 @@ func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
}

func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
existedRC, ok := dc.reorgCtx.reorgCtxMap[jobID]
if ok {
existedRC.references.Add(1)
return existedRC
}
rc := &reorgCtx{}
rc.doneCh = make(chan error, 1)
// initial reorgCtx
Expand All @@ -535,22 +542,10 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.El
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
rc.mu.warningsCount = make(map[errors.ErrorID]int64)
rc.references.Add(1)
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
dc.reorgCtx.reorgCtxMap[jobID] = rc
return rc
}

func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) {
rc := dc.getReorgCtx(bfJob.JobID)
if rc == nil {
ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}
dc.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, ele, bfJob.Meta.RowCount)
} else {
rc.references.Add(1)
}
}

func (dc *ddlCtx) removeReorgCtx(jobID int64) {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
Expand Down
6 changes: 0 additions & 6 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type DDLForTest interface {
// SetInterceptor sets the interceptor.
SetInterceptor(h Interceptor)
NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx
SetReorgCtxForBackfill(bfJob *BackfillJob)
GetReorgCtx(jobID int64) *reorgCtx
RemoveReorgCtx(id int64)
}
Expand All @@ -67,11 +66,6 @@ func (d *ddl) NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Elemen
return d.newReorgCtx(jobID, startKey, currElement, rowCount)
}

// SetReorgCtxForBackfill exports for testing.
func (d *ddl) SetReorgCtxForBackfill(bfJob *BackfillJob) {
d.setReorgCtxForBackfill(bfJob)
}

// GetReorgCtx exports for testing.
func (d *ddl) GetReorgCtx(jobID int64) *reorgCtx {
return d.getReorgCtx(jobID)
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,10 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)

d.mu.RLock()
d.mu.hook.OnJobRunAfter(job)
d.mu.RUnlock()

if job.IsCancelled() {
defer d.unlockSchemaVersion(job.ID)
w.sess.reset()
Expand Down
10 changes: 5 additions & 5 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ func TestUsingReorgCtx(t *testing.T) {
wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1}
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil, Meta: m}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
Expand All @@ -329,7 +329,7 @@ func TestUsingReorgCtx(t *testing.T) {
jobID := int64(1)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 100; i++ {
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx)
if err != nil || reorgInfo.first {
if err != nil || reorgInfo == nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
Expand Down
39 changes: 39 additions & 0 deletions ddl/indexmergetest/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package indexmergetest

import (
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -858,3 +859,41 @@ func TestAddIndexMultipleDelete(t *testing.T) {
tk.MustQuery("select * from t;").Check(testkit.Rows())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution"))
}

func TestAddIndexDuplicateAndWriteConflict(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, b int);")
tk.MustExec("insert into t values (1, 1);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &callback.TestDDLCallback{}
var runCancel bool
callback.OnJobRunAfterExported = func(job *model.Job) {
if t.Failed() || runCancel {
return
}
switch job.SchemaState {
case model.StateWriteOnly:
_, err := tk1.Exec("insert into t values (2, 1);")
assert.NoError(t, err)
}
if job.State == model.JobStateRollingback {
_, err := tk1.Exec("admin cancel ddl jobs " + strconv.FormatInt(job.ID, 10))
assert.NoError(t, err)
runCancel = true
}
}
d.SetHook(callback)

tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrCancelledDDLJob)
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 1", "2 1"))
}
12 changes: 12 additions & 0 deletions ddl/internal/callback/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type TestDDLCallback struct {

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
OnJobRunAfterExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
Expand Down Expand Up @@ -103,6 +104,17 @@ func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobRunAfter is used to run the user customized logic of `OnJobRunAfter` first.
func (tc *TestDDLCallback) OnJobRunAfter(job *model.Job) {
logutil.BgLogger().Info("on job run after", zap.String("job", job.String()))
if tc.OnJobRunAfterExported != nil {
tc.OnJobRunAfterExported(job)
return
}

tc.BaseCallback.OnJobRunAfter(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
Expand Down
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (d *ddl) loadBackfillJobAndRun() {
return
}
// TODO: Adjust how the non-owner uses ReorgCtx.
d.setReorgCtxForBackfill(bfJob)
d.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}, bfJob.Meta.RowCount)
d.wg.Run(func() {
defer func() {
tidbutil.Recover(metrics.LabelDistReorg, "runBackfillJobs", nil, false)
Expand Down
3 changes: 3 additions & 0 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,9 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
if meta.ErrDDLReorgElementNotExist.Equal(err) {
job.SnapshotVer = 0
logutil.BgLogger().Warn("[ddl] get reorg info, the element does not exist", zap.String("job", job.String()))
if job.IsCancelling() {
return nil, nil
}
}
return &info, errors.Trace(err)
}
Expand Down

0 comments on commit c7e324e

Please sign in to comment.