Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Incorrect unique index key when table is not intHandle & Duplicate values for unique indexes (#2455) #2516

Merged
merged 5 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
407 changes: 165 additions & 242 deletions core/src/main/scala/com/pingcap/tispark/write/TiBatchWriteTable.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.apache.spark.sql.insertion

import com.pingcap.tikv.StoreVersion
import com.pingcap.tispark.TiConfigConst
import com.pingcap.tispark.datasource.BaseBatchWriteTest
import com.pingcap.tispark.test.generator.DataGenerator._
Expand All @@ -23,7 +24,7 @@ import com.pingcap.tispark.test.generator._
import com.pingcap.tispark.utils.TiUtil
import org.apache.commons.math3.util.Combinations
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.BaseRandomDataTypeTest
import org.apache.spark.sql.types._

import scala.util.Random

Expand Down Expand Up @@ -112,6 +113,101 @@ class BatchWritePKAndIndexSuite
}
}

// https://github.com/pingcap/tispark/issues/2452
test("test duplicate unique indexes are not deleted error") {
if (!StoreVersion.minTiKVVersion("5.0.0", this.ti.tiSession.getPDClient)) {
cancel("TiDB version must bigger than 5.0.0")
}
tidbStmt.execute("drop table if exists `tispark_test`.`t`")
tidbStmt.execute("""
|CREATE TABLE `tispark_test`.`t` (
| `id` int(20),
| `name` varchar(255) primary key clustered,
| `age` int(11) null default null,
| unique index(id)
|) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin
""".stripMargin)
val schema = StructType(
List(
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = true)))
val tidbOptions: Map[String, String] = Map(
"tidb.addr" -> "127.0.0.1",
"tidb.password" -> "",
"tidb.port" -> "4000",
"tidb.user" -> "root",
"replace" -> "true")

val rdd1 = sc.parallelize(Seq(Row(1, "1", 0)))
val row1 = sqlContext.createDataFrame(rdd1, schema)
row1.write
.format("tidb")
.option("database", "tispark_test")
.option("table", "t")
.options(tidbOptions)
.mode("append")
.save()
val rdd2 = sc.parallelize(Seq(Row(1, "2", 0)))
val row2 = sqlContext.createDataFrame(rdd2, schema)
row2.write
.format("tidb")
.option("database", "tispark_test")
.option("table", "t")
.options(tidbOptions)
.mode("append")
.save()
tidbStmt.execute("ADMIN CHECK TABLE `tispark_test`.`t`")
assert(spark.sql("select * from `tispark_test`.`t`").count() == 1)
}

// https://github.com/pingcap/tispark/issues/2391
test("test bug fix incorrect uniqueIndex key when table is not intHandle") {
if (!StoreVersion.minTiKVVersion("5.0.0", this.ti.tiSession.getPDClient)) {
cancel("TiDB version must bigger than 5.0.0")
}
tidbStmt.execute("drop table if exists `tispark_test`.`t`")
tidbStmt.execute("""
|CREATE TABLE `tispark_test`.`t` (
| `id` int(20),
| `name` varchar(255) primary key clustered,
| `age` int(11) null default null,
| unique index(id)
|) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin
""".stripMargin)
val schema = StructType(
List(
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = true)))
val tidbOptions: Map[String, String] = Map(
"tidb.addr" -> "127.0.0.1",
"tidb.password" -> "",
"tidb.port" -> "4000",
"tidb.user" -> "root",
"replace" -> "true")

val rdd1 = sc.parallelize(Seq(Row(null, "1", 0)))
val row1 = sqlContext.createDataFrame(rdd1, schema)
row1.write
.format("tidb")
.option("database", "tispark_test")
.option("table", "t")
.options(tidbOptions)
.mode("append")
.save()
val rdd2 = sc.parallelize(Seq(Row(null, "2", 0)))
val row2 = sqlContext.createDataFrame(rdd2, schema)
row2.write
.format("tidb")
.option("database", "tispark_test")
.option("table", "t")
.options(tidbOptions)
.mode("append")
.save()
assert(spark.sql("select * from `tispark_test`.`t`").count() == 2)
}

private def insertAndReplace(schema: Schema): Unit = {
val tiTblInfo = getTableInfo(schema.database, schema.tableName)
val tiColInfos = tiTblInfo.getColumns
Expand Down
160 changes: 154 additions & 6 deletions tikv-client/src/main/java/com/pingcap/tikv/codec/TableCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,32 @@
import com.pingcap.tikv.key.Handle;
import com.pingcap.tikv.key.IntHandle;
import com.pingcap.tikv.meta.TiColumnInfo;
import com.pingcap.tikv.meta.TiIndexInfo;
import com.pingcap.tikv.meta.TiTableInfo;
import com.pingcap.tikv.row.Row;
import java.util.Arrays;
import java.util.List;

public class TableCodec {

// MaxOldEncodeValueLen is the maximum len of the old encoding of index value.
public static byte MaxOldEncodeValueLen = 9;
// IndexVersionFlag is the flag used to decode the index's version info.
public static byte IndexVersionFlag = 125;
// PartitionIDFlag is the flag used to decode the partition ID in global index value.
public static byte PartitionIDFlag = 126;
// CommonHandleFlag is the flag used to decode the common handle in an unique index value.
public static byte CommonHandleFlag = 127;
// RestoreDataFlag is the flag that RestoreData begin with.
// See rowcodec.Encoder.Encode and rowcodec.row.toBytes
public static byte RestoreDataFlag = (byte) RowV2.CODEC_VER;

public static class IndexValueSegments {

byte[] commonHandle;
byte[] partitionID;
byte[] restoredValues;
byte[] intHandle;
}

public static byte[] encodeRow(
List<TiColumnInfo> columnInfos,
Expand Down Expand Up @@ -64,16 +82,92 @@ public static Handle decodeHandle(byte[] value, boolean isCommonHandle) {
return new IntHandle(new CodecDataInput(value).readLong());
}

/* only for unique index */
public static byte[] genIndexValueForClusteredIndexVersion1(TiIndexInfo index, Handle handle) {
// The encoding code is written to mimic TiDB and removed some logic that we didn't support.
// The detail encoding explain can be seen here
// https://github.com/pingcap/tidb/blob/master/tablecodec/tablecodec.go#L1127
// Value layout:
// +-- IndexValueVersion0 (with common handle)
// |
// | Layout: TailLen | Options | Padding
// | Length: 1 | len(options) | len(padding)
// |
// | TailLen: len(padding)
// | Options: Encode some value for new features, such as common handle, new collations
// | or global index.
// | See below for more information.
// | Padding: Ensure length of value always >= 10. (or >= 11 if UntouchedFlag exists.)
// |
// +-- Old Encoding (integer handle, local)
// |
// | Layout: [Handle]
// | Length: 8
// |
// | Handle: Only exists in unique index.
// |
// | If no Handle , value will be one single byte '0' (i.e. []byte{'0'}).
// | Length of value <= 9, use to distinguish from the new encoding.
// |
// +-- IndexValueForClusteredIndexVersion1
// |
// | Layout: TailLen | VersionFlag | Version | Options
// | Length: 1 | 1 | 1 | len(options)
// |
// | TailLen: TailLen always be zero.
// | Options: Encode some value for new features, such as common handle, new collations
// or global index.
// | See below for more information.
// |
// | Layout of Options:
// |
// | Segment: Common Handle
// | Layout: CHandle flag | CHandle Len | CHandle
// | Length: 1 | 2 | len(CHandle)
// |
// | Common Handle Segment: Exists when unique index used common handles.
// | Global Index and New Collation in not support now.
public static byte[] genIndexValue(Handle handle, int commonHandleVersion, boolean distinct) {
if (!handle.isInt() && commonHandleVersion == 1) {
return TableCodec.genIndexValueForCommonHandleVersion1(handle, distinct);
}
return genIndexValueForClusterIndexVersion0(handle, distinct);
}

private static byte[] genIndexValueForClusterIndexVersion0(Handle handle, boolean distinct) {
if (!handle.isInt()) {
CodecDataOutput cdo = new CodecDataOutput();
int tailLen = 0;
cdo.writeByte(0);
if (distinct) {
encodeCommonHandle(cdo, handle);
}
if (cdo.size() < 10) {
int paddingLen = 10 - cdo.size();
tailLen += paddingLen;
cdo.write(new byte[paddingLen]);
}
byte[] value = cdo.toBytes();
value[0] = (byte) tailLen;
return value;
}
// When handle is int, the index encode is version 0.
if (distinct) {
CodecDataOutput valueCdo = new CodecDataOutput();
valueCdo.writeLong(handle.intValue());
return valueCdo.toBytes();
}
return new byte[] {'0'};
}

private static byte[] genIndexValueForCommonHandleVersion1(Handle handle, boolean distinct) {
CodecDataOutput cdo = new CodecDataOutput();
// add tailLen to cdo, the tailLen is always zero in tispark.
cdo.writeByte(0);
cdo.writeByte(IndexVersionFlag);
cdo.writeByte(1);

assert (index.isUnique());
encodeCommonHandle(cdo, handle);

if (distinct) {
encodeCommonHandle(cdo, handle);
}
return cdo.toBytes();
}

Expand All @@ -84,4 +178,58 @@ private static void encodeCommonHandle(CodecDataOutput cdo, Handle handle) {
cdo.writeShort(hLen);
cdo.write(encoded);
}

public static Handle decodeHandleInUniqueIndexValue(byte[] value, boolean isCommonHandle) {
if (!isCommonHandle) {
if (value.length <= MaxOldEncodeValueLen) {
return new IntHandle(new CodecDataInput(value).readLong());
}
int tailLen = value[0];
byte[] encode = Arrays.copyOfRange(value, value.length - tailLen, value.length);
return new IntHandle(new CodecDataInput(encode).readLong());
}
CodecDataInput codecDataInput = new CodecDataInput(value);
if (getIndexVersion(value) == 1) {
IndexValueSegments segments = splitIndexValueForCommonHandleVersion1(codecDataInput);
return new CommonHandle(segments.commonHandle);
}
int handleLen = ((int) value[2] << 8) + value[3];
byte[] encode = Arrays.copyOfRange(value, 4, handleLen + 4);
return new CommonHandle(encode);
}

private static int getIndexVersion(byte[] value) {
int tailLen = value[0];
if ((tailLen == 0 || tailLen == 1) && value[1] == IndexVersionFlag) {
return value[2];
}
return 0;
}

public static IndexValueSegments splitIndexValueForCommonHandleVersion1(
CodecDataInput codecDataInput) {
int tailLen = codecDataInput.readByte();
// read IndexVersionFlag
codecDataInput.readByte();
// read IndexVersion
codecDataInput.readByte();
IndexValueSegments segments = new IndexValueSegments();
if (codecDataInput.available() > 0 && codecDataInput.peekByte() == CommonHandleFlag) {
codecDataInput.readByte();
int handleLen = codecDataInput.readShort();
segments.commonHandle = new byte[handleLen];
codecDataInput.readFully(segments.commonHandle, 0, handleLen);
}
if (codecDataInput.available() > 0 && codecDataInput.peekByte() == PartitionIDFlag) {
codecDataInput.readByte();
segments.partitionID = new byte[9];
codecDataInput.readFully(segments.partitionID, 0, 9);
}
if (codecDataInput.available() > 0 && codecDataInput.peekByte() == RestoreDataFlag) {
codecDataInput.readByte();
segments.restoredValues = new byte[codecDataInput.available() - tailLen];
codecDataInput.readFully(segments.restoredValues, 0, codecDataInput.available() - tailLen);
}
return segments;
}
}