Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partition: reorganized the execution order of partition table update operators #52442

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tables

import (
"bytes"
"context"
"sync"
"time"
Expand Down Expand Up @@ -355,15 +354,6 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
continue
}
if c.idxInfo.Global && len(value) != 0 && !bytes.Equal(value, idxVal) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added by #42311. After change the update operators, no need this code anymore.

val := idxVal
err = txn.GetMemBuffer().Set(key, val)
if err != nil {
return nil, err
}
continue
}

if keyIsTempIdxKey && !tempIdxVal.IsEmpty() {
value = tempIdxVal.Current().Value
}
Expand Down
53 changes: 31 additions & 22 deletions pkg/table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,24 +1763,27 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext,
}
}

txn, err := ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
memBuffer := txn.GetMemBuffer()
sh := memBuffer.Staging()
defer memBuffer.Cleanup(sh)

// The old and new data locate in different partitions.
// Remove record from old partition and add record to new partition.
if from != to {
_, err = t.GetPartition(to).AddRecord(ctx, newData)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the old code does not get duplicate key error in this AddRecord operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err = t.GetPartition(from).RemoveRecord(ctx, h, currData)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As updateRecord, remove record first.

if err != nil {
return errors.Trace(err)
}
// UpdateRecord should be side effect free, but there're two steps here.
// What would happen if step1 succeed but step2 meets error? It's hard
// to rollback.
// So this special order is chosen: add record first, errors such as
// 'Key Already Exists' will generally happen during step1, errors are
// unlikely to happen in step2.
err = t.GetPartition(from).RemoveRecord(ctx, h, currData)

_, err = t.GetPartition(to).AddRecord(ctx, newData)
if err != nil {
logutil.BgLogger().Error("update partition record fails", zap.String("message", "new record inserted while old record is not removed"), zap.Error(err))
return errors.Trace(err)
}

newTo, newFrom := int64(0), int64(0)
if _, ok := t.reorganizePartitions[to]; ok {
newTo, err = t.locateReorgPartition(ectx, newData)
Expand All @@ -1798,24 +1801,28 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext,
}
if newTo == newFrom && newTo != 0 {
// Update needs to be done in StateDeleteOnly as well
tbl := t.GetPartition(newTo)
return tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched)
}
if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly {
tbl := t.GetPartition(newTo)
_, err = tbl.AddRecord(ctx, newData)
err = t.GetPartition(newTo).UpdateRecord(gctx, ctx, h, currData, newData, touched)
if err != nil {
return errors.Trace(err)
}
memBuffer.Release(sh)
return nil
}

if newFrom != 0 {
tbl := t.GetPartition(newFrom)
err = tbl.RemoveRecord(ctx, h, currData)
err = t.GetPartition(newFrom).RemoveRecord(ctx, h, currData)
// TODO: Can this happen? When the data is not yet backfilled?
if err != nil {
return errors.Trace(err)
}
}
if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly {
_, err = t.GetPartition(newTo).AddRecord(ctx, newData)
if err != nil {
return errors.Trace(err)
}
}
memBuffer.Release(sh)
return nil
}
tbl := t.GetPartition(to)
Expand Down Expand Up @@ -1844,21 +1851,23 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext,
if err != nil {
return errors.Trace(err)
}
memBuffer.Release(sh)
return nil
}
tbl = t.GetPartition(newFrom)
err = tbl.RemoveRecord(ctx, h, currData)
if err != nil {
return errors.Trace(err)
}
if t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly {
tbl = t.GetPartition(newTo)
_, err = tbl.AddRecord(ctx, newData)
if err != nil {
return errors.Trace(err)
}
}
tbl = t.GetPartition(newFrom)
err = tbl.RemoveRecord(ctx, h, currData)
if err != nil {
return errors.Trace(err)
}
}
memBuffer.Release(sh)
return nil
}

Expand Down
28 changes: 28 additions & 0 deletions tests/integrationtest/r/globalindex/update.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
set tidb_enable_global_index=true;
drop table if exists t;
CREATE TABLE `t` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
`c` int(11) DEFAULT NULL,
PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,
UNIQUE KEY `idx1` (`b`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY HASH (`a`) PARTITIONS 5;
begin;
insert into t values (1, 2, 3);
insert into t values (2, 2, 3);
Error 1062 (23000): Duplicate entry '2' for key 't.idx1'
rollback;
drop table if exists t;
create table t ( a int, b int, c int, unique key idx(b))
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20),
partition p3 values less than (30)
);
begin;
insert into t values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);
update t set a = 2, b = 12 where a = 11;
Error 1062 (23000): Duplicate entry '12' for key 't.idx'
rollback;
set tidb_enable_global_index=default;
31 changes: 31 additions & 0 deletions tests/integrationtest/t/globalindex/update.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
set tidb_enable_global_index=true;
drop table if exists t;
CREATE TABLE `t` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
`c` int(11) DEFAULT NULL,
PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,
UNIQUE KEY `idx1` (`b`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY HASH (`a`) PARTITIONS 5;

begin;
insert into t values (1, 2, 3);
--error 1062
insert into t values (2, 2, 3);
rollback;

drop table if exists t;
create table t ( a int, b int, c int, unique key idx(b))
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20),
partition p3 values less than (30)
);
begin;
insert into t values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);
--error 1062
update t set a = 2, b = 12 where a = 11;
rollback;

set tidb_enable_global_index=default;