Skip to content

Commit

Permalink
Merge branch 'master' into fix_bug
Browse files Browse the repository at this point in the history
  • Loading branch information
hehechen committed Nov 16, 2022
2 parents 061d5e6 + b117a2e commit 5f23ff7
Show file tree
Hide file tree
Showing 17 changed files with 1,030 additions and 693 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2898,8 +2898,8 @@ def go_deps():
name = "com_github_pingcap_log",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/log",
sum = "h1:T7e5Low0BU2ZazI2dz2mh3W1qv+w8wtvq1YR8DneA0c=",
version = "v1.1.1-0.20221110065318-21a4942860b3",
sum = "h1:crhkw6DD+07Bg1wYhW5Piw+kYNKZqFQqfC2puUf6gMI=",
version = "v1.1.1-0.20221116035753-734d527bc87c",
)
go_repository(
name = "com_github_pingcap_sysutil",
Expand Down
28 changes: 23 additions & 5 deletions autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,30 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*

val := s.getAlloc(req.DbID, req.TblID, req.IsUnsigned)

if req.N == 0 && val.base != 0 {
base := val.base
if req.N == 0 {
if val.base != 0 {
return &autoid.AutoIDResponse{
Min: val.base,
Max: val.base,
}, nil
}
// This item is not initialized, get the data from remote.
var currentEnd int64
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, s.store, true, func(ctx context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(req.DbID, req.TblID).RowID()
var err1 error
currentEnd, err1 = idAcc.Get()
if err1 != nil {
return err1
}
val.end = currentEnd
return nil
})
return &autoid.AutoIDResponse{
Min: base,
Max: base,
}, nil
Min: currentEnd,
Max: currentEnd,
}, err
}

val.Lock()
Expand Down
186 changes: 102 additions & 84 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2894,17 +2894,23 @@ func TestAutoIncrementTableOption(t *testing.T) {
tk.MustExec("create database test_auto_inc_table_opt;")
tk.MustExec("use test_auto_inc_table_opt;")

// Empty auto_inc allocator should not cause error.
tk.MustExec("create table t (a bigint primary key clustered) auto_increment = 10;")
tk.MustExec("alter table t auto_increment = 10;")
tk.MustExec("alter table t auto_increment = 12345678901234567890;")

// Rebase the auto_inc allocator to a large integer should work.
tk.MustExec("drop table t;")
tk.MustExec("create table t (a bigint unsigned auto_increment, unique key idx(a));")
tk.MustExec("alter table t auto_increment = 12345678901234567890;")
tk.MustExec("insert into t values ();")
tk.MustQuery("select * from t;").Check(testkit.Rows("12345678901234567890"))
for _, str := range []string{"", " AUTO_ID_CACHE 1"} {
// Empty auto_inc allocator should not cause error.
tk.MustExec("create table t (a bigint primary key clustered) auto_increment = 10" + str)
tk.MustExec("alter table t auto_increment = 10;")
tk.MustExec("alter table t auto_increment = 12345678901234567890;")
tk.MustExec("drop table t;")

// Rebase the auto_inc allocator to a large integer should work.
tk.MustExec("create table t (a bigint unsigned auto_increment, unique key idx(a))" + str)
// Set auto_inc to negative is not supported
err := tk.ExecToErr("alter table t auto_increment = -1;")
require.Error(t, err)
tk.MustExec("alter table t auto_increment = 12345678901234567890;")
tk.MustExec("insert into t values ();")
tk.MustQuery("select * from t;").Check(testkit.Rows("12345678901234567890"))
tk.MustExec("drop table t;")
}
}

func TestAutoIncrementForce(t *testing.T) {
Expand All @@ -2919,83 +2925,95 @@ func TestAutoIncrementForce(t *testing.T) {
require.NoError(t, err)
return gid
}
// Rebase _tidb_row_id.
tk.MustExec("create table t (a int);")
tk.MustExec("alter table t force auto_increment = 2;")
tk.MustExec("insert into t values (1),(2);")
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("1 2", "2 3"))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t force auto_increment = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t force auto_increment = 1;")
require.Equal(t, uint64(1), getNextGlobalID())
// inserting new rows can overwrite the existing data.
tk.MustExec("insert into t values (3);")
require.Equal(t, "[kv:1062]Duplicate entry '2' for key 't.PRIMARY'", tk.ExecToErr("insert into t values (3);").Error())
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("3 1", "1 2", "2 3"))

// Rebase auto_increment.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int primary key auto_increment, b int);")
tk.MustExec("insert into t values (1, 1);")
tk.MustExec("insert into t values (100000000, 1);")
tk.MustExec("delete from t where a = 100000000;")
require.Greater(t, getNextGlobalID(), uint64(100000000))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t /*T![force_inc] force */ auto_increment = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t /*T![force_inc] force */ auto_increment = 2;")
require.Equal(t, uint64(2), getNextGlobalID())
tk.MustExec("insert into t(b) values (2);")
tk.MustQuery("select a, b from t;").Check(testkit.Rows("1 1", "2 2"))

// Rebase auto_random.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint primary key auto_random(5));")
tk.MustExec("insert into t values ();")
tk.MustExec("set @@allow_auto_random_explicit_insert = true")
tk.MustExec("insert into t values (100000000);")
tk.MustExec("delete from t where a = 100000000;")
require.Greater(t, getNextGlobalID(), uint64(100000000))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t force auto_random_base = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t force auto_random_base = 2;")
require.Equal(t, uint64(2), getNextGlobalID())
tk.MustExec("insert into t values ();")
tk.MustQuery("select (a & 3) from t order by 1;").Check(testkit.Rows("1", "2"))

// Change next global ID.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint primary key auto_increment);")
tk.MustExec("insert into t values (1);")
bases := []uint64{1, 65535, 10, math.MaxUint64, math.MaxInt64 + 1, 1, math.MaxUint64, math.MaxInt64, 2}
lastBase := fmt.Sprintf("%d", bases[len(bases)-1])
for _, b := range bases {
tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b))
require.Equal(t, b, getNextGlobalID())

for _, str := range []string{"", " AUTO_ID_CACHE 1"} {
// Rebase _tidb_row_id.
tk.MustExec("create table t (a int)" + str)
tk.MustExec("alter table t force auto_increment = 2;")
tk.MustExec("insert into t values (1),(2);")
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("1 2", "2 3"))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t force auto_increment = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t force auto_increment = 1;")
require.Equal(t, uint64(1), getNextGlobalID())
// inserting new rows can overwrite the existing data.
tk.MustExec("insert into t values (3);")
require.Equal(t, "[kv:1062]Duplicate entry '2' for key 't.PRIMARY'", tk.ExecToErr("insert into t values (3);").Error())
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("3 1", "1 2", "2 3"))
tk.MustExec("drop table if exists t;")
}
tk.MustExec("insert into t values ();")
tk.MustQuery("select a from t;").Check(testkit.Rows("1", lastBase))
// Force alter unsigned int auto_increment column.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint unsigned primary key auto_increment);")
for _, b := range bases {
tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b))
require.Equal(t, b, getNextGlobalID())

for _, str := range []string{"", " AUTO_ID_CACHE 1"} {
// Rebase auto_increment.
tk.MustExec("create table t (a int primary key auto_increment, b int)" + str)
tk.MustExec("insert into t values (1, 1);")
tk.MustExec("insert into t values (100000000, 1);")
tk.MustExec("delete from t where a = 100000000;")
require.Greater(t, getNextGlobalID(), uint64(100000000))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t /*T![force_inc] force */ auto_increment = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t /*T![force_inc] force */ auto_increment = 2;")
require.Equal(t, uint64(2), getNextGlobalID())
tk.MustExec("insert into t(b) values (2);")
tk.MustQuery("select a, b from t;").Check(testkit.Rows("1 1", "2 2"))
tk.MustExec("drop table if exists t;")
}

for _, str := range []string{"", " AUTO_ID_CACHE 1"} {
// Rebase auto_random.
tk.MustExec("create table t (a bigint primary key auto_random(5))" + str)
tk.MustExec("insert into t values ();")
tk.MustExec("set @@allow_auto_random_explicit_insert = true")
tk.MustExec("insert into t values (100000000);")
tk.MustExec("delete from t where a = 100000000;")
require.Greater(t, getNextGlobalID(), uint64(100000000))
// Cannot set next global ID to 0.
tk.MustGetErrCode("alter table t force auto_random_base = 0;", errno.ErrAutoincReadFailed)
tk.MustExec("alter table t force auto_random_base = 2;")
require.Equal(t, uint64(2), getNextGlobalID())
tk.MustExec("insert into t values ();")
tk.MustQuery("select a from t;").Check(testkit.Rows(fmt.Sprintf("%d", b)))
tk.MustExec("delete from t;")
tk.MustQuery("select (a & 3) from t order by 1;").Check(testkit.Rows("1", "2"))
tk.MustExec("drop table if exists t;")
}

// Force alter with @@auto_increment_increment and @@auto_increment_offset.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int key auto_increment);")
tk.MustExec("set @@auto_increment_offset=2;")
tk.MustExec("set @@auto_increment_increment = 11;")
tk.MustExec("insert into t values (500);")
tk.MustExec("alter table t force auto_increment=100;")
tk.MustExec("insert into t values (), ();")
tk.MustQuery("select * from t;").Check(testkit.Rows("101", "112", "500"))
tk.MustQuery("select * from t order by a;").Check(testkit.Rows("101", "112", "500"))
tk.MustExec("drop table if exists t;")
for _, str := range []string{"", " AUTO_ID_CACHE 1"} {
// Change next global ID.
tk.MustExec("create table t (a bigint primary key auto_increment)" + str)
tk.MustExec("insert into t values (1);")
bases := []uint64{1, 65535, 10, math.MaxUint64, math.MaxInt64 + 1, 1, math.MaxUint64, math.MaxInt64, 2}
lastBase := fmt.Sprintf("%d", bases[len(bases)-1])
for _, b := range bases {
fmt.Println("execute alter table force increment to ==", b)
tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b))
require.Equal(t, b, getNextGlobalID())
}
tk.MustExec("insert into t values ();")
tk.MustQuery("select a from t;").Check(testkit.Rows("1", lastBase))
// Force alter unsigned int auto_increment column.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint unsigned primary key auto_increment)" + str)
for _, b := range bases {
tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b))
require.Equal(t, b, getNextGlobalID())
tk.MustExec("insert into t values ();")
tk.MustQuery("select a from t;").Check(testkit.Rows(fmt.Sprintf("%d", b)))
tk.MustExec("delete from t;")
}
tk.MustExec("drop table if exists t;")
}

for _, str := range []string{"", " AUTO_ID_CACHE 1"} {
// Force alter with @@auto_increment_increment and @@auto_increment_offset.
tk.MustExec("create table t(a int key auto_increment)" + str)
tk.MustExec("set @@auto_increment_offset=2;")
tk.MustExec("set @@auto_increment_increment = 11;")
tk.MustExec("insert into t values (500);")
tk.MustExec("alter table t force auto_increment=100;")
tk.MustExec("insert into t values (), ();")
tk.MustQuery("select * from t;").Check(testkit.Rows("101", "112", "500"))
tk.MustQuery("select * from t order by a;").Check(testkit.Rows("101", "112", "500"))
tk.MustExec("drop table if exists t;")
}

// Check for warning in case we can't set the auto_increment to the desired value
tk.MustExec("create table t(a int primary key auto_increment)")
Expand Down
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (dc *ddlCtx) excludeJobIDs() string {
}

const (
getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids) and %s reorg %s order by processing desc, job_id"
getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing order by processing desc limit 1) and %s reorg %s order by processing desc, job_id"
)

type jobType int
Expand Down
65 changes: 65 additions & 0 deletions ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,68 @@ func check(t *testing.T, record []int64, ids ...int64) {
}
}
}

func TestAlwaysChoiceProcessingJob(t *testing.T) {
if !variable.EnableConcurrentDDL.Load() {
t.Skipf("test requires concurrent ddl")
}
store, dom := testkit.CreateMockStoreAndDomain(t)

d := dom.DDL()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int)")

ddlJobs := []string{
"alter table t add index idx(a)",
"alter table t add index idx(b)",
}

hook := &ddl.TestDDLCallback{}
var wg util.WaitGroupWrapper
wg.Add(1)
var once sync.Once
var idxa, idxb int64
hook.OnGetJobBeforeExported = func(jobType string) {
once.Do(func() {
var jobs []*model.Job
for i, job := range ddlJobs {
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
recordSet, _ := tk.Exec(job)
if recordSet != nil {
require.NoError(t, recordSet.Close())
}
})
for {
time.Sleep(time.Millisecond * 100)
var err error
jobs, err = ddl.GetAllDDLJobs(testkit.NewTestKit(t, store).Session(), nil)
require.NoError(t, err)
if len(jobs) == i+1 {
break
}
}
}
idxa = jobs[0].ID
idxb = jobs[1].ID
require.Greater(t, idxb, idxa)
tk := testkit.NewTestKit(t, store)
tk.MustExec("update mysql.tidb_ddl_job set processing = 1 where job_id = ?", idxb)
wg.Done()
})
}

record := make([]int64, 0, 16)
hook.OnGetJobAfterExported = func(jobType string, job *model.Job) {
// record the job schedule order
record = append(record, job.ID)
}

d.SetHook(hook)
wg.Wait()

check(t, record, idxb, idxa)
}
24 changes: 24 additions & 0 deletions executor/autoidtest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "autoidtest_test",
srcs = [
"autoid_test.go",
"main_test.go",
],
flaky = True,
race = "on",
deps = [
"//config",
"//ddl/testutil",
"//meta/autoid",
"//parser/mysql",
"//session",
"//sessionctx/variable",
"//testkit",
"//testkit/testutil",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
],
)

0 comments on commit 5f23ff7

Please sign in to comment.