From 503236f4a09bcd4bb32825535b97eab9484b685b Mon Sep 17 00:00:00 2001 From: "shenyunlong.syl" Date: Tue, 30 Jan 2024 22:53:32 +0800 Subject: [PATCH] [Feat] Add new Object format for LS Op --- .../alipay/oceanbase/rpc/ObTableClient.java | 1 + .../rpc/protocol/payload/impl/ObObj.java | 23 +- .../protocol/payload/impl/ObTableObjType.java | 309 ++++++++++++++++++ .../payload/impl/ObTableSerialUtil.java | 149 +++++++++ .../impl/execute/ObTableSingleOpEntity.java | 27 +- .../impl/execute/ObTableSingleOpQuery.java | 9 +- .../impl/execute/ObTableTabletOpResult.java | 1 + .../impl/execute/query/ObNewRange.java | 8 + .../rpc/ObTableCheckAndInsUpTest.java | 85 +++++ src/test/resources/ci.sql | 14 + 10 files changed, 603 insertions(+), 23 deletions(-) create mode 100644 src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java create mode 100644 src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 7595c293..75d0cca7 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -388,6 +388,7 @@ private void initMetadata() throws Exception { "The addr{}:{} failed to put into table roster, the node status may be wrong, Ignore", addr.getIp(), addr.getSvrPort()); RUNTIME.warn("initMetadata meet exception", e); + e.printStackTrace(); } } if (servers.isEmpty()) { diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java index 10317fc2..7cc115a7 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObj.java @@ -26,9 +26,12 @@ public class ObObj implements ObSimplePayload { private static ObObj MAX_OBJECT; private static ObObj MIN_OBJECT; + private static long MAX_OBJECT_VALUE = -2L; + private static long MIN_OBJECT_VALUE = -3L; + static { - MAX_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), -2L); - MIN_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), -3L); + MAX_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MAX_OBJECT_VALUE); + MIN_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MIN_OBJECT_VALUE); } /* @@ -156,7 +159,11 @@ public void setValue(Object value) { */ public static ObObj getInstance(Object value) { ObObjMeta meta = ObObjType.defaultObjMeta(value); - return new ObObj(meta, value); + if (value instanceof ObObj) { + return new ObObj(meta, ((ObObj) value).getValue()); + } else { + return new ObObj(meta, value); + } } /* @@ -181,4 +188,14 @@ public String toString() { return "ObObj{" + "meta=" + meta + ", valueLength=" + valueLength + ", value=" + value + '}'; } + + public boolean isMinObj() { + return (getMeta().getType() == ObObjType.ObExtendType) + && ((long) getValue() == MIN_OBJECT_VALUE); + } + + public boolean isMaxObj() { + return (getMeta().getType() == ObObjType.ObExtendType) + && ((long) getValue() == MAX_OBJECT_VALUE); + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java new file mode 100644 index 00000000..1e8ff427 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java @@ -0,0 +1,309 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl; + +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import java.util.HashMap; +import java.util.Map; + +public enum ObTableObjType { + + ObTableNullType(0) { + }, + + ObTableTinyIntType(1) { + }, + + ObTableSmallIntType(2) { + }, + + ObTableInt32Type(3) { + }, + + ObTableInt64Type(4) { + }, + + ObTableVarcharType(5) { + }, + + ObTableVarbinaryType(6) { + @Override + public void decode(ByteBuf buf, ObObj obj) { + ObObjType objType = getObjType(this); + ObObjMeta objMeta = objType.getDefaultObjMeta(); + objMeta.setCsType(ObCollationType.CS_TYPE_BINARY); + obj.setMeta(objMeta); + obj.setValue(objType.decode(buf, objMeta.getCsType())); + } + }, + + ObTableDoubleType(7) { + }, + + ObTableFloatType(8) { + }, + + ObTableTimestampType(9) { + @Override + public byte[] encode(ObObj obj) { + return encodeWithMeta(obj); + } + + @Override + public void decode(ByteBuf buf, ObObj obj) { + decodeWithMeta(buf, obj); + } + + @Override + public int getEncodedSize(ObObj obj) { + return getEncodedSizeWithMeta(obj); + } + }, + + ObTableDateTimeType(10) { + @Override + public byte[] encode(ObObj obj) { + return encodeWithMeta(obj); + } + + @Override + public void decode(ByteBuf buf, ObObj obj) { + decodeWithMeta(buf, obj); + } + + @Override + public int getEncodedSize(ObObj obj) { + return getEncodedSizeWithMeta(obj); + } + }, + + ObTableMinType(11) { + public byte[] encode(ObObj obj) { + byte[] bytes = new byte[this.getEncodedSize(obj)]; + int idx = 0; + System.arraycopy(Serialization.encodeI8(this.getValue()), 0, bytes, idx, 1); + idx += 1; + return bytes; + } + + public void decode(ByteBuf buf, ObObj obj) { + ObObjType objType = getObjType(this); + ObObjMeta objMeta = objType.getDefaultObjMeta(); + obj.setMeta(objMeta); + obj.setValue(-2L); + } + + public int getEncodedSize(ObObj obj) { + return DEFAULT_TABLE_OBJ_TYPE_SIZE; + } + }, + + ObTableMaxType(12) { + public byte[] encode(ObObj obj) { + byte[] bytes = new byte[this.getEncodedSize(obj)]; + int idx = 0; + System.arraycopy(Serialization.encodeI8(this.getValue()), 0, bytes, idx, 1); + idx += 1; + return bytes; + } + + public void decode(ByteBuf buf, ObObj obj) { + ObObjType objType = getObjType(this); + ObObjMeta objMeta = objType.getDefaultObjMeta(); + obj.setMeta(objMeta); + obj.setValue(-3L); + } + + public int getEncodedSize(ObObj obj) { + return DEFAULT_TABLE_OBJ_TYPE_SIZE; + } + }, + + // 13 ObTableUTinyIntType + // 14 ObTableUSmallIntType + // 15 ObTableUInt32Type + // 16 ObTableUInt64Type + + ObTableInvalidType(17) { + }; + + private int value; + private static Map map = new HashMap(); + + ObTableObjType(int value) { + this.value = value; + } + + static { + for (ObTableObjType type : ObTableObjType.values()) { + map.put(type.value, type); + } + } + + public static ObTableObjType getTableObjType(ObObj obj) { + ObObjType objType = obj.getMeta().getType(); + ObCollationType objCsType = obj.getMeta().getCsType(); + if (objType == ObObjType.ObNullType) { + // only for GET operation default value + return ObTableNullType; + } else if (objType == ObObjType.ObTinyIntType) { + return ObTableObjType.ObTableTinyIntType; + } else if (objType == ObObjType.ObSmallIntType) { + return ObTableObjType.ObTableSmallIntType; + } else if (objType == ObObjType.ObInt32Type) { + return ObTableObjType.ObTableInt32Type; + } else if (objType == ObObjType.ObInt64Type) { + return ObTableObjType.ObTableInt64Type; + } else if (objType == ObObjType.ObVarcharType) { + if (objCsType == ObCollationType.CS_TYPE_BINARY) { + return ObTableObjType.ObTableVarbinaryType; + } else { + return ObTableObjType.ObTableVarcharType; + } + } else if (objType == ObObjType.ObDoubleType) { + return ObTableObjType.ObTableDoubleType; + } else if (objType == ObObjType.ObFloatType) { + return ObTableObjType.ObTableFloatType; + } else if (objType == ObObjType.ObTimestampType) { + return ObTableObjType.ObTableTimestampType; + } else if (objType == ObObjType.ObDateTimeType) { + return ObTableObjType.ObTableDateTimeType; + } else if (objType == ObObjType.ObExtendType) { + if (obj.isMinObj()) { + return ObTableObjType.ObTableMinType; + } else if (obj.isMaxObj()) { + return ObTableObjType.ObTableMaxType; + } + } + + throw new IllegalArgumentException("cannot get ObTableObjType, invalid ob obj type: " + + objType.getClass().getName()); + } + + public static ObObjType getObjType(ObTableObjType tableObjType) { + if (tableObjType == ObTableNullType) { + return ObObjType.ObNullType; + } else if (tableObjType == ObTableTinyIntType) { + return ObObjType.ObTinyIntType; + } else if (tableObjType == ObTableSmallIntType) { + return ObObjType.ObSmallIntType; + } else if (tableObjType == ObTableInt32Type) { + return ObObjType.ObInt32Type; + } else if (tableObjType == ObTableInt64Type) { + return ObObjType.ObInt64Type; + } else if (tableObjType == ObTableVarcharType) { + return ObObjType.ObVarcharType; + } else if (tableObjType == ObTableVarbinaryType) { + return ObObjType.ObVarcharType; + } else if (tableObjType == ObTableDoubleType) { + return ObObjType.ObDoubleType; + } else if (tableObjType == ObTableFloatType) { + return ObObjType.ObFloatType; + } else if (tableObjType == ObTableTimestampType) { + return ObObjType.ObTimestampType; + } else if (tableObjType == ObTableDateTimeType) { + return ObObjType.ObDateTimeType; + } + + throw new IllegalArgumentException("cannot get ObTableObjType, invalid table obj type: " + + tableObjType.getClass().getName()); + } + + /* + * Value of. + */ + public static ObTableObjType valueOf(int value) { + return map.get(value); + } + + /* + * Get value. + */ + public byte getValue() { + return (byte) value; + } + + public byte[] encode(ObObj obj) { + ObObjType objType = obj.getMeta().getType(); + byte[] bytes = new byte[this.getEncodedSize(obj)]; + int idx = 0; + System.arraycopy(Serialization.encodeI8(this.getValue()), 0, bytes, idx, 1); + idx += 1; + + byte[] valueBytes = objType.encode(obj.getValue()); + System.arraycopy(valueBytes, 0, bytes, idx, valueBytes.length); + idx += valueBytes.length; + + return bytes; + } + + public void decode(ByteBuf buf, ObObj obj) { + ObObjType objType = getObjType(this); + ObObjMeta objMeta = objType.getDefaultObjMeta(); + obj.setMeta(objMeta); + obj.setValue(objType.decode(buf, objMeta.getCsType())); + } + + public int getEncodedSize(ObObj obj) { + ObObjType objType = obj.getMeta().getType(); + return DEFAULT_TABLE_OBJ_TYPE_SIZE + objType.getEncodedSize(obj.getValue()); + } + + public byte[] encodeWithMeta(ObObj obj) { + ObObjType objType = obj.getMeta().getType(); + ObTableObjType tableObjType = getTableObjType(obj); + byte[] bytes = new byte[tableObjType.getEncodedSize(obj)]; + int idx = 0; + System.arraycopy(Serialization.encodeI8(tableObjType.getValue()), 0, bytes, idx, 1); + idx += 1; + System.arraycopy(Serialization.encodeI8(obj.getMeta().getCsLevel().getByteValue()), 0, + bytes, idx, 1); + idx += 1; + System.arraycopy(Serialization.encodeI8(obj.getMeta().getCsType().getByteValue()), 0, + bytes, idx, 1); + idx += 1; + System.arraycopy(Serialization.encodeI8(obj.getMeta().getScale()), 0, bytes, idx, 1); + idx += 1; + byte[] valueBytes = objType.encode(obj.getValue()); + System.arraycopy(valueBytes, 0, bytes, idx, valueBytes.length); + idx += valueBytes.length; + + return bytes; + } + + public void decodeWithMeta(ByteBuf buf, ObObj obj) { + ObObjType objType = getObjType(this); + ObObjMeta meta = obj.getMeta(); + meta.setType(objType); + meta.setCsLevel(ObCollationLevel.valueOf(Serialization.decodeI8(buf.readByte()))); + meta.setCsType(ObCollationType.valueOf(Serialization.decodeI8(buf.readByte()))); + meta.setScale(Serialization.decodeI8(buf.readByte())); + obj.setValue(objType.decode(buf, meta.getCsType())); + } + + public int getEncodedSizeWithMeta(ObObj obj) { + ObObjType objType = getObjType(this); + int len = DEFAULT_TABLE_OBJ_META_SIZE + objType.getEncodedSize(obj.getValue()); + return len; + } + + public static int DEFAULT_TABLE_OBJ_TYPE_SIZE = 1; + public static int DEFAULT_TABLE_OBJ_META_SIZE = 4; +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java new file mode 100644 index 00000000..29805cd3 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableSerialUtil.java @@ -0,0 +1,149 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl; + +import com.alipay.oceanbase.rpc.ObGlobal; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObBorderFlag; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import static com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableObjType.*; + +public class ObTableSerialUtil { + static public int getEncodedSize(ObObj obj) { + return getTableObjType(obj).getEncodedSize(obj); + } + + static public byte[] encode(ObObj obj) { + return getTableObjType(obj).encode(obj); + } + + static public void decode(ByteBuf buf, ObObj obj) { + ObTableObjType tableObjType = decodeTableObjType(buf); + tableObjType.decode(buf, obj); + } + + public static ObTableObjType decodeTableObjType(ByteBuf buf) { + if (buf == null) { + throw new IllegalArgumentException("cannot get ObTableObjType, buf is null"); + } + byte type = Serialization.decodeI8(buf); + return ObTableObjType.valueOf(type); + } + + static public int getEncodedSize(ObNewRange range) { + int encodedSize = 0; + encodedSize += Serialization.getNeedBytes(range.getTableId()); + encodedSize += 1; // borderFlag + ObRowKey startKey = range.getStartKey(); + long rowkeySize = startKey.getObjCount(); + encodedSize += Serialization.getNeedBytes(rowkeySize); + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = startKey.getObj(i); + encodedSize += ObTableSerialUtil.getEncodedSize(obObj); + } + + ObRowKey endKey = range.getEndKey(); + rowkeySize = endKey.getObjCount(); + encodedSize += Serialization.getNeedBytes(rowkeySize); + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = endKey.getObj(i); + encodedSize += ObTableSerialUtil.getEncodedSize(obObj); + } + + if (ObGlobal.obVsnMajor() >= 4) { + encodedSize += Serialization.getNeedBytes(range.getFlag()); + } + + return encodedSize; + } + + static public byte[] encode(ObNewRange range) { + byte[] bytes = new byte[getEncodedSize(range)]; + int idx = 0; + + long tableId = range.getTableId(); + int len = Serialization.getNeedBytes(tableId); + System.arraycopy(Serialization.encodeVi64(tableId), 0, bytes, idx, len); + idx += len; + System + .arraycopy(Serialization.encodeI8(range.getBorderFlag().getValue()), 0, bytes, idx, 1); + idx += 1; + + ObRowKey startKey = range.getStartKey(); + long rowkeySize = startKey.getObjCount(); + len = Serialization.getNeedBytes(rowkeySize); + System.arraycopy(Serialization.encodeVi64(rowkeySize), 0, bytes, idx, len); + idx += len; + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = startKey.getObj(i); + byte[] objBytes = ObTableSerialUtil.encode(obObj); + System.arraycopy(objBytes, 0, bytes, idx, objBytes.length); + idx += objBytes.length; + } + + ObRowKey endKey = range.getEndKey(); + rowkeySize = endKey.getObjCount(); + len = Serialization.getNeedBytes(rowkeySize); + System.arraycopy(Serialization.encodeVi64(rowkeySize), 0, bytes, idx, len); + idx += len; + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = endKey.getObj(i); + byte[] objBytes = ObTableSerialUtil.encode(obObj); + System.arraycopy(objBytes, 0, bytes, idx, objBytes.length); + idx += objBytes.length; + } + + if (ObGlobal.obVsnMajor() >= 4) { + long flag = range.getFlag(); + len = Serialization.getNeedBytes(flag); + System.arraycopy(Serialization.encodeVi64(flag), 0, bytes, idx, len); + idx += len; + } + + return bytes; + } + + static public void decode(ByteBuf buf, ObNewRange range) { + range.setTableId(Serialization.decodeVi64(buf)); + range.setBorderFlag(ObBorderFlag.valueOf(Serialization.decodeI8(buf.readByte()))); + + long rowkeySize = Serialization.decodeVi64(buf); + ObRowKey startKey = new ObRowKey(); + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = new ObObj(); + decode(buf, obObj); + startKey.addObj(obObj); + } + range.setStartKey(startKey); + + rowkeySize = Serialization.decodeVi64(buf); + ObRowKey endKey = new ObRowKey(); + for (int i = 0; i < rowkeySize; i++) { + ObObj obObj = new ObObj(); + decode(buf, obObj); + endKey.addObj(obObj); + } + range.setEndKey(endKey); + + if (ObGlobal.obVsnMajor() >= 4) { + range.setFlag(Serialization.decodeVi64(buf)); + } + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java index cbe3ba5a..460e0539 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java @@ -21,6 +21,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjMeta; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -68,8 +69,8 @@ public byte[] encode() { System.arraycopy(Serialization.encodeVi64(rowkey.size()), 0, bytes, idx, len); idx += len; for (ObObj obj : rowkey) { - len = obj.getEncodedSize(); - System.arraycopy(obj.encode(), 0, bytes, idx, len); + len = ObTableSerialUtil.getEncodedSize(obj); + System.arraycopy(ObTableSerialUtil.encode(obj), 0, bytes, idx, len); idx += len; } @@ -93,8 +94,8 @@ public byte[] encode() { System.arraycopy(Serialization.encodeVi64(propertiesValues.size()), 0, bytes, idx, len); idx += len; for (ObObj obj : propertiesValues) { - len = obj.getEncodedSize(); - System.arraycopy(obj.encode(), 0, bytes, idx, len); + len = ObTableSerialUtil.getEncodedSize(obj); + System.arraycopy(ObTableSerialUtil.encode(obj), 0, bytes, idx, len); idx += len; } @@ -132,7 +133,7 @@ public Object decode(ByteBuf buf) { int len = (int) Serialization.decodeVi64(buf); for (int i = 0; i < len; i++) { ObObj obj = new ObObj(); - obj.decode(buf); + ObTableSerialUtil.decode(buf, obj); rowkey.add(obj); } @@ -144,7 +145,7 @@ public Object decode(ByteBuf buf) { len = (int) Serialization.decodeVi64(buf); for (int i = 0; i < len; i++) { ObObj obj = new ObObj(); - obj.decode(buf); + ObTableSerialUtil.decode(buf, obj); propertiesValues.add(obj); } @@ -163,7 +164,7 @@ public long getPayloadContentSize() { payloadContentSize += Serialization.getNeedBytes(rowkey.size()); for (ObObj obj : rowkey) { - payloadContentSize += obj.getEncodedSize(); + payloadContentSize += ObTableSerialUtil.getEncodedSize(obj); } if (ignoreEncodePropertiesColumnNames) { @@ -175,7 +176,7 @@ public long getPayloadContentSize() { payloadContentSize += Serialization.getNeedBytes(propertiesValues.size()); for (ObObj obj : propertiesValues) { - payloadContentSize += obj.getEncodedSize(); + payloadContentSize += ObTableSerialUtil.getEncodedSize(obj); } return payloadContentSize; @@ -204,10 +205,7 @@ public static ObTableSingleOpEntity getInstance(String[] rowKeyNames, Object[] r for (int i = 0; i < rowKey.length; i++) { String name = rowKeyNames[i]; Object rowkey = rowKey[i]; - ObObjMeta rowkeyMeta = ObObjType.defaultObjMeta(rowkey); - ObObj obj = new ObObj(); - obj.setMeta(rowkeyMeta); - obj.setValue(rowkey); + ObObj obj = ObObj.getInstance(rowkey); entity.addRowKeyValue(name, obj); } } @@ -215,10 +213,7 @@ public static ObTableSingleOpEntity getInstance(String[] rowKeyNames, Object[] r for (int i = 0; i < propertiesNames.length; i++) { String name = propertiesNames[i]; Object value = propertiesValues[i]; - ObObjMeta meta = ObObjType.defaultObjMeta(value); - ObObj c = new ObObj(); - c.setMeta(meta); - c.setValue(value); + ObObj c = ObObj.getInstance(value); entity.addPropertyValue(name, c); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java index 4fa0c6f9..7cae5744 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObTableSerialUtil; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -66,8 +67,8 @@ public byte[] encode() { System.arraycopy(Serialization.encodeVi64(scanRanges.size()), 0, bytes, idx, len); idx += len; for (ObNewRange range : scanRanges) { - len = range.getEncodedSize(); - System.arraycopy(range.encode(), 0, bytes, idx, len); + len = ObTableSerialUtil.getEncodedSize(range); + System.arraycopy(ObTableSerialUtil.encode(range), 0, bytes, idx, len); idx += len; } @@ -108,7 +109,7 @@ public Object decode(ByteBuf buf) { int len = (int) Serialization.decodeVi64(buf); for (int i = 0; i < len; i++) { ObNewRange range = new ObNewRange(); - range.decode(buf); + ObTableSerialUtil.decode(buf, range); scanRanges.add(range); } @@ -130,7 +131,7 @@ public long getPayloadContentSize() { payloadContentSize += Serialization.getNeedBytes(scanRanges.size()); for (ObNewRange range : scanRanges) { - payloadContentSize += range.getEncodedSize(); + payloadContentSize += ObTableSerialUtil.getEncodedSize(range); } return payloadContentSize + Serialization.getNeedBytes(indexName) diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpResult.java index 40f0b3a0..4585a3b0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpResult.java @@ -127,3 +127,4 @@ public void setPropertiesColumnNames(List propertiesColumnNames) { this.propertiesColumnNames = propertiesColumnNames; } } + diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObNewRange.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObNewRange.java index 91b2db5e..9b54a51b 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObNewRange.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObNewRange.java @@ -257,4 +257,12 @@ public static ObNewRange getWholeRange() { return range; } + public long getFlag() { + return flag; + } + + public void setFlag(long flag) { + this.flag = flag; + } + } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java index cfdd8b13..fb3f338a 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java @@ -18,6 +18,7 @@ package com.alipay.oceanbase.rpc; import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp; +import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.filter.ObCompareOp; import com.alipay.oceanbase.rpc.filter.ObTableFilter; import com.alipay.oceanbase.rpc.mutation.BatchOperation; @@ -25,12 +26,16 @@ import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; +import com.alipay.oceanbase.rpc.util.TimeUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.sql.Timestamp; +import java.util.Date; import java.util.Map; import static com.alipay.oceanbase.rpc.filter.ObTableFilterFactory.compareVal; @@ -404,4 +409,84 @@ public void testNonPartCheckAndInsUp() throws Exception { } finally { } } + + /* + CREATE TABLE IF NOT EXISTS `test_table_object` ( + `c1` tinyint primary key, + `c2` smallint not null, + `c3` int not null, + `c4` bigint not null, + `c5` varchar(128) not null, + `c6` varbinary(128) not null, + `c7` float not null, + `c8` double not null, + `c9` timestamp not null, + `c10` datetime not null + ); + */ + @Test + public void testCheckAndInsUpWithDiffObj() throws Exception { + String tableName = "test_table_object"; + if (!isVersionSupported()) { + System.out.println("current version is not supported, current version: " + + ObGlobal.OB_VERSION); + return; + } + // pre-clean data + client.delete(tableName).addScanRange(ObObj.getMin(), ObObj.getMax()).execute(); + + // 1. check not exists match: insup(1, 1, 1, 1, "hello", "world", 1.0f, 1.0d, now(), now()) if not exists c3 >= 200 + Byte c1Val = 1; + short c2Val = 1; + int c3Val = 1; + long c4Val = 1; + String c5Val = "hello"; + byte[] c6Val = "world".getBytes(); + float c7Val = 1.0f; + double c8Val = 1.0d; + long timeInMillis = System.currentTimeMillis(); + Timestamp c9Val = new Timestamp(timeInMillis); + Date c10Val = TimeUtils.strToDate("2024-01-30"); + Object c11Val = null; + + Object[] values = { c2Val, c3Val, c4Val, c5Val, c6Val, c7Val, c8Val, c9Val, c10Val, c11Val }; + String[] columns = { "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11" }; + + InsertOrUpdate insertOrUpdate1 = new InsertOrUpdate(); + insertOrUpdate1.setRowKey(row(colVal("c1", c1Val))); + for (int i = 0; i < values.length; i++) { + insertOrUpdate1.addMutateRow(row(colVal(columns[i], values[i]))); + } + ObTableFilter filter1 = compareVal(ObCompareOp.GE, "c3", 200); + CheckAndInsUp checkAndInsUp1 = new CheckAndInsUp(filter1, insertOrUpdate1, false); + BatchOperation batchOperation = client.batchOperation(tableName); + batchOperation.addOperation(checkAndInsUp1); + BatchOperationResult result = batchOperation.execute(); + Assert.assertEquals(1, result.size()); + Assert.assertEquals(1, result.get(0).getAffectedRows()); + + // 2. check not exists: insup(c1, c2, c3) (2, min, max) if not exists c3 >= 200 + // just test the table object deserialize + try { + Object[] values2 = { ObObj.getMin(), ObObj.getMax() }; + String[] columns2 = { "c2", "c3" }; + c1Val = 2; + + InsertOrUpdate insertOrUpdate2 = new InsertOrUpdate(); + insertOrUpdate2.setRowKey(row(colVal("c1", c1Val))); + for (int i = 1; i < values2.length; i++) { + insertOrUpdate2.addMutateRow(row(colVal(columns2[i], values2[i]))); + } + ObTableFilter filter2 = compareVal(ObCompareOp.GE, "c3", 200); + CheckAndInsUp checkAndInsUp2 = new CheckAndInsUp(filter2, insertOrUpdate2, false); + batchOperation = client.batchOperation(tableName); + batchOperation.addOperation(checkAndInsUp2); + result = batchOperation.execute(); + Assert.assertTrue(false); // cannot reach here + } catch (ObTableException e) { + e.printStackTrace(); + Assert + .assertEquals(ResultCodes.OB_KV_COLUMN_TYPE_NOT_MATCH.errorCode, e.getErrorCode()); + } + } } diff --git a/src/test/resources/ci.sql b/src/test/resources/ci.sql index 655c830d..95931f68 100644 --- a/src/test/resources/ci.sql +++ b/src/test/resources/ci.sql @@ -416,4 +416,18 @@ CREATE TABLE `hash_key_sub_part` ( PRIMARY KEY (`id`, `uid`, `object_id`) ) DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci PARTITION BY HASH(`id`) SUBPARTITION BY KEY(`uid`) subpartitions 4 PARTITIONS 8; +CREATE TABLE IF NOT EXISTS `test_table_object` ( + `c1` tinyint primary key, + `c2` smallint not null, + `c3` int not null, + `c4` bigint not null, + `c5` varchar(128) not null, + `c6` varbinary(128) not null, + `c7` float not null, + `c8` double not null, + `c9` timestamp(6) not null, + `c10` datetime(6) not null, + `c11` int default null +); + alter system set kv_hotkey_throttle_threshold = 50;