Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix rename table bug #4862

Merged
merged 20 commits into from
Oct 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
if err = t.DropTable(job.SchemaID, job.TableID, true); err != nil {
if err = t.DropTable(job.SchemaID, tblInfo, true); err != nil {
break
}
// Finish this job.
Expand Down Expand Up @@ -206,7 +206,7 @@ func (d *ddl) onTruncateTable(t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

err = t.DropTable(schemaID, tableID, true)
err = t.DropTable(schemaID, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -253,7 +253,7 @@ func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
}

err = t.DropTable(oldSchemaID, tblInfo.ID, false)
err = t.DropTable(oldSchemaID, tblInfo, false)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
27 changes: 25 additions & 2 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,41 @@ func (s *testSuite) TestRenameTable(c *C) {
tk.MustExec("create database rename1")
tk.MustExec("create database rename2")
tk.MustExec("create database rename3")

tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("insert rename1.t values ()")
tk.MustExec("rename table rename1.t to rename2.t")
tk.MustExec("insert rename2.t values ()")
tk.MustExec("rename table rename2.t to rename3.t")
tk.MustExec("insert rename3.t values ()")
tk.MustQuery("select * from rename3.t").Check(testkit.Rows("1", "2", "3"))

tk.MustExec("drop database rename1")
tk.MustExec("drop database rename2")
tk.MustExec("drop database rename3")

tk.MustExec("create database rename1")
tk.MustExec("create database rename2")
tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("rename table rename1.t to rename2.t1")
tk.MustExec("insert rename2.t1 values ()")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add test case: insert rename1.t values()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename2.t1 ? Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, I means insert back to rename1.t, it should fail as expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it in line 253.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

result := tk.MustQuery("select * from rename2.t1")
result.Check(testkit.Rows("1"))
tk.MustExec("drop database rename1")
tk.MustExec("drop database rename2")

tk.MustExec("create database rename1")
tk.MustExec("create database rename2")
tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("insert rename1.t values ()")
tk.MustExec("rename table rename1.t to rename2.t1")
// Make sure the value is greater than autoid.step.
tk.MustExec("insert rename2.t1 values (100000)")
tk.MustExec("insert rename2.t1 values ()")
result = tk.MustQuery("select * from rename2.t1")
result.Check(testkit.Rows("1", "100000", "100001"))
_, err := tk.Exec("insert rename1.t values ()")
c.Assert(err, NotNil)
tk.MustExec("drop database rename1")
tk.MustExec("drop database rename2")
}

func (s *testSuite) TestUnsupportedCharset(c *C) {
Expand Down
35 changes: 21 additions & 14 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (m *Meta) parseDatabaseID(key string) (int64, error) {
return n, errors.Trace(err)
}

func (m *Meta) autoTalbeIDKey(tableID int64) []byte {
func (m *Meta) autoTableIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID))
}

Expand All @@ -140,26 +140,24 @@ func (m *Meta) parseTableID(key string) (int64, error) {
return n, errors.Trace(err)
}

// GenAutoTableID adds step to the auto id of the table and returns the sum.
// GenAutoTableID adds step to the auto ID of the table and returns the sum.
func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) {
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
id, err := m.txn.HGet(dbKey, m.autoTableIDKey(tableID))
if err != nil {
return 0, errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableID)
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return 0, errors.Trace(err)
isNotExisting := len(id) == 0
if isNotExisting {
return 0, kv.ErrNotExist
}

return m.txn.HInc(dbKey, m.autoTalbeIDKey(tableID), step)
return m.txn.HInc(dbKey, m.autoTableIDKey(tableID), step)
}

// GetAutoTableID gets current auto id with table id.
func (m *Meta) GetAutoTableID(dbID int64, tableID int64) (int64, error) {
return m.txn.HGetInt64(m.dbKey(dbID), m.autoTalbeIDKey(tableID))
return m.txn.HGetInt64(m.dbKey(dbID), m.autoTableIDKey(tableID))
}

// GetSchemaVersion gets current global schema version.
Expand Down Expand Up @@ -273,6 +271,12 @@ func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
return errors.Trace(err)
}

// Initial the auto ID key.
base := []byte(strconv.FormatInt(0, 10))
err = m.txn.HSet(dbKey, m.autoTableIDKey(tableInfo.ID), base)
if err != nil {
return errors.Trace(err)
}
return m.txn.HSet(dbKey, tableKey, data)
}

Expand All @@ -294,15 +298,15 @@ func (m *Meta) DropDatabase(dbID int64) error {
// DropTable drops table in database.
// If delAutoID is true, it will delete the auto_increment id key-value of the table.
// For rename table, we do not need to rename auto_increment id key-value.
func (m *Meta) DropTable(dbID int64, tableID int64, delAutoID bool) error {
func (m *Meta) DropTable(dbID int64, tblInfo *model.TableInfo, delAutoID bool) error {
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableID)
tableKey := m.tableKey(tblInfo.ID)
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return errors.Trace(err)
}
Expand All @@ -312,7 +316,10 @@ func (m *Meta) DropTable(dbID int64, tableID int64, delAutoID bool) error {
}

if delAutoID {
if err := m.txn.HDel(dbKey, m.autoTalbeIDKey(tableID)); err != nil {
if tblInfo.OldSchemaID != 0 {
dbKey = m.dbKey(tblInfo.OldSchemaID)
}
if err := m.txn.HDel(dbKey, m.autoTableIDKey(tblInfo.ID)); err != nil {
return errors.Trace(err)
}
}
Expand Down
20 changes: 18 additions & 2 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *testSuite) TestMeta(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))

err = t.DropTable(1, 2, true)
err = t.DropTable(1, tbInfo2, true)
c.Assert(err, IsNil)
// Make sure auto id key-value entry is gone.
n, err = t.GetAutoTableID(1, 2)
Expand All @@ -168,16 +168,32 @@ func (s *testSuite) TestMeta(c *C) {
n, err = t.GenAutoTableID(1, tid, 10)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Fail to update auto id.
nonExistentID := int64(1234)
_, err = t.GenAutoTableID(1, nonExistentID, 10)
c.Assert(err, NotNil)
n, err = t.GetAutoTableID(1, tid)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Drop table without touch auto id key-value entry.
err = t.DropTable(1, 100, false)
err = t.DropTable(1, tbInfo100, false)
c.Assert(err, IsNil)
// Make sure that auto id key-value entry is still there.
n, err = t.GetAutoTableID(1, tid)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Drop table with old schema ID.
err = t.CreateTable(1, tbInfo100)
c.Assert(err, IsNil)
n, err = t.GenAutoTableID(1, tid, 10)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
tbInfo100.OldSchemaID = 1
err = t.DropTable(1, tbInfo100, true)
c.Assert(err, IsNil)
n, err = t.GetAutoTableID(1, tid)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(0))

err = t.DropDatabase(1)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion util/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *testSuite) TearDownSuite(c *C) {
c.Assert(err, IsNil)
t := meta.NewMeta(txn)

err = t.DropTable(s.dbInfo.ID, s.tbInfo.ID, true)
err = t.DropTable(s.dbInfo.ID, s.tbInfo, true)
c.Assert(err, IsNil)
err = t.DropDatabase(s.dbInfo.ID)
c.Assert(err, IsNil)
Expand Down