diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index b5ef1df0..8ff9d49c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -18,15 +18,18 @@ package com.alipay.oceanbase.rpc; import com.alibaba.fastjson.JSON; +import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp; import com.alipay.oceanbase.rpc.constant.Constants; import com.alipay.oceanbase.rpc.exception.*; import com.alipay.oceanbase.rpc.batch.QueryByBatch; import com.alipay.oceanbase.rpc.location.LocationUtil; +import com.alipay.oceanbase.rpc.filter.ObTableFilter; import com.alipay.oceanbase.rpc.location.model.*; import com.alipay.oceanbase.rpc.location.model.partition.ObPair; import com.alipay.oceanbase.rpc.location.model.partition.ObPartIdCalculator; import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel; import com.alipay.oceanbase.rpc.mutation.*; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; @@ -910,8 +913,8 @@ public void syncRefreshMetadata() throws Exception { * @param scanRangeColumns columns that need to be scaned * @return the real table name */ - public String getIndexTableName(final String dataTableName, final String indexName, List scanRangeColumns) - throws Exception { + public String getIndexTableName(final String dataTableName, final String indexName, + List scanRangeColumns) throws Exception { String indexTableName = dataTableName; if (indexName != null && !indexName.equals("PRIMARY")) { String tmpTableName = constructIndexTableName(dataTableName, indexName); @@ -925,9 +928,11 @@ public String getIndexTableName(final String dataTableName, final String indexNa if (indexInfo.getIndexType().isGlobalIndex()) { indexTableName = tmpTableName; if (scanRangeColumns.isEmpty()) { - throw new ObTableException("query by global index need add all index keys in order"); + throw new ObTableException( + "query by global index need add all index keys in order"); } else { - addRowKeyElement(indexTableName, scanRangeColumns.toArray(new String[scanRangeColumns.size()])); + addRowKeyElement(indexTableName, + scanRangeColumns.toArray(new String[scanRangeColumns.size()])); } } } @@ -935,17 +940,17 @@ public String getIndexTableName(final String dataTableName, final String indexNa } public String constructIndexTableName(final String dataTableName, final String indexName) - throws Exception { + throws Exception { // construct index table name TableEntry entry = tableLocations.get(dataTableName); Long dataTableId = null; try { if (entry == null) { ObServerAddr addr = serverRoster.getServer(serverAddressPriorityTimeout, - serverAddressCachingTimeout); + serverAddressCachingTimeout); dataTableId = LocationUtil.getTableIdFromRemote(addr, sysUA, - tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, tenantName, - database, dataTableName); + tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, tenantName, + database, dataTableName); } else { dataTableId = entry.getTableId(); } @@ -957,7 +962,7 @@ public String constructIndexTableName(final String dataTableName, final String i } public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String indexTableName) - throws Exception { + throws Exception { ObIndexInfo indexInfo = indexinfos.get(indexName); if (indexInfo != null) { return indexInfo; @@ -968,8 +973,8 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String in boolean acquired = lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS); if (!acquired) { String errMsg = "try to lock index infos refreshing timeout " + "dataSource:" - + dataSourceName + " ,indexName:" + indexName + " , timeout:" - + tableEntryRefreshLockTimeout + "."; + + dataSourceName + " ,indexName:" + indexName + " , timeout:" + + tableEntryRefreshLockTimeout + "."; RUNTIME.error(errMsg); throw new ObTableEntryRefreshException(errMsg); } @@ -979,21 +984,21 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String in return indexInfo; } else { logger.info("index info is not exist, create new index info, indexName: {}", - indexName); + indexName); int serverSize = serverRoster.getMembers().size(); int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize - : tableEntryRefreshTryTimes; + : tableEntryRefreshTryTimes; for (int i = 0; i < refreshTryTimes; i++) { ObServerAddr serverAddr = serverRoster.getServer(serverAddressPriorityTimeout, - serverAddressCachingTimeout); + serverAddressCachingTimeout); indexInfo = getIndexInfoFromRemote(serverAddr, sysUA, - tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, - indexTableName); + tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, + indexTableName); if (indexInfo != null) { indexinfos.put(indexName, indexInfo); } else { RUNTIME.error("get index info from remote is null, index name: {}", - indexName); + indexName); } } return indexInfo; @@ -2265,6 +2270,27 @@ public ObPayload mutationWithFilter(final TableQuery tableQuery, final Object[] final List keyRanges, final ObTableOperation operation, final boolean withResult) throws Exception { + return mutationWithFilter(tableQuery, rowKey, keyRanges, operation, withResult, false, + false); + } + + /** + * execute mutation with filter + * @param tableQuery table query + * @param rowKey row key which want to mutate + * @param keyRanges scan range + * @param operation table operation + * @param withResult whether to bring back result + * @param checkAndExecute whether execute check and execute instead of query and mutate + * @param checkExists whether to check exists or not + * @return execute result + * @throws Exception exception + */ + public ObPayload mutationWithFilter(final TableQuery tableQuery, final Object[] rowKey, + final List keyRanges, + final ObTableOperation operation, final boolean withResult, + final boolean checkAndExecute, final boolean checkExists) + throws Exception { final long start = System.currentTimeMillis(); if (tableQuery != null && tableQuery.getObTableQuery().getKeyRanges().isEmpty()) { // fill a whole range if no range is added explicitly. @@ -2287,6 +2313,8 @@ public ObPayload execute(ObPair obPair) throws Exception { request.setTableId(tableParam.getTableId()); // partId/tabletId request.setPartitionId(tableParam.getPartitionId()); + request.getTableQueryAndMutate().setIsCheckAndExecute(checkAndExecute); + request.getTableQueryAndMutate().setIsCheckNoExists(!checkExists); ObPayload result = obTable.execute(request); String endpoint = obTable.getIp() + ":" + obTable.getPort(); MonitorUtil.info(request, database, tableQuery.getTableName(), "QUERY_AND_MUTATE", @@ -2517,6 +2545,14 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu return request; } + /** + * checkAndInsUp. + */ + public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, + InsertOrUpdate insUp, boolean checkExists) { + return new CheckAndInsUp(this, tableName, filter, insUp, checkExists); + } + /** * Set full username * @param fullUserName user name diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java index bf3f25b0..6412f02d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java @@ -22,6 +22,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryResult; @@ -94,6 +95,15 @@ public ObPayload newPayload(ObRpcPacketHeader header) { return new ObTableDirectLoadResult(); } }, // + OB_TABLE_API_LS_EXECUTE(Pcodes.OB_TABLE_API_LS_EXECUTE) { + /** + * New payload. + */ + @Override + public ObPayload newPayload(ObRpcPacketHeader header) { + return new ObTableLSOpResult(); + } + }, // OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) { /* * New payload. @@ -136,6 +146,8 @@ public static ObTablePacketCode valueOf(short value) { return OB_TABLE_API_EXECUTE_QUERY_SYNC; case Pcodes.OB_TABLE_API_DIRECT_LOAD: return OB_TABLE_API_DIRECT_LOAD; + case Pcodes.OB_TABLE_API_LS_EXECUTE: + return OB_TABLE_API_LS_EXECUTE; case Pcodes.OB_ERROR_PACKET: return OB_ERROR_PACKET; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java b/src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java new file mode 100644 index 00000000..1a89c75e --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/checkandmutate/CheckAndInsUp.java @@ -0,0 +1,100 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.checkandmutate; + +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.filter.ObTableFilter; +import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; +import com.alipay.oceanbase.rpc.table.api.Table; +import com.alipay.oceanbase.rpc.table.api.TableQuery; + +import java.util.ArrayList; +import java.util.List; + +public class CheckAndInsUp { + private Table client; + private String tableName; + private ObTableFilter filter; + private InsertOrUpdate insUp; + boolean checkExists = true; + + public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists) + throws IllegalArgumentException { + this.filter = filter; + this.insUp = insUp; + this.checkExists = check_exists; + this.tableName = null; + this.client = null; + } + + public CheckAndInsUp(Table client, String tableName, ObTableFilter filter, + InsertOrUpdate insUp, boolean check_exists) + throws IllegalArgumentException { + this.client = client; + this.tableName = tableName; + this.filter = filter; + this.insUp = insUp; + this.checkExists = check_exists; + } + + public Object[] getRowKey() { + return insUp.getRowKey(); + } + + public ObTableFilter getFilter() { + return filter; + } + + public InsertOrUpdate getInsUp() { + return insUp; + } + + public boolean isCheckExists() { + return checkExists; + } + + public MutationResult execute() throws Exception { + if (null == tableName) { + throw new ObTableException("table name is null"); + } else if (null == client) { + throw new ObTableException("client is null"); + } else if (!(client instanceof ObTableClient)) { + throw new ObTableException("the client must be table clinet"); + } + + TableQuery query = client.query(tableName); + query.setFilter(filter); + Object[] rowKey = getRowKey(); + List ranges = new ArrayList<>(); + ObNewRange range = new ObNewRange(); + range.setStartKey(ObRowKey.getInstance(insUp.getRowKey())); + range.setEndKey(ObRowKey.getInstance(insUp.getRowKey())); + ranges.add(range); + query.getObTableQuery().setKeyRanges(ranges); + ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE, + insUp.getRowKey(), insUp.getColumns(), insUp.getValues()); + + return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists)); + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index bb4615b7..0e3e00ca 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -62,11 +62,11 @@ public class LocationUtil { private static final String OB_VERSION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ OB_VERSION() AS CLUSTER_VERSION;"; - private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table " + - "where table_name = ?"; + private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table " + + "where table_name = ?"; - private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema " + - "where tenant_name = ? and database_name = ? and table_name = ? limit 1"; + private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema " + + "where tenant_name = ? and database_name = ? and table_name = ? limit 1"; private static final String OB_TENANT_EXIST_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tenant_id from __all_tenant where tenant_name = ?;"; @@ -688,8 +688,9 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection, } public static Long getTableIdFromRemote(ObServerAddr obServerAddr, ObUserAuth sysUA, - long connectTimeout, long socketTimeout, String tenantName, - String databaseName, String tableName) throws ObTableEntryRefreshException { + long connectTimeout, long socketTimeout, + String tenantName, String databaseName, String tableName) + throws ObTableEntryRefreshException { Long tableId = null; Connection connection = null; PreparedStatement ps = null; @@ -706,11 +707,11 @@ public static Long getTableIdFromRemote(ObServerAddr obServerAddr, ObUserAuth sy tableId = rs.getLong("table_id"); } else { throw new ObTableEntryRefreshException("fail to get " + tableName - + " table_id from remote"); + + " table_id from remote"); } } catch (Exception e) { throw new ObTableEntryRefreshException("fail to get " + tableName - + " table_id from remote", e); + + " table_id from remote", e); } finally { try { if (null != rs) { @@ -729,7 +730,7 @@ public static Long getTableIdFromRemote(ObServerAddr obServerAddr, ObUserAuth sy public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUserAuth sysUA, long connectTimeout, long socketTimeout, String indexTableName) - throws ObTableEntryRefreshException { + throws ObTableEntryRefreshException { ObIndexInfo indexInfo = null; Connection connection = null; PreparedStatement ps = null; diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/ObIndexInfo.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/ObIndexInfo.java index 809e59e1..e23652ac 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/ObIndexInfo.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/ObIndexInfo.java @@ -68,6 +68,6 @@ public void setIndexType(ObIndexType indexType) { @Override public String toString() { return "ObIndexInfo{" + "dataTableId=" + dataTableId + ", indexTableId=" + indexTableId - + ", indexTableName=" + indexTableName + ", indexType=" + indexType + '}'; + + ", indexTableName=" + indexTableName + ", indexType=" + indexType + '}'; } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java index 8b1e34a3..01ea4512 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java @@ -18,9 +18,11 @@ package com.alipay.oceanbase.rpc.mutation; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp; import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType; +import com.alipay.oceanbase.rpc.table.ObTableClientLSBatchOpsImpl; import com.alipay.oceanbase.rpc.table.api.Table; import com.alipay.oceanbase.rpc.table.api.TableBatchOps; import com.alipay.oceanbase.rpc.table.api.TableQuery; @@ -34,7 +36,8 @@ public class BatchOperation { private Table client; boolean withResult; private List operations; - boolean isAtomic = false; + boolean isAtomic = false; + boolean hasCheckAndInsUp = false; /* * default constructor @@ -96,6 +99,15 @@ public BatchOperation addOperation(List mutations) { return this; } + /* + * add CheckAndInsUp + */ + public BatchOperation addOperation(CheckAndInsUp... insUps) { + this.operations.addAll(Arrays.asList(insUps)); + this.hasCheckAndInsUp = true; + return this; + } + public BatchOperation setIsAtomic(boolean isAtomic) { this.isAtomic = isAtomic; return this; @@ -103,9 +115,16 @@ public BatchOperation setIsAtomic(boolean isAtomic) { @SuppressWarnings("unchecked") public BatchOperationResult execute() throws Exception { - // add rowkeyElement - boolean hasSetRowkeyElement = false; + if (hasCheckAndInsUp) { + return executeWithLSBatchOp(); + } else { + return executeWithNormalBatchOp(); + } + } + + public BatchOperationResult executeWithNormalBatchOp() throws Exception { TableBatchOps batchOps = client.batch(tableName); + boolean hasSetRowkeyElement = false; for (Object operation : operations) { if (operation instanceof Mutation) { @@ -170,4 +189,31 @@ public BatchOperationResult execute() throws Exception { batchOps.setAtomicOperation(isAtomic); return new BatchOperationResult(batchOps.executeWithResult()); } + + public BatchOperationResult executeWithLSBatchOp() throws Exception { + ObTableClientLSBatchOpsImpl batchOps; + if (client instanceof ObTableClient) { + batchOps = new ObTableClientLSBatchOpsImpl(tableName, (ObTableClient) client); + boolean hasSetRowkeyElement = false; + for (Object operation : operations) { + if (operation instanceof CheckAndInsUp) { + CheckAndInsUp checkAndInsUp = (CheckAndInsUp) operation; + batchOps.addOperation(checkAndInsUp); + List rowKeyNames = checkAndInsUp.getInsUp().getRowKeyNames(); + if (!hasSetRowkeyElement && rowKeyNames != null) { + ((ObTableClient) client).addRowKeyElement(tableName, + rowKeyNames.toArray(new String[0])); + hasSetRowkeyElement = true; + } + } else { + throw new IllegalArgumentException( + "the operation in LS batch must be checkAndInsUp"); + } + } + } else { + throw new IllegalArgumentException( + "execute batch using ObTable diretly is not supporeted"); + } + return new BatchOperationResult(batchOps.executeWithResult()); + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java index 109cf34d..c774781e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java @@ -89,7 +89,7 @@ protected TableQuery getQuery() { /* * get row key */ - protected Object[] getRowKey() { + public Object[] getRowKey() { return rowKey; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java index 90d94ef6..a4e2322e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.alipay.oceanbase.rpc.property.Property.RPC_OPERATION_TIMEOUT; +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; /* @@ -154,4 +155,16 @@ public long getSequence() { public void setSequence(long sequence) { this.sequence = sequence; } + + /* + * encode unis header + */ + protected int encodeHeader(byte[] bytes, int idx) { + int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); + System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, + idx, headerLen); + idx += headerLen; + return idx; + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java index bc89ac73..35712411 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java @@ -31,4 +31,5 @@ public interface Pcodes { int OB_TABLE_API_EXECUTE_QUERY_SYNC = 0x1106; int OB_TABLE_API_DIRECT_LOAD = 0x1123; + int OB_TABLE_API_LS_EXECUTE = 0x1125; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObIndexType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObIndexType.java index 72140ea0..9b766c36 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObIndexType.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObIndexType.java @@ -21,10 +21,16 @@ import java.util.Map; public enum ObIndexType { - IndexTypeIsNot(0), IndexTypeNormalLocal(1), IndexTypeUniqueLocal(2), IndexTypeNormalGlobal(3), - IndexTypeUniqueGlobal(4), IndexTypePrimary(5), IndexTypeDomainCtxcat(6), IndexTypeNormalGlobalLocalStorage(7), - IndexTypeUniqueGlobalLocalStorage(8), IndexTypeSpatialLocal(10), IndexTypeSpatialGlobal(11), - IndexTypeSpatialGlobalLocalStorage(12), IndexTypeMax(13); + IndexTypeIsNot(0), IndexTypeNormalLocal(1), IndexTypeUniqueLocal(2), IndexTypeNormalGlobal(3), IndexTypeUniqueGlobal( + 4), IndexTypePrimary( + 5), IndexTypeDomainCtxcat( + 6), IndexTypeNormalGlobalLocalStorage( + 7), IndexTypeUniqueGlobalLocalStorage( + 8), IndexTypeSpatialLocal( + 10), IndexTypeSpatialGlobal( + 11), IndexTypeSpatialGlobalLocalStorage( + 12), IndexTypeMax( + 13); private int value; private static Map map = new HashMap(); @@ -62,7 +68,7 @@ public byte getByteValue() { public boolean isGlobalIndex() { return valueOf(value) == ObIndexType.IndexTypeNormalGlobal - || valueOf(value) == ObIndexType.IndexTypeUniqueGlobal - || valueOf(value) == ObIndexType.IndexTypeSpatialGlobal; + || valueOf(value) == ObIndexType.IndexTypeUniqueGlobal + || valueOf(value) == ObIndexType.IndexTypeSpatialGlobal; } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java index 247f3b96..5a0199a9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java @@ -54,14 +54,6 @@ public long getPayloadContentSize() { + 2 + 3; } - protected int encodeHeader(byte[] bytes, int idx) { - int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize()); - System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes, - idx, headerLen); - idx += headerLen; - return idx; - } - protected int encodeCredential(byte[] bytes, int idx) { byte[] strbytes = Serialization.encodeBytesString(credential); System.arraycopy(strbytes, 0, bytes, idx, strbytes.length); diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpFlag.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpFlag.java new file mode 100644 index 00000000..6c704adb --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpFlag.java @@ -0,0 +1,56 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +public class ObTableLSOpFlag { + private static final int FLAG_IS_SAME_TYPE = 1 << 0; + private static final int FLAG_IS_SAME_PROPERTIES_NAMES = 1 << 1; + private long flags = 0; + + public void setFlagIsSameType(boolean isSameType) { + if (isSameType) { + flags |= FLAG_IS_SAME_TYPE; + } else { + flags &= ~FLAG_IS_SAME_TYPE; + } + } + + public void setFlagIsSamePropertiesNames(boolean isSamePropertiesNames) { + if (isSamePropertiesNames) { + flags |= FLAG_IS_SAME_PROPERTIES_NAMES; + } else { + flags &= ~FLAG_IS_SAME_PROPERTIES_NAMES; + } + } + + public long getValue() { + return flags; + } + + public void setValue(long flags) { + this.flags = flags; + } + + public boolean getFlagIsSameType() { + return (flags & FLAG_IS_SAME_TYPE) != 0; + } + + public boolean getFlagIsSamePropertiesNames() { + return (flags & FLAG_IS_SAME_PROPERTIES_NAMES) != 0; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java new file mode 100644 index 00000000..9ef4ae78 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java @@ -0,0 +1,147 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Credentialable; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; +import com.alipay.oceanbase.rpc.util.ObBytesString; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +/* +OB_SERIALIZE_MEMBER(ObTableLSOpRequest, + credential_, + entity_type_, + consistency_level_, + ls_op_); + */ +public class ObTableLSOpRequest extends AbstractPayload implements Credentialable { + protected ObBytesString credential; + protected ObTableEntityType entityType = ObTableEntityType.DYNAMIC; + protected ObTableConsistencyLevel consistencyLevel = ObTableConsistencyLevel.STRONG; + private ObTableLSOperation lsOperation = new ObTableLSOperation(); + + /* + * Get pcode. + */ + @Override + public int getPcode() { + return Pcodes.OB_TABLE_API_LS_EXECUTE; + } + + /* + * Encode. + */ + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode ObTableLSOpRequest header + idx = encodeHeader(bytes, idx); + + // 1. encode credential + byte[] strbytes = Serialization.encodeBytesString(credential); + System.arraycopy(strbytes, 0, bytes, idx, strbytes.length); + idx += strbytes.length; + + // 2. encode entity_type + System.arraycopy(Serialization.encodeI8(entityType.getByteValue()), 0, bytes, idx, 1); + idx++; + + // 3. encode consistencyLevel level + System.arraycopy(Serialization.encodeI8(consistencyLevel.getByteValue()), 0, bytes, idx, 1); + idx++; + + // 4. encode lsOperation + int len = (int) lsOperation.getPayloadSize(); + System.arraycopy(lsOperation.encode(), 0, bytes, idx, len); + idx += len; + + return bytes; + } + + /* + * Decode. + */ + @Override + public Object decode(ByteBuf buf) { + super.decode(buf); + this.credential = Serialization.decodeBytesString(buf); + this.entityType = ObTableEntityType.valueOf(buf.readByte()); + this.consistencyLevel = ObTableConsistencyLevel.valueOf(buf.readByte()); + this.lsOperation = new ObTableLSOperation(); + this.lsOperation.decode(buf); + + return this; + } + + /* + * Get payload content size. + */ + @Override + public long getPayloadContentSize() { + return lsOperation.getPayloadSize() + Serialization.getNeedBytes(credential) + 1 + 1 + 1; + } + + /* + * Get batch operation. + */ + public ObTableLSOperation getLSOperation() { + return lsOperation; + } + + /* + * Set batch operation. + */ + public void addTabletOperation(ObTableTabletOp tabletOp) { + lsOperation.addTabletOperation(tabletOp); + } + + /* + * Set entity type. + */ + public void setEntityType(ObTableEntityType entityType) { + this.entityType = entityType; + } + + /* + * Set timeout. + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + /* + * Set consistency level. + */ + public void setConsistencyLevel(ObTableConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + /* + * Set credential. + */ + public void setCredential(ObBytesString credential) { + this.credential = credential; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpResult.java new file mode 100644 index 00000000..7544218c --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpResult.java @@ -0,0 +1,130 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +public class ObTableLSOpResult extends AbstractPayload { + + private List results = new ArrayList(); + + /* + * Get pcode. + */ + @Override + public int getPcode() { + return Pcodes.OB_TABLE_API_LS_EXECUTE; + } + + /* + * Encode. + */ + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + idx = encodeHeader(bytes, idx); + + // 1. encode results + int len = Serialization.getNeedBytes(this.results.size()); + System.arraycopy(Serialization.encodeVi64(this.results.size()), 0, bytes, idx, len); + idx += len; + + for (ObTableTabletOpResult result : results) { + len = (int) result.getPayloadSize(); + System.arraycopy(result.encode(), 0, bytes, idx, len); + idx += len; + } + + return bytes; + } + + /* + * Decode. + */ + @Override + public Object decode(ByteBuf buf) { + // 0. decode version + super.decode(buf); + + // 1. decode results + int len = (int) Serialization.decodeVi64(buf); + results = new ArrayList(len); + for (int i = 0; i < len; i++) { + ObTableTabletOpResult tabletOpResult = new ObTableTabletOpResult(); + tabletOpResult.decode(buf); + results.add(tabletOpResult); + } + + return this; + } + + /* + * Get payload content size. + */ + @Override + public long getPayloadContentSize() { + long payloadContentSize = 0; + payloadContentSize += Serialization.getNeedBytes(results.size()); + for (ObTableTabletOpResult result : results) { + payloadContentSize += result.getPayloadSize(); + } + + return payloadContentSize; + } + + /* + * Get results. + */ + public List getResults() { + return results; + } + + /* + * Set results. + */ + public void setResults(List results) { + this.results = results; + } + + /* + * Add result. + */ + public void addResult(ObTableTabletOpResult result) { + this.results.add(result); + } + + /* + * Add all results. + */ + public void addAllResults(List results) { + this.results.addAll(results); + } + +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java new file mode 100644 index 00000000..549f0686 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOperation.java @@ -0,0 +1,159 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +public class ObTableLSOperation extends AbstractPayload { + + private List tabletOperations = new ArrayList(); + private long lsId = INVALID_LS_ID; // i64 + + private static final int LS_ID_SIZE = 8; + private static final long INVALID_LS_ID = -1; + private ObTableLSOpFlag optionFlag = new ObTableLSOpFlag(); + + /* + OB_UNIS_DEF_SERIALIZE(ObTableLSOp, + ls_id_, + option_flag_, + tablet_ops_); + */ + + /* + * Encode. + */ + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + idx = encodeHeader(bytes, idx); + + // 1. encode ls id + System.arraycopy(Serialization.encodeI64(lsId), 0, bytes, idx, 8); + idx += 8; + + // 2. encode option flag + int len = Serialization.getNeedBytes(optionFlag.getValue()); + System.arraycopy(Serialization.encodeVi64(optionFlag.getValue()), 0, bytes, idx, len); + idx += len; + + // 3. encode Operation + len = Serialization.getNeedBytes(tabletOperations.size()); + System.arraycopy(Serialization.encodeVi64(tabletOperations.size()), 0, bytes, idx, len); + idx += len; + for (ObTableTabletOp tabletOperation : tabletOperations) { + len = (int) tabletOperation.getPayloadSize(); + System.arraycopy(tabletOperation.encode(), 0, bytes, idx, len); + idx += len; + } + + return bytes; + } + + /* + * Decode. + */ + @Override + public Object decode(ByteBuf buf) { + // 0. decode header + super.decode(buf); + + // 1. decode others + this.lsId = Serialization.decodeI64(buf); + + // 2. decode flags + long flagValue = Serialization.decodeVi64(buf); + optionFlag.setValue(flagValue); + + // 3. decode Operation + int len = (int) Serialization.decodeVi64(buf); + tabletOperations = new ArrayList(len); + for (int i = 0; i < len; i++) { + ObTableTabletOp tabletOperation = new ObTableTabletOp(); + tabletOperation.decode(buf); + tabletOperations.add(tabletOperation); + } + + return this; + } + + /* + * Get payload content size. + */ + @Override + public long getPayloadContentSize() { + long payloadContentSize = 0; + payloadContentSize += Serialization.getNeedBytes(tabletOperations.size()); + for (ObTableTabletOp operation : tabletOperations) { + payloadContentSize += operation.getPayloadSize(); + } + + return payloadContentSize + LS_ID_SIZE + Serialization.getNeedBytes(optionFlag.getValue()); + } + + /* + * Get table operations. + */ + public List getTabletOperations() { + return tabletOperations; + } + + /* + * Add table operation. + */ + public void addTabletOperation(ObTableTabletOp tabletOperation) { + this.tabletOperations.add(tabletOperation); + int length = this.tabletOperations.size(); + if (length == 1 && tabletOperation.isSameType()) { + setIsSameType(true); + return; + } + + if (isSameType() + && length > 1 + && !(tabletOperation.isSameType() && (this.tabletOperations.get(length - 1) + .getSingleOperations().get(0).getSingleOpType() == this.tabletOperations + .get(length - 2).getSingleOperations().get(0).getSingleOpType()))) { + setIsSameType(false); + } + } + + public void setLsId(long lsId) { + this.lsId = lsId; + } + + public boolean isSameType() { + return optionFlag.getFlagIsSameType(); + } + + public void setIsSameType(boolean isSameType) { + optionFlag.setFlagIsSameType(isSameType); + } + +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableQueryAndMutateFlag.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableQueryAndMutateFlag.java new file mode 100644 index 00000000..aed1d3ce --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableQueryAndMutateFlag.java @@ -0,0 +1,52 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +public class ObTableQueryAndMutateFlag { + private static final int FLAG_IS_CHECK_AND_EXECUTE = 1 << 0; + private static final int FLAG_IS_CHECK_NOT_EXISTS = 1 << 1; + private long flags = 0; + + public void setIsCheckAndExecute(boolean isCheckAndExecute) { + if (isCheckAndExecute) { + flags |= FLAG_IS_CHECK_AND_EXECUTE; + } else { + flags &= ~FLAG_IS_CHECK_AND_EXECUTE; + } + } + + public void setIsCheckNotExists(boolean isCheckNotExists) { + if (isCheckNotExists) { + flags |= FLAG_IS_CHECK_NOT_EXISTS; + } else { + flags &= ~FLAG_IS_CHECK_NOT_EXISTS; + } + } + + public long getValue() { + return flags; + } + + public boolean isCheckNotExists() { + return (flags & FLAG_IS_CHECK_NOT_EXISTS) != 0; + } + + public boolean isChekAndExecute() { + return (flags & FLAG_IS_CHECK_AND_EXECUTE) != 0; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java new file mode 100644 index 00000000..903c8d88 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOp.java @@ -0,0 +1,128 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +public class ObTableSingleOp extends AbstractPayload { + private ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); + private ObTableSingleOpType singleOpType; + + /* + * Encode. + */ + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + idx = encodeHeader(bytes, idx); + + // 1. encode op type + byte opTypeVal = singleOpType.getValue(); + System.arraycopy(Serialization.encodeI8(opTypeVal), 0, bytes, idx, 1); + idx += 1; + + // 1. encode query and mutate/op + int len = (int) queryAndMutate.getPayloadSize(); + System.arraycopy(queryAndMutate.encode(), 0, bytes, idx, len); + idx += len; + + return bytes; + } + + /* + * Decode. + */ + @Override + public Object decode(ByteBuf buf) { + super.decode(buf); + + this.queryAndMutate = new ObTableQueryAndMutate(); + this.queryAndMutate.decode(buf); + + byte opTypeVal = Serialization.decodeI8(buf); + + this.singleOpType.setValue(opTypeVal); + return this; + } + + /* + * Get payload content size. + */ + @Override + public long getPayloadContentSize() { + + long opTypeLen = Serialization.getNeedBytes(singleOpType.getValue()); + return queryAndMutate.getPayloadSize() + opTypeLen; + + } + + /* + * Get table query. + */ + public ObTableQuery getTableQuery() { + return queryAndMutate.getTableQuery(); + } + + /* + * Set table query. + */ + public void setTableQuery(ObTableQuery tableQuery) { + this.queryAndMutate.setTableQuery(tableQuery); + } + + /* + * Get mutations. + */ + public ObTableBatchOperation getMutations() { + return queryAndMutate.getMutations(); + } + + /* + * Set mutations. + */ + public void setMutations(ObTableBatchOperation mutations) { + this.queryAndMutate.setMutations(mutations); + } + + public void setIsCheckAndExecute(boolean isCheckAndExecute) { + queryAndMutate.setIsCheckAndExecute(isCheckAndExecute); + } + + public void setIsCheckNoExists(boolean isCheckNoExists) { + queryAndMutate.setIsCheckNoExists(isCheckNoExists); + } + + public ObTableSingleOpType getSingleOpType() { + return singleOpType; + } + + public void setSingleOpType(ObTableSingleOpType singleOpType) { + this.singleOpType = singleOpType; + } + +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpType.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpType.java new file mode 100644 index 00000000..8f06047c --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpType.java @@ -0,0 +1,74 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.Constants; + +import java.util.HashMap; +import java.util.Map; + +public enum ObTableSingleOpType { + SINGLE_GET(0), + SINGLE_INSERT(1), + SINGLE_DEL(2), + SINGLE_UPDATE(3), + SINGLE_INSERT_OR_UPDATE(4), + SINGLE_REPLACE(5), + SINGLE_INCREMENT(6), + SINGLE_APPEND(7), + SINGLE_MAX(63), // reserved + SYNC_QUERY(64), + ASYNC_QUERY(65), + QUERY_AND_MUTATE(66), + SINGLE_OP_TYPE_MAX(67); + + private int value; + private static Map map = new HashMap<>(); + + ObTableSingleOpType(int value) { + this.value = value; + } + + static { + for (ObTableSingleOpType type : ObTableSingleOpType.values()) { + map.put(type.value, type); + } + } + + /* + * Value of. + */ + public static ObTableSingleOpType valueOf(byte value) { + return map.get(value); + } + + /* + * Get value. + */ + public byte getValue() { + return (byte) value; + } + + /* + * Set value. + */ + public void setValue(byte value) { + this.value = value; + } + +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java new file mode 100644 index 00000000..ccd46a81 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOp.java @@ -0,0 +1,166 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; +import com.alipay.oceanbase.rpc.protocol.payload.Constants; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery; +import com.alipay.oceanbase.rpc.util.Serialization; +import io.netty.buffer.ByteBuf; + +import java.util.ArrayList; +import java.util.List; + +import static com.alipay.oceanbase.rpc.util.Serialization.encodeObUniVersionHeader; +import static com.alipay.oceanbase.rpc.util.Serialization.getObUniVersionHeaderLength; + +/* +OB_UNIS_DEF_SERIALIZE(ObTableTabletOp, + table_id_, + tablet_id_, + option_flag_, + single_ops_); + */ +public class ObTableTabletOp extends AbstractPayload { + private List singleOperations = new ArrayList<>(); + private long tableId = Constants.OB_INVALID_ID; // vi64 + private long tabletId = Constants.INVALID_TABLET_ID; // i64 + ObTableTabletOpFlag optionFlag = new ObTableTabletOpFlag(); + + private static final int tabletIdSize = 8; + + /* + * Encode. + */ + @Override + public byte[] encode() { + byte[] bytes = new byte[(int) getPayloadSize()]; + int idx = 0; + + // 0. encode header + idx = encodeHeader(bytes, idx); + + // 1. encode table id + int len = Serialization.getNeedBytes(tableId); + System.arraycopy(Serialization.encodeVi64(tableId), 0, bytes, idx, len); + idx += len; + + // 2. encode tablet id + System.arraycopy(Serialization.encodeI64(tabletId), 0, bytes, idx, 8); + idx += 8; + + // 2. encode option flag + len = Serialization.getNeedBytes(optionFlag.getValue()); + System.arraycopy(Serialization.encodeVi64(optionFlag.getValue()), 0, bytes, idx, len); + idx += len; + + // 4. encode Operation + len = Serialization.getNeedBytes(singleOperations.size()); + System.arraycopy(Serialization.encodeVi64(singleOperations.size()), 0, bytes, idx, len); + idx += len; + for (ObTableSingleOp singleOperation : singleOperations) { + len = (int) singleOperation.getPayloadSize(); + System.arraycopy(singleOperation.encode(), 0, bytes, idx, len); + idx += len; + } + + return bytes; + } + + /* + * Decode. + */ + @Override + public Object decode(ByteBuf buf) { + // 0. decode header + super.decode(buf); + + // 1. decode table id and tablet id + this.tableId = Serialization.decodeVi64(buf); + this.tabletId = Serialization.decodeI64(buf); + + // 2. decode other flags + long flagsValue = Serialization.decodeVi64(buf); + optionFlag.setValue(flagsValue); + + // 3. decode Operation + int len = (int) Serialization.decodeVi64(buf); + for (int i = 0; i < len; i++) { + ObTableSingleOp singleOperation = new ObTableSingleOp(); + singleOperation.decode(buf); + singleOperations.add(singleOperation); + } + + return this; + } + + /* + * Get payload content size. + */ + @Override + public long getPayloadContentSize() { + long payloadContentSize = 0; + payloadContentSize += Serialization.getNeedBytes(singleOperations.size()); + for (ObTableSingleOp operation : singleOperations) { + payloadContentSize += operation.getPayloadSize(); + } + + return payloadContentSize + Serialization.getNeedBytes(tableId) + tabletIdSize + Serialization.getNeedBytes(optionFlag.getValue()); + } + + /* + * Get table operations. + */ + public List getSingleOperations() { + return singleOperations; + } + + /* + * Set table operations. + */ + public void setSingleOperations(List singleOperations) { + setIsSameType(true); + ObTableSingleOpType prevType = null; + for (ObTableSingleOp o : singleOperations) { + if (prevType != null && prevType != o.getSingleOpType()) { + setIsSameType(false); + } else { + prevType = o.getSingleOpType(); + } + } + this.singleOperations = singleOperations; + } + + public void setTableId(long tableId) { + this.tableId = tableId; + } + + public void setTabletId(long tabletId) { + this.tabletId = tabletId; + } + + public long getTabletId() { + return tabletId; + } + + public boolean isSameType() { return optionFlag.getFlagIsSameType(); } + + public void setIsSameType(boolean isSameType) { optionFlag.setFlagIsSameType(isSameType);} + +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpFlag.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpFlag.java new file mode 100644 index 00000000..11db83f1 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpFlag.java @@ -0,0 +1,56 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +public class ObTableTabletOpFlag { + private static final int FLAG_IS_SAME_TYPE = 1 << 0; + private static final int FLAG_IS_SAME_PROPERTIES_NAMES = 1 << 1; + private long flags = 0; + + public void setFlagIsSameType(boolean isSameType) { + if (isSameType) { + flags |= FLAG_IS_SAME_TYPE; + } else { + flags &= ~FLAG_IS_SAME_TYPE; + } + } + + public void setFlagIsSamePropertiesNames(boolean isSamePropertiesNames) { + if (isSamePropertiesNames) { + flags |= FLAG_IS_SAME_PROPERTIES_NAMES; + } else { + flags &= ~FLAG_IS_SAME_PROPERTIES_NAMES; + } + } + + public long getValue() { + return flags; + } + + public void setValue(long flags) { + this.flags = flags; + } + + public boolean getFlagIsSameType() { + return (flags & FLAG_IS_SAME_TYPE) != 0; + } + + public boolean getFlagIsSamePropertiesNames() { + return (flags & FLAG_IS_SAME_PROPERTIES_NAMES) != 0; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpResult.java new file mode 100644 index 00000000..c601babf --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableTabletOpResult.java @@ -0,0 +1,21 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.protocol.payload.impl.execute; + +public class ObTableTabletOpResult extends ObTableBatchOperationResult { +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutate.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutate.java index b094ddc1..396f1814 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutate.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/mutate/ObTableQueryAndMutate.java @@ -20,6 +20,7 @@ import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload; import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableQueryAndMutateFlag; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery; import com.alipay.oceanbase.rpc.util.Serialization; import io.netty.buffer.ByteBuf; @@ -36,9 +37,10 @@ */ public class ObTableQueryAndMutate extends AbstractPayload { - private ObTableQuery tableQuery; - private ObTableBatchOperation mutations; - private boolean returnAffectedEntity = false; + private ObTableQuery tableQuery; + private ObTableBatchOperation mutations; + private boolean returnAffectedEntity = false; + private ObTableQueryAndMutateFlag queryAndMutateFlag = new ObTableQueryAndMutateFlag(); /* * Get pcode. @@ -73,6 +75,11 @@ public byte[] encode() { idx += len; System.arraycopy(Serialization.encodeI8(returnAffectedEntity ? (byte) 1 : (byte) 0), 0, bytes, idx, 1); + idx++; + long flags = queryAndMutateFlag.getValue(); + len = Serialization.getNeedBytes(flags); + System.arraycopy(Serialization.encodeVi64(flags), 0, bytes, idx, len); + idx += len; return bytes; } @@ -101,7 +108,7 @@ public Object decode(ByteBuf buf) { public long getPayloadContentSize() { return tableQuery.getPayloadSize() // + mutations.getPayloadSize() // - + 1;// returnAffectedEntity + + Serialization.getNeedBytes(queryAndMutateFlag.getValue()) + 1;// returnAffectedEntity } /* @@ -145,4 +152,16 @@ public boolean isReturnAffectedEntity() { public void setReturnAffectedEntity(boolean returnAffectedEntity) { this.returnAffectedEntity = returnAffectedEntity; } + + public boolean isReadonly() { + return false; + } + + public void setIsCheckAndExecute(boolean isCheckAndExecute) { + queryAndMutateFlag.setIsCheckAndExecute(isCheckAndExecute); + } + + public void setIsCheckNoExists(boolean isCheckNoExists) { + queryAndMutateFlag.setIsCheckNotExists(isCheckNoExists); + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java index 6ce0509f..3828cd2d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java @@ -22,14 +22,21 @@ import com.alipay.oceanbase.rpc.bolt.transport.ObPacketFactory; import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection; import com.alipay.oceanbase.rpc.bolt.transport.ObTableRemoting; +import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp; import com.alipay.oceanbase.rpc.exception.ExceptionUtil; import com.alipay.oceanbase.rpc.exception.ObTableConnectionStatusException; import com.alipay.oceanbase.rpc.exception.ObTableException; import com.alipay.oceanbase.rpc.exception.ObTableServerConnectException; import com.alipay.oceanbase.rpc.batch.QueryByBatch; +import com.alipay.oceanbase.rpc.filter.ObTableFilter; import com.alipay.oceanbase.rpc.mutation.*; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateResult; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery; import com.alipay.oceanbase.rpc.table.api.TableBatchOps; import com.alipay.oceanbase.rpc.table.api.TableQuery; import com.alipay.remoting.ConnectionEventHandler; @@ -38,6 +45,8 @@ import com.alipay.remoting.exception.RemotingException; import java.net.ConnectException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; @@ -338,6 +347,14 @@ public BatchOperation batchOperation(String tableName) { return new BatchOperation(this, tableName); } + /** + * Insert. + */ + public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, + InsertOrUpdate insUp, boolean checkExists) { + return new CheckAndInsUp(this, tableName, filter, insUp, checkExists); + } + /* * Execute. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java new file mode 100644 index 00000000..ce6209af --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -0,0 +1,531 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.table; + +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp; +import com.alipay.oceanbase.rpc.exception.*; +import com.alipay.oceanbase.rpc.location.model.ObServerRoute; +import com.alipay.oceanbase.rpc.location.model.partition.ObPair; +import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate; +import com.alipay.oceanbase.rpc.mutation.Mutation; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes; +import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery; +import com.alipay.oceanbase.rpc.threadlocal.ThreadLocalMap; +import com.alipay.oceanbase.rpc.util.MonitorUtil; +import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory; +import org.slf4j.Logger; + +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD; +import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME; + +public class ObTableClientLSBatchOpsImpl extends AbstractTableBatchOps { + + private static final Logger logger = TableClientLoggerFactory + .getLogger(ObTableClientBatchOpsImpl.class); + private final ObTableClient obTableClient; + private ExecutorService executorService; + private boolean returningAffectedEntity = false; + private List batchOperation; + + /* + * Ob table client batch ops impl. + */ + public ObTableClientLSBatchOpsImpl(String tableName, ObTableClient obTableClient) { + this.tableName = tableName; + this.obTableClient = obTableClient; + this.executorService = obTableClient.getRuntimeBatchExecutor(); + this.batchOperation = new ArrayList<>(); + } + + /* + * Get ob table batch operation. + */ + @Override + public ObTableBatchOperation getObTableBatchOperation() { + return null; + } + + public List getSingleOperations() { + return batchOperation; + } + + /* + * Get. + */ + @Override + public void get(Object[] rowkeys, String[] columns) { + throw new FeatureNotSupportedException(); + } + + /* + * Update. + */ + @Override + public void update(Object[] rowkeys, String[] columns, Object[] values) { + throw new FeatureNotSupportedException(); + } + + /* + * Delete. + */ + @Override + public void delete(Object[] rowkeys) { + throw new FeatureNotSupportedException(); + } + + /* + * Insert. + */ + @Override + public void insert(Object[] rowkeys, String[] columns, Object[] values) { + throw new FeatureNotSupportedException(); + } + + /* + * Replace. + */ + @Override + public void replace(Object[] rowkeys, String[] columns, Object[] values) { + throw new FeatureNotSupportedException(); + } + + /* + * Insert or update. + */ + @Override + public void insertOrUpdate(Object[] rowkeys, String[] columns, Object[] values) { + throw new FeatureNotSupportedException(); + } + + /* + * Increment. + */ + @Override + public void increment(Object[] rowkeys, String[] columns, Object[] values, boolean withResult) { + throw new FeatureNotSupportedException(); + } + + /* + * Append. + */ + @Override + public void append(Object[] rowkeys, String[] columns, Object[] values, boolean withResult) { + throw new FeatureNotSupportedException(); + } + + private void addOperation(ObTableSingleOp singleOp) { + batchOperation.add(singleOp); + } + + public void addOperation(CheckAndInsUp checkAndInsUp) { + InsertOrUpdate insUp = checkAndInsUp.getInsUp(); + ObTableSingleOp singleOp = new ObTableSingleOp(); + + ObTableOperation operation = ObTableOperation.getInstance( + ObTableOperationType.INSERT_OR_UPDATE, insUp.getRowKey(), insUp.getColumns(), + insUp.getValues()); + ObTableBatchOperation operations = new ObTableBatchOperation(); + operations.addTableOperation(operation); + singleOp.setMutations(operations); + + ObTableQuery tableQuery = new ObTableQuery(); + ObNewRange range = new ObNewRange(); + range.setStartKey(ObRowKey.getInstance(insUp.getRowKey())); + range.setEndKey(ObRowKey.getInstance(insUp.getRowKey())); + tableQuery.addKeyRange(range); + tableQuery.setFilterString(checkAndInsUp.getFilter().toString()); + singleOp.setTableQuery(tableQuery); + singleOp.setIsCheckAndExecute(true); + singleOp.setIsCheckNoExists(!checkAndInsUp.isCheckExists()); + singleOp.setSingleOpType(ObTableSingleOpType.QUERY_AND_MUTATE); + addOperation(singleOp); + } + + /* + * Execute. + */ + public List execute() throws Exception { + List results = new ArrayList(batchOperation.size()); + for (ObTableOperationResult result : executeInternal().getResults()) { + int errCode = result.getHeader().getErrno(); + if (errCode == ResultCodes.OB_SUCCESS.errorCode) { + results.add(result.getAffectedRows()); + } else { + results.add(ExceptionUtil.convertToObTableException(result.getExecuteHost(), + result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode, + result.getHeader().getErrMsg())); + } + } + return results; + } + + /* + * Execute with result + */ + public List executeWithResult() throws Exception { + List results = new ArrayList(batchOperation.size()); + for (ObTableOperationResult result : executeInternal().getResults()) { + int errCode = result.getHeader().getErrno(); + if (errCode == ResultCodes.OB_SUCCESS.errorCode) { + results.add(new MutationResult(result)); + } else { + results.add(ExceptionUtil.convertToObTableException(result.getExecuteHost(), + result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode, + result.getHeader().getErrMsg())); + } + } + return results; + } + + public Map>>> partitionPrepare() + throws Exception { + // TODO: currently, we only support tablet level operation aggregation + List operations = getSingleOperations(); + // map: > + Map>>> partitionOperationsMap = + new HashMap(); + + // In ODP mode, client send the request to ODP directly without route + if (obTableClient.isOdpMode()) { + ObPair>> obTableOperations = + new ObPair(new ObTableParam(obTableClient.getOdpTable()), + new ArrayList>()); + for (int i = 0; i < operations.size(); i++) { + ObTableSingleOp operation = operations.get(i); + obTableOperations.getRight().add(new ObPair(i, operation)); + } + partitionOperationsMap.put(0L, obTableOperations); + return partitionOperationsMap; + } + + for (int i = 0; i < operations.size(); i++) { + ObTableSingleOp operation = operations.get(i); + ObRowKey rowKeyObject = operation.getTableQuery().getKeyRanges().get(0).getStartKey(); + int rowKeySize = rowKeyObject.getObjs().size(); + Object[] rowKey = new Object[rowKeySize]; + for (int j = 0; j < rowKeySize; j++) { + rowKey[j] = rowKeyObject.getObj(j).getValue(); + } + ObPair tableObPair= obTableClient.getTable(tableName, rowKey, + false, false, obTableClient.getRoute(false)); + ObPair>> obTableOperations = + partitionOperationsMap.get(tableObPair.getLeft()); + if (obTableOperations == null) { + obTableOperations = new ObPair<>(tableObPair.getRight(), new ArrayList<>()); + partitionOperationsMap.put(tableObPair.getLeft(), obTableOperations); + } + obTableOperations.getRight().add(new ObPair(i, operation)); + } + + if (atomicOperation) { + if (partitionOperationsMap.size() > 1) { + throw new ObTablePartitionConsistentException( + "require atomic operation but found across partition may cause consistent problem "); + } + } + return partitionOperationsMap; + } + + /* + * Partition execute. + */ + public void partitionExecute(ObTableOperationResult[] results, + Map.Entry>>> partitionOperation) + throws Exception { + ObTableParam tableParam = partitionOperation.getValue().getLeft(); + long tableId = tableParam.getTableId(); + long partId = tableParam.getPartitionId(); + ObTable subObTable = tableParam.getObTable(); + List> subOperationWithIndexList = partitionOperation + .getValue().getRight(); + + ObTableLSOpRequest subRequest = new ObTableLSOpRequest(); + List subOperations = new ArrayList<>(); + for (ObPair operationWithIndex : subOperationWithIndexList) { + subOperations.add(operationWithIndex.getRight()); + } + ObTableTabletOp tabletOp = new ObTableTabletOp(); + tabletOp.setSingleOperations(subOperations); + tabletOp.setTableId(tableId); + tabletOp.setTabletId(partId); + subRequest.addTabletOperation(tabletOp); + subRequest.setEntityType(entityType); + subRequest.setTimeout(subObTable.getObTableOperationTimeout()); + + ObTableLSOpResult subLSOpResult; + boolean needRefreshTableEntry = false; + int tryTimes = 0; + long startExecute = System.currentTimeMillis(); + Set failedServerList = null; + ObServerRoute route = null; + + while (true) { + obTableClient.checkStatus(); + long currentExecute = System.currentTimeMillis(); + long costMillis = currentExecute - startExecute; + if (costMillis > obTableClient.getRuntimeMaxWait()) { + logger.error("table name: {} partition id:{} it has tried " + tryTimes + + " times and it has waited " + costMillis + " ms" + + " which exceeds runtime max wait timeout " + + obTableClient.getRuntimeMaxWait() + " ms", tableName, partId); + throw new ObTableTimeoutExcetion("it has tried " + tryTimes + + " times and it has waited " + costMillis + + "ms which exceeds runtime max wait timeout " + + obTableClient.getRuntimeMaxWait() + "ms"); + } + tryTimes++; + try { + if (obTableClient.isOdpMode()) { + subObTable = obTableClient.getOdpTable(); + } else { + if (tryTimes > 1) { + if (route == null) { + route = obTableClient.getRoute(false); + } + if (failedServerList != null) { + route.setBlackList(failedServerList); + } + subObTable = obTableClient.getTable(tableName, partId, needRefreshTableEntry, + obTableClient.isTableEntryRefreshIntervalWait(), route). + getRight().getObTable(); + } + } + subLSOpResult = (ObTableLSOpResult) subObTable.execute(subRequest); + obTableClient.resetExecuteContinuousFailureCount(tableName); + break; + } catch (Exception ex) { + if (obTableClient.isOdpMode()) { + if ((tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) { + logger.warn("batch ops execute while meet Exception, tablename:{}, errorCode: {} , errorMsg: {}, try times {}", + tableName, ((ObTableException) ex).getErrorCode(), ex.getMessage(), tryTimes); + } else { + throw ex; + } + } else if (ex instanceof ObTableReplicaNotReadableException) { + if ((tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) { + logger.warn("tablename:{} partition id:{} retry when replica not readable: {}", + tableName, partId, ex.getMessage()); + if (failedServerList == null) { + failedServerList = new HashSet(); + } + failedServerList.add(subObTable.getIp()); + } else { + logger.warn("exhaust retry when replica not readable: {}", ex.getMessage()); + throw ex; + } + } else if (ex instanceof ObTableException + && ((ObTableException) ex).isNeedRefreshTableEntry()) { + needRefreshTableEntry = true; + logger.warn("tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}", + tableName, partId, ((ObTableException) ex).getErrorCode(), ex); + if (obTableClient.isRetryOnChangeMasterTimes() + && (tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) { + logger.warn("tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}", + tableName, partId, ((ObTableException) ex).getErrorCode(), + tryTimes, ex); + } else { + obTableClient.calculateContinuousFailure(tableName, ex.getMessage()); + throw ex; + } + } else { + obTableClient.calculateContinuousFailure(tableName, ex.getMessage()); + throw ex; + } + } + Thread.sleep(obTableClient.getRuntimeRetryInterval()); + } + + long endExecute = System.currentTimeMillis(); + + if (subLSOpResult == null) { + RUNTIME.error("tablename:{} partition id:{} check batch operation result error: client get unexpected NULL result", + tableName, partId); + throw new ObTableUnexpectedException("check batch operation result error: client get unexpected NULL result"); + } + + List tabletOpResults = subLSOpResult.getResults(); + if (tabletOpResults.size() != 1) { + throw new ObTableUnexpectedException("check batch result error: partition " + + partId + " expect tablet op result size 1" + + " actual result size is " + + tabletOpResults.size()); + } + + List subObTableOperationResults = tabletOpResults.get(0).getResults(); + + if (subObTableOperationResults.size() < subOperations.size()) { + // only one result when it across failed + // only one result when hkv puts + if (subObTableOperationResults.size() == 1 && entityType == ObTableEntityType.HKV) { + ObTableOperationResult subObTableOperationResult = subObTableOperationResults.get(0); + subObTableOperationResult.setExecuteHost(subObTable.getIp()); + subObTableOperationResult.setExecutePort(subObTable.getPort()); + for (ObPair SubOperationWithIndexList : subOperationWithIndexList) { + results[SubOperationWithIndexList.getLeft()] = subObTableOperationResult; + } + } else { + throw new IllegalArgumentException( + "check batch operation result size error: operation size [" + + subOperations.size() + "] result size [" + + subObTableOperationResults.size() + "]"); + } + } else { + if (subOperationWithIndexList.size() != subObTableOperationResults.size()) { + throw new ObTableUnexpectedException("check batch result error: partition " + + partId + " expect result size " + + subOperationWithIndexList.size() + + " actual result size " + + subObTableOperationResults.size()); + } + for (int i = 0; i < subOperationWithIndexList.size(); i++) { + ObTableOperationResult subObTableOperationResult = subObTableOperationResults.get(i); + subObTableOperationResult.setExecuteHost(subObTable.getIp()); + subObTableOperationResult.setExecutePort(subObTable.getPort()); + results[subOperationWithIndexList.get(i).getLeft()] = subObTableOperationResult; + } + } + String endpoint = subObTable.getIp() + ":" + subObTable.getPort(); + MonitorUtil.info(subRequest, subObTable.getDatabase(), tableName, + "BATCH-partitionExecute-", endpoint, tabletOp, + subObTableOperationResults.size(), endExecute - startExecute, + obTableClient.getslowQueryMonitorThreshold()); + } + + /* + * Execute internal. + */ + public ObTableBatchOperationResult executeInternal() throws Exception { + + long start = System.currentTimeMillis(); + final ObTableOperationResult[] obTableOperationResults = new ObTableOperationResult[batchOperation + .size()]; + Map>>> partitions = partitionPrepare(); + long getTableTime = System.currentTimeMillis(); + final Map context = ThreadLocalMap.getContextMap(); + if (executorService != null && !executorService.isShutdown() && partitions.size() > 1) { + // execute sub-batch operation in parallel + final ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(executorService, + partitions.size()); + for (final Map.Entry>>> entry : partitions + .entrySet()) { + executor.execute(new ConcurrentTask() { + /* + * Do task. + */ + @Override + public void doTask() { + try { + ThreadLocalMap.transmitContextMap(context); + partitionExecute(obTableOperationResults, entry); + } catch (Exception e) { + logger.error(LCD.convert("01-00026"), e); + executor.collectExceptions(e); + } finally { + ThreadLocalMap.reset(); + } + } + }); + } + long timeoutTs = obTableClient.getRuntimeBatchMaxWait() * 1000L * 1000L + + System.nanoTime(); + try { + while (timeoutTs > System.nanoTime()) { + try { + executor.waitComplete(1, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new ObTableUnexpectedException( + "Batch Concurrent Execute interrupted", e); + } + + if (executor.getThrowableList().size() > 0) { + throw new ObTableUnexpectedException("Batch Concurrent Execute Error", + executor.getThrowableList().get(0)); + } + + if (executor.isComplete()) { + break; + } + } + } finally { + executor.stop(); + } + + if (executor.getThrowableList().size() > 0) { + throw new ObTableUnexpectedException("Batch Concurrent Execute Error", executor + .getThrowableList().get(0)); + } + + if (!executor.isComplete()) { + throw new ObTableUnexpectedException( + "Batch Concurrent Execute Error, runtimeBatchMaxWait: " + + obTableClient.getRuntimeBatchMaxWait() + "ms"); + } + + } else { + // Execute sub-batch operation one by one + for (final Map.Entry>>> entry : partitions + .entrySet()) { + partitionExecute(obTableOperationResults, entry); + } + } + + ObTableBatchOperationResult batchOperationResult = new ObTableBatchOperationResult(); + for (ObTableOperationResult obTableOperationResult : obTableOperationResults) { + batchOperationResult.addResult(obTableOperationResult); + } + + MonitorUtil.info(batchOperationResult, obTableClient.getDatabase(), tableName, "BATCH", "", + obTableOperationResults.length, getTableTime - start, System.currentTimeMillis() + - getTableTime, + obTableClient.getslowQueryMonitorThreshold()); + + return batchOperationResult; + } + + /* + * clear batch operations1 + */ + public void clear() { + batchOperation = new ArrayList<>(); + } + + /* + * Set executor service. + */ + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public boolean isReturningAffectedEntity() { + return returningAffectedEntity; + } + + public void setReturningAffectedEntity(boolean returningAffectedEntity) { + this.returningAffectedEntity = returningAffectedEntity; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java index 2b55e6a0..8c9119c4 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryAsyncImpl.java @@ -153,7 +153,8 @@ public Map> getPartitions() throws Exception { String indexName = tableQuery.getIndexName(); String indexTableName = tableName; if (!this.obTableClient.isOdpMode()) { - indexTableName = obTableClient.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns()); + indexTableName = obTableClient.getIndexTableName(tableName, indexName, + tableQuery.getScanRangeColumns()); } this.partitionObTables = new HashMap>(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index d4683ea9..c08501d0 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -143,7 +143,8 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception { String indexName = tableQuery.getIndexName(); String indexTableName = tableName; if (!this.obTableClient.isOdpMode()) { - indexTableName = obTableClient.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns()); + indexTableName = obTableClient.getIndexTableName(tableName, indexName, + tableQuery.getScanRangeColumns()); } for (ObNewRange rang : tableQuery.getKeyRanges()) { @@ -162,8 +163,8 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception { } ObBorderFlag borderFlag = rang.getBorderFlag(); List> pairs = obTableClient.getTables(indexTableName, - start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), false, - false, obTableClient.getReadRoute()); + start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(), false, + false, obTableClient.getReadRoute()); for (ObPair pair : pairs) { partitionObTables.put(pair.getLeft(), pair); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/api/Table.java b/src/main/java/com/alipay/oceanbase/rpc/table/api/Table.java index 94a4e5f1..8feb3057 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/api/Table.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/api/Table.java @@ -17,6 +17,8 @@ package com.alipay.oceanbase.rpc.table.api; +import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp; +import com.alipay.oceanbase.rpc.filter.ObTableFilter; import com.alipay.oceanbase.rpc.mutation.*; import java.util.Map; @@ -97,4 +99,6 @@ Map append(String tableName, Object[] rowkeys, String[] columns, void addProperty(String property, String value); + CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter, InsertOrUpdate insUp, + boolean checkExists); } diff --git a/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java b/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java index 8185455d..d1f26370 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/util/MonitorUtil.java @@ -37,6 +37,9 @@ public class MonitorUtil { private static String buildParamsString(List rowKeys) { + if (rowKeys == null) { + return ""; + } StringBuilder stringBuilder = new StringBuilder(); for (Object value : rowKeys) { if (value instanceof byte[]) { @@ -117,8 +120,8 @@ private static String logMessage(String traceId, String database, String tableNa endpoint = endpoint.replaceAll(",", "#"); } // if rowkeys is empty point, then append "rowKeys:null" into log message - String argsValue = rowKeys == null ? "rowKeys:null" : buildParamsString(Arrays - .asList(rowKeys)); + String argsValue = (rowKeys == null || rowKeys.length == 0) ? "rowKeys:null" + : buildParamsString(Arrays.asList(rowKeys)); ResultCodes resultCode = ResultCodes.valueOf(result.getHeader().getErrno()); String res = ""; @@ -164,8 +167,8 @@ private static String logMessage(String traceId, String database, String tableNa endpoint = endpoint.replaceAll(",", "#"); } // if rowkeys is empty point, then append "rowKeys:null" into log message - String argsValue = rowKeys == null ? "rowKeys:null" : buildParamsString(Arrays - .asList(rowKeys)); + String argsValue = (rowKeys == null || rowKeys.isEmpty()) ? "rowKeys:null" + : buildParamsString(rowKeys); StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(traceId).append(" - ").append(database).append(",").append(tableName) @@ -283,4 +286,26 @@ public static void info(final ObPayload payload, String database, String tableNa params, result, routeTableTime, executeTime)); } } + + /** + * for tablet op + */ + private static void logTabletOpMessage(final ObPayload payload, String database, + String tableName, String methodName, String endpoint, + ObTableTabletOp tabletOp, int resultSize, + long executeTime, long slowQueryMonitorThreshold) { + if (executeTime < slowQueryMonitorThreshold) { + return; + } + String traceId = formatTraceMessage(payload); + MONITOR.info(logMessage(traceId, database, tableName, + methodName + "-" + tabletOp.getTabletId(), endpoint, null, resultSize, executeTime)); + } + + public static void info(final ObPayload payload, String database, String tableName, + String methodName, String endpoint, ObTableTabletOp tabletOp, + int resultSize, long executeTime, long slowQueryMonitorThreshold) { + logTabletOpMessage(payload, database, tableName, methodName, endpoint, tabletOp, + resultSize, executeTime, slowQueryMonitorThreshold); + } } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java new file mode 100644 index 00000000..ff0f0643 --- /dev/null +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableCheckAndInsUpTest.java @@ -0,0 +1,300 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2024 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc; + +import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp; +import com.alipay.oceanbase.rpc.filter.ObCompareOp; +import com.alipay.oceanbase.rpc.filter.ObTableFilter; +import com.alipay.oceanbase.rpc.mutation.BatchOperation; +import com.alipay.oceanbase.rpc.mutation.Delete; +import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate; +import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; +import com.alipay.oceanbase.rpc.mutation.result.MutationResult; +import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +import static com.alipay.oceanbase.rpc.filter.ObTableFilterFactory.compareVal; +import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal; +import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row; + +public class ObTableCheckAndInsUpTest { + public ObTableClient client; + private static long MINI_SUPP_VERSION = ObGlobal.calcVersion(4, (short) 2, (byte) 1, + (byte) 2); + private static final String TABLE_NAME = "test_mutation"; + + @Before + public void setup() throws Exception { + final ObTableClient obTableClient = ObTableClientTestUtil.newTestClient(); + obTableClient.init(); + this.client = obTableClient; + } + + private boolean isVersionSupported() { + if (ObTableClientTestUtil.isOBVersionGreaterEqualThan(MINI_SUPP_VERSION)) { + return true; + } + return false; + } + + /* + CREATE TABLE `test_mutation` ( + `c1` bigint NOT NULL, + `c2` varchar(20) NOT NULL, + `c3` varbinary(1024) DEFAULT NULL, + `c4` bigint DEFAULT NULL, + PRIMARY KEY(`c1`, `c2`)) partition by range columns (`c1`) ( + PARTITION p0 VALUES LESS THAN (300), + PARTITION p1 VALUES LESS THAN (1000), + PARTITION p2 VALUES LESS THAN MAXVALUE); + + */ + @Test + public void testBatchWithDiffRows() throws Exception { + if (!isVersionSupported()) { + System.out.println("current version is not supported, current version: " + + ObGlobal.OB_VERSION); + return; + } + try { + // 0. prepare data, insert(1, 'c2_v0', 'c3_v0', 100),(2, 'c2_v0', 'c3_v0', 100),(3, 'c2_v0', 'c3_v0', 100),(4, 'c2_v0', 'c3_v0', 100) + for (long i = 1L; i <= 4L; i++) { + InsertOrUpdate insertOrUpdate = client.insertOrUpdate(TABLE_NAME); + insertOrUpdate.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0"))); + insertOrUpdate.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 100L))); + MutationResult res = insertOrUpdate.execute(); + Assert.assertEquals(1, res.getAffectedRows()); + } + + BatchOperation batchOperation = client.batchOperation(TABLE_NAME); + // 1. check exists match: insup(1, 'c2_v0', 'c3_v0', 200) if exists c3 >= 'c3_v0'; + InsertOrUpdate insertOrUpdate1 = new InsertOrUpdate(); + insertOrUpdate1.setRowKey(row(colVal("c1", 1L), colVal("c2", "c2_v0"))); + insertOrUpdate1.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L))); + ObTableFilter filter = compareVal(ObCompareOp.GE, "c3", "c3_v0"); + CheckAndInsUp checkAndInsUp1 = new CheckAndInsUp(filter, insertOrUpdate1, true); + + // 2. check exists not match: insup(2, 'c2_v0', 'c3_v0', 200) if exists c3 > 'c3_v0'; + InsertOrUpdate insertOrUpdate2 = new InsertOrUpdate(); + insertOrUpdate2.setRowKey(row(colVal("c1", 2L), colVal("c2", "c2_v0"))); + insertOrUpdate2.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L))); + filter = compareVal(ObCompareOp.GT, "c3", "c3_v0"); + CheckAndInsUp checkAndInsUp2 = new CheckAndInsUp(filter, insertOrUpdate2, true); + + // 3. check no exists match: insup(3, 'c2_v0', 'c3_v0', 200) if not exists c4 > 200 + InsertOrUpdate insertOrUpdate3 = new InsertOrUpdate(); + insertOrUpdate3.setRowKey(row(colVal("c1", 3L), colVal("c2", "c2_v0"))); + insertOrUpdate3.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L))); + filter = compareVal(ObCompareOp.GE, "c4", 200L); + CheckAndInsUp checkAndInsUp3 = new CheckAndInsUp(filter, insertOrUpdate3, false); + + // 4. check no exists not match: insup(4, 'c2_v0', 'c3_v0', 200) if exists c4 is null + InsertOrUpdate insertOrUpdate4 = new InsertOrUpdate(); + insertOrUpdate4.setRowKey(row(colVal("c1", 4L), colVal("c2", "c2_v0"))); + insertOrUpdate4.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L))); + filter = compareVal(ObCompareOp.IS_NOT, "c4", null); + CheckAndInsUp checkAndInsUp4 = new CheckAndInsUp(filter, insertOrUpdate4, false); + + // 5. verify result + batchOperation.addOperation(checkAndInsUp1, checkAndInsUp2, checkAndInsUp3, + checkAndInsUp4); + BatchOperationResult batchOperationResult = batchOperation.execute(); + Assert.assertEquals(4, batchOperationResult.size()); + Assert.assertEquals(1, batchOperationResult.get(0).getAffectedRows()); + Assert.assertEquals(0, batchOperationResult.get(1).getAffectedRows()); + Assert.assertEquals(1, batchOperationResult.get(2).getAffectedRows()); + Assert.assertEquals(0, batchOperationResult.get(3).getAffectedRows()); + + Map res = client.get(TABLE_NAME, new Object[] { 1L, "c2_v0" }, + new String[] { "c4" }); + Assert.assertEquals(200L, res.get("c4")); + res = client.get(TABLE_NAME, new Object[] { 2L, "c2_v0" }, new String[] { "c4" }); + Assert.assertEquals(100L, res.get("c4")); + res = client.get(TABLE_NAME, new Object[] { 3L, "c2_v0" }, new String[] { "c4" }); + Assert.assertEquals(200L, res.get("c4")); + res = client.get(TABLE_NAME, new Object[] { 4L, "c2_v0" }, new String[] { "c4" }); + Assert.assertEquals(100L, res.get("c4")); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } finally { + for (long i = 1L; i <= 4L; i++) { + Delete delete = client.delete(TABLE_NAME); + delete.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0"))); + MutationResult res = delete.execute(); + Assert.assertEquals(1, res.getAffectedRows()); + } + } + } + + @Test + public void testBatchWithSameRows() throws Exception { + if (!isVersionSupported()) { + System.out.println("current version is not supported, current version: " + + ObGlobal.OB_VERSION); + return; + } + + try { + // 0. prepare data, insert(5, 'c2_v0', 'c3_v0', 100) + InsertOrUpdate insertOrUpdate = client.insertOrUpdate(TABLE_NAME); + insertOrUpdate.setRowKey(row(colVal("c1", 5L), colVal("c2", "c2_v0"))); + insertOrUpdate.addMutateRow(row(colVal("c3", "c3_v0".getBytes()), colVal("c4", 100L))); + MutationResult result = insertOrUpdate.execute(); + Assert.assertEquals(1, result.getAffectedRows()); + + // 1. check exists match: insup(5, 'c2_v0', c3_v0, 200) if exists c3 is not null; + InsertOrUpdate insertOrUpdate1 = new InsertOrUpdate(); + insertOrUpdate1.setRowKey(row(colVal("c1", 5L), colVal("c2", "c2_v0"))); + insertOrUpdate1.addMutateRow(row(colVal("c4", 200L))); + ObTableFilter filter = compareVal(ObCompareOp.IS_NOT, "c3", null); + CheckAndInsUp checkAndInsUp1 = new CheckAndInsUp(filter, insertOrUpdate1, true); + + // 2. check exists not match: insup(5, 'c2_v0', 'c3_v1', 200) if exists c4 > 200 ; + InsertOrUpdate insertOrUpdate2 = new InsertOrUpdate(); + insertOrUpdate2.setRowKey(row(colVal("c1", 5L), colVal("c2", "c2_v0"))); + insertOrUpdate2.addMutateRow(row(colVal("c3", "c3_v1".getBytes()), colVal("c4", 200L))); + filter = compareVal(ObCompareOp.GT, "c4", 200L); + CheckAndInsUp checkAndInsUp2 = new CheckAndInsUp(filter, insertOrUpdate2, true); + + // 3. check no exists match: insup(5, 'c2_v0', 'c3_v1', 300) if not exists c4 > 300 ; + InsertOrUpdate insertOrUpdate3 = new InsertOrUpdate(); + insertOrUpdate3.setRowKey(row(colVal("c1", 5L), colVal("c2", "c2_v0"))); + insertOrUpdate3.addMutateRow(row(colVal("c3", "c3_v1".getBytes()), colVal("c4", 300L))); + filter = compareVal(ObCompareOp.GT, "c4", 300L); + CheckAndInsUp checkAndInsUp3 = new CheckAndInsUp(filter, insertOrUpdate3, false); + + // 3. check no exists not match: insup(5, 'c2_v0', 'c3_v1', 400) if not exists c4 >= 300 ; + InsertOrUpdate insertOrUpdate4 = new InsertOrUpdate(); + insertOrUpdate4.setRowKey(row(colVal("c1", 5L), colVal("c2", "c2_v0"))); + insertOrUpdate4.addMutateRow(row(colVal("c3", "c3_v1".getBytes()), colVal("c4", 400L))); + filter = compareVal(ObCompareOp.GE, "c4", 300L); + CheckAndInsUp checkAndInsUp4 = new CheckAndInsUp(filter, insertOrUpdate4, false); + + BatchOperation batchOperation = client.batchOperation(TABLE_NAME); + batchOperation.addOperation(checkAndInsUp1, checkAndInsUp2, checkAndInsUp3, + checkAndInsUp4); + BatchOperationResult batchOperationResult = batchOperation.execute(); + Assert.assertEquals(4, batchOperationResult.size()); + Assert.assertEquals(1, batchOperationResult.get(0).getAffectedRows()); + Assert.assertEquals(0, batchOperationResult.get(1).getAffectedRows()); + Assert.assertEquals(1, batchOperationResult.get(2).getAffectedRows()); + Assert.assertEquals(0, batchOperationResult.get(3).getAffectedRows()); + + Map res = client.get(TABLE_NAME, new Object[] { 5L, "c2_v0" }, null); + Assert.assertEquals("c3_v1", new String((byte[]) res.get("c3"), "UTF-8")); + Assert.assertEquals(300L, res.get("c4")); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } finally { + Delete delete = client.delete(TABLE_NAME); + delete.setRowKey(row(colVal("c1", 5L), colVal("c2", "c2_v0"))); + MutationResult res = delete.execute(); + Assert.assertEquals(1, res.getAffectedRows()); + } + } + + @Test + public void testSingleCheckInsUp() throws Exception { + if (!isVersionSupported()) { + System.out.println("current version is not supported, current version: " + + ObGlobal.OB_VERSION); + return; + } + + try { + // 0. prepare data, insert(1, 'c2_v0', 'c3_v0', 100),(2, 'c2_v0', 'c3_v0', 100),(3, 'c2_v0', 'c3_v0', 100),(4, 'c2_v0', 'c3_v0', 100) + for (long i = 1L; i <= 4L; i++) { + InsertOrUpdate insertOrUpdate = client.insertOrUpdate(TABLE_NAME); + insertOrUpdate.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0"))); + insertOrUpdate.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 100L))); + MutationResult res = insertOrUpdate.execute(); + Assert.assertEquals(1, res.getAffectedRows()); + } + + // 1. check exists match: insup(1, 'c2_v0', 'c3_v0', 200) if exists c3 >= 'c3_v0'; + InsertOrUpdate insertOrUpdate1 = new InsertOrUpdate(); + insertOrUpdate1.setRowKey(row(colVal("c1", 1L), colVal("c2", "c2_v0"))); + insertOrUpdate1.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L))); + ObTableFilter filter = compareVal(ObCompareOp.GE, "c3", "c3_v0"); + CheckAndInsUp checkAndInsUp1 = client.checkAndInsUp(TABLE_NAME, filter, + insertOrUpdate1, true); + MutationResult result1 = checkAndInsUp1.execute(); + Assert.assertEquals(1, result1.getAffectedRows()); + Map res = client.get(TABLE_NAME, new Object[] { 1L, "c2_v0" }, + new String[] { "c3", "c4" }); + Assert.assertEquals("c3_v0", new String((byte[]) res.get("c3"), "UTF-8")); + Assert.assertEquals(200L, res.get("c4")); + + // 2. check exists not match: insup(2, 'c2_v0', 'c3_v0', 200) if exists c3 > 'c3_v0'; + InsertOrUpdate insertOrUpdate2 = new InsertOrUpdate(); + insertOrUpdate2.setRowKey(row(colVal("c1", 2L), colVal("c2", "c2_v0"))); + insertOrUpdate2.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L))); + filter = compareVal(ObCompareOp.GT, "c3", "c3_v0"); + CheckAndInsUp checkAndInsUp2 = client.checkAndInsUp(TABLE_NAME, filter, + insertOrUpdate2, true); + MutationResult result2 = checkAndInsUp2.execute(); + Assert.assertEquals(0, result2.getAffectedRows()); + res = client.get(TABLE_NAME, new Object[] { 2L, "c2_v0" }, new String[] { "c3", "c4" }); + Assert.assertEquals("c3_v0", new String((byte[]) res.get("c3"), "UTF-8")); + Assert.assertEquals(100L, res.get("c4")); + + // 3. check no exists match: insup(3, 'c2_v0', 'c3_v0', 200) if not exists c4 > 200 + InsertOrUpdate insertOrUpdate3 = new InsertOrUpdate(); + insertOrUpdate3.setRowKey(row(colVal("c1", 3L), colVal("c2", "c2_v0"))); + insertOrUpdate3.addMutateRow(row(colVal("c3", "c3_v1"), colVal("c4", 200L))); + filter = compareVal(ObCompareOp.GE, "c4", 200L); + CheckAndInsUp checkAndInsUp3 = client.checkAndInsUp(TABLE_NAME, filter, + insertOrUpdate3, false); + MutationResult result3 = checkAndInsUp3.execute(); + Assert.assertEquals(1, result3.getAffectedRows()); + res = client.get(TABLE_NAME, new Object[] { 3L, "c2_v0" }, new String[] { "c3", "c4" }); + Assert.assertEquals("c3_v1", new String((byte[]) res.get("c3"), "UTF-8")); + Assert.assertEquals(200L, res.get("c4")); + + // 4. check no exists not match: insup(4, 'c2_v0', 'c3_v0', 200) if exists c4 is null + InsertOrUpdate insertOrUpdate4 = new InsertOrUpdate(); + insertOrUpdate4.setRowKey(row(colVal("c1", 4L), colVal("c2", "c2_v0"))); + insertOrUpdate4.addMutateRow(row(colVal("c3", "c3_v0"), colVal("c4", 200L))); + filter = compareVal(ObCompareOp.IS_NOT, "c4", null); + CheckAndInsUp checkAndInsUp4 = client.checkAndInsUp(TABLE_NAME, filter, + insertOrUpdate4, false); + MutationResult result4 = checkAndInsUp4.execute(); + Assert.assertEquals(0, result4.getAffectedRows()); + res = client.get(TABLE_NAME, new Object[] { 3L, "c2_v0" }, new String[] { "c3", "c4" }); + Assert.assertEquals("c3_v1", new String((byte[]) res.get("c3"), "UTF-8")); + Assert.assertEquals(200L, res.get("c4")); + + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } finally { + for (long i = 1L; i <= 4L; i++) { + Delete delete = client.delete(TABLE_NAME); + delete.setRowKey(row(colVal("c1", i), colVal("c2", "c2_v0"))); + MutationResult res = delete.execute(); + Assert.assertEquals(1, res.getAffectedRows()); + } + } + } +} diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableGlobalIndexTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableGlobalIndexTest.java index 3e0e14e2..148a1abb 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableGlobalIndexTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableGlobalIndexTest.java @@ -16,6 +16,7 @@ */ package com.alipay.oceanbase.rpc; + import com.alipay.oceanbase.rpc.mutation.Row; import com.alipay.oceanbase.rpc.stream.QueryResultSet; import com.alipay.oceanbase.rpc.table.api.TableQuery; @@ -34,7 +35,8 @@ import static org.junit.Assert.assertEquals; public class ObTableGlobalIndexTest { - ObTableClient client; + ObTableClient client; + @Before public void setup() throws Exception { setEnableIndexDirectSelect(); @@ -56,8 +58,9 @@ public void cleanPartitionLocationTable(String tableName) throws Exception { } public void checkIndexTableRow(String tableName, int recordCount) throws Exception { - String sql1 = "select table_name from oceanbase.__all_virtual_table where data_table_id = " + - "(select table_id from oceanbase.__all_virtual_table where table_name = '" + tableName + "')"; + String sql1 = "select table_name from oceanbase.__all_virtual_table where data_table_id = " + + "(select table_id from oceanbase.__all_virtual_table where table_name = '" + + tableName + "')"; Connection connection = ObTableClientTestUtil.getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql1); @@ -94,8 +97,8 @@ public void test_insert(String tableName, int recordCount) throws Exception { String[] properties_name = { "C2", "C3" }; for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.insert(tableName, new Object[] { key }, properties_name , - new Object[] { key + 1, ("hello " + key).getBytes() }); + long affectRows = client.insert(tableName, new Object[] { key }, properties_name, + new Object[] { key + 1, ("hello " + key).getBytes() }); Assert.assertEquals(1, affectRows); } // check index table row counts @@ -104,9 +107,10 @@ public void test_insert(String tableName, int recordCount) throws Exception { // get data for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - Map result = client.get(tableName, new Object[] { key }, properties_name); + Map result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(key + 1, result.get("C2")); - Assert.assertEquals("hello " + key, result.get("C3")); + Assert.assertEquals("hello " + key, result.get("C3")); } } finally { @@ -129,32 +133,36 @@ public void test_update() throws Exception { public void test_update(String tableName, int recordCount) throws Exception { try { - client.addRowKeyElement(tableName, new String[]{"C1"}); - String[] properties_name = {"C2", "C3"}; + client.addRowKeyElement(tableName, new String[] { "C1" }); + String[] properties_name = { "C2", "C3" }; // prepare data for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.insert(tableName, new Object[]{key}, properties_name, - new Object[]{key + 1, ("hello " + key).getBytes()}); + long affectRows = client.insert(tableName, new Object[] { key }, properties_name, + new Object[] { key + 1, ("hello " + key).getBytes() }); Assert.assertEquals(1, affectRows); } // update global index key for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.update(tableName, new Object[]{ key }, new String[] { "C2" }, new Object[] { key + 2 }); + long affectRows = client.update(tableName, new Object[] { key }, + new String[] { "C2" }, new Object[] { key + 2 }); Assert.assertEquals(1, affectRows); - Map result = client.get(tableName, new Object[] { key }, properties_name); + Map result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(key + 2, result.get("C2")); - Assert.assertEquals("hello " + key, result.get("C3")); + Assert.assertEquals("hello " + key, result.get("C3")); } // update other key for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.update(tableName, new Object[]{ key }, new String[] { "C3" }, new Object[] { "hi " + key }); + long affectRows = client.update(tableName, new Object[] { key }, + new String[] { "C3" }, new Object[] { "hi " + key }); Assert.assertEquals(1, affectRows); - Map result = client.get(tableName, new Object[] { key }, properties_name); + Map result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(key + 2, result.get("C2")); - Assert.assertEquals("hi " + key, result.get("C3")); + Assert.assertEquals("hi " + key, result.get("C3")); } checkIndexTableRow(tableName, recordCount); } finally { @@ -177,28 +185,30 @@ public void test_insert_or_update() throws Exception { public void test_insert_or_update(String tableName, int recordCount) throws Exception { try { - client.addRowKeyElement(tableName, new String[]{"C1"}); - String[] properties_name = {"C2", "C3"}; + client.addRowKeyElement(tableName, new String[] { "C1" }); + String[] properties_name = { "C2", "C3" }; // prepare data: insert for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.insertOrUpdate(tableName, new Object[]{key}, properties_name, - new Object[]{key + 1, ("hello " + key).getBytes()}); + long affectRows = client.insertOrUpdate(tableName, new Object[] { key }, + properties_name, new Object[] { key + 1, ("hello " + key).getBytes() }); Assert.assertEquals(1, affectRows); - Map result = client.get(tableName, new Object[] { key }, properties_name); + Map result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(key + 1, result.get("C2")); - Assert.assertEquals("hello " + key, result.get("C3")); + Assert.assertEquals("hello " + key, result.get("C3")); } // insert again: update for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.insertOrUpdate(tableName, new Object[]{key}, properties_name, - new Object[]{key + 2, ("hi " + key).getBytes()}); + long affectRows = client.insertOrUpdate(tableName, new Object[] { key }, + properties_name, new Object[] { key + 2, ("hi " + key).getBytes() }); Assert.assertEquals(1, affectRows); - Map result = client.get(tableName, new Object[] { key }, properties_name); + Map result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(key + 2, result.get("C2")); - Assert.assertEquals("hi " + key, result.get("C3")); + Assert.assertEquals("hi " + key, result.get("C3")); } checkIndexTableRow(tableName, recordCount); @@ -222,15 +232,16 @@ public void test_replace() throws Exception { public void test_replace(String tableName, int recordCount) throws Exception { try { - client.addRowKeyElement(tableName, new String[]{"C1"}); - String[] properties_name = {"C2", "C3"}; + client.addRowKeyElement(tableName, new String[] { "C1" }); + String[] properties_name = { "C2", "C3" }; // prepare data: insert for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.replace(tableName, new Object[]{key}, properties_name, - new Object[]{key + 1, ("hello " + key).getBytes()}); + long affectRows = client.replace(tableName, new Object[] { key }, properties_name, + new Object[] { key + 1, ("hello " + key).getBytes() }); Assert.assertEquals(1, affectRows); - Map result = client.get(tableName, new Object[]{key}, properties_name); + Map result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(key + 1, result.get("C2")); Assert.assertEquals("hello " + key, result.get("C3")); } @@ -238,10 +249,11 @@ public void test_replace(String tableName, int recordCount) throws Exception { // insert again: update for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - long affectRows = client.replace(tableName, new Object[]{key}, properties_name, - new Object[]{key + 2, ("hi " + key).getBytes()}); + long affectRows = client.replace(tableName, new Object[] { key }, properties_name, + new Object[] { key + 2, ("hi " + key).getBytes() }); Assert.assertEquals(2, affectRows); - Map result = client.get(tableName, new Object[]{key}, properties_name); + Map result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(key + 2, result.get("C2")); Assert.assertEquals("hi " + key, result.get("C3")); } @@ -268,35 +280,38 @@ public void test_increment_append() throws Exception { public void test_increment_append(String tableName, int recordCount) throws Exception { try { - client.addRowKeyElement(tableName, new String[]{"c1"}); - String[] properties_name = {"c2"}; + client.addRowKeyElement(tableName, new String[] { "c1" }); + String[] properties_name = { "c2" }; // increment without record for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - Map affect_res = client.increment(tableName, new Object[]{ key }, - properties_name, new Object[] { key + 1 }, true); + Map affect_res = client.increment(tableName, new Object[] { key }, + properties_name, new Object[] { key + 1 }, true); Assert.assertEquals(key + 1, affect_res.get("c2")); - Map get_result = client.get(tableName, new Object[]{ key }, properties_name); - Assert.assertEquals( key + 1, get_result.get("c2")); + Map get_result = client.get(tableName, new Object[] { key }, + properties_name); + Assert.assertEquals(key + 1, get_result.get("c2")); } // increment without column value for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - Map affect_res = client.increment(tableName, new Object[]{key}, - properties_name, new Object[] { key + 1 }, true); + Map affect_res = client.increment(tableName, new Object[] { key }, + properties_name, new Object[] { key + 1 }, true); Assert.assertEquals(2 * key + 2, affect_res.get("c2")); - Map get_result = client.get(tableName, new Object[]{key}, properties_name); + Map get_result = client.get(tableName, new Object[] { key }, + properties_name); Assert.assertEquals(2 * key + 2, get_result.get("c2")); } // append with empty column value for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - Map affect_res = client.append(tableName, new Object[]{key}, - new String[] { "c3" }, new Object[] { "hi~".getBytes() }, true); + Map affect_res = client.append(tableName, new Object[] { key }, + new String[] { "c3" }, new Object[] { "hi~".getBytes() }, true); Assert.assertEquals("hi~", affect_res.get("c3")); - Map get_result = client.get(tableName, new Object[]{key}, new String[] { "c3" }); + Map get_result = client.get(tableName, new Object[] { key }, + new String[] { "c3" }); Assert.assertEquals("hi~", get_result.get("c3")); } checkIndexTableRow(tableName, recordCount); @@ -304,10 +319,11 @@ public void test_increment_append(String tableName, int recordCount) throws Exce // append with not empty column value for (int i = 0; i < recordCount; i++) { int key = gen_key(i); - Map affect_res = client.append(tableName, new Object[]{key}, - new String[] { "c3" }, new Object[] { " hi~".getBytes() }, true); + Map affect_res = client.append(tableName, new Object[] { key }, + new String[] { "c3" }, new Object[] { " hi~".getBytes() }, true); Assert.assertEquals("hi~ hi~", affect_res.get("c3")); - Map get_result = client.get(tableName, new Object[]{key}, new String[] { "c3" }); + Map get_result = client.get(tableName, new Object[] { key }, + new String[] { "c3" }); Assert.assertEquals("hi~ hi~", get_result.get("c3")); } checkIndexTableRow(tableName, recordCount); @@ -344,14 +360,14 @@ public void test_query_in_global_index_table(String tableName) throws Exception Object[] properties_value = null; for (int i = 0; i < recordCount; i++) { if (i % 3 == 0) { - properties_value = new Object[] { i + 1, i + 2 }; + properties_value = new Object[] { i + 1, i + 2 }; } else if (i % 3 == 1) { - properties_value = new Object[] { i + 100 + 1, i + 100 + 2 }; + properties_value = new Object[] { i + 100 + 1, i + 100 + 2 }; } else if (i % 3 == 2) { - properties_value = new Object[] { i + 200 + 1, i + 200 + 2 }; + properties_value = new Object[] { i + 200 + 1, i + 200 + 2 }; } - long affectRows = client.insert(tableName, new Object[] { i }, - properties_name, properties_value); + long affectRows = client.insert(tableName, new Object[] { i }, properties_name, + properties_value); Assert.assertEquals(1, affectRows); } @@ -360,12 +376,12 @@ public void test_query_in_global_index_table(String tableName) throws Exception query.addScanRange(new Object[] { 0 }, new Object[] { recordCount }); QueryResultSet resultSet = query.execute(); - int count = 0; + int count = 0; while (resultSet.next()) { Map row = resultSet.getRow(); - int c1 = (int)row.get("C1"); - int c2 = (int)row.get("C2"); - int c3 = (int)row.get("C3"); + int c1 = (int) row.get("C1"); + int c2 = (int) row.get("C2"); + int c3 = (int) row.get("C3"); if (c1 % 3 == 0) { Assert.assertEquals(c1 + 1, c2); Assert.assertEquals(c1 + 2, c3); @@ -388,9 +404,9 @@ public void test_query_in_global_index_table(String tableName) throws Exception count = 0; while (resultSet2.next()) { Map row = resultSet2.getRow(); - int c1 = (int)row.get("C1"); - int c2 = (int)row.get("C2"); - int c3 = (int)row.get("C3"); + int c1 = (int) row.get("C1"); + int c2 = (int) row.get("C2"); + int c3 = (int) row.get("C3"); if (c1 % 3 == 0) { Assert.assertEquals(c1 + 1, c2); Assert.assertEquals(c1 + 2, c3); @@ -415,16 +431,16 @@ public void test_query_in_global_index_table(String tableName) throws Exception // query by local index, will lookup primary table TableQuery query4 = client.query(tableName).indexName("idx2"); query4.setScanRangeColumns("C3"); - query4.addScanRange(new Object[] { 0 }, new Object[] { recordCount + 200 + 2}); - query4.select("C1", "C2","C3"); + query4.addScanRange(new Object[] { 0 }, new Object[] { recordCount + 200 + 2 }); + query4.select("C1", "C2", "C3"); QueryResultSet resultSet4 = query4.execute(); Assert.assertEquals(resultSet4.cacheSize(), recordCount); count = 0; while (resultSet4.next()) { Map row = resultSet2.getRow(); - int c1 = (int)row.get("C1"); - int c2 = (int)row.get("C2"); - int c3 = (int)row.get("C3"); + int c1 = (int) row.get("C1"); + int c2 = (int) row.get("C2"); + int c3 = (int) row.get("C3"); if (c1 % 3 == 0) { Assert.assertEquals(c1 + 1, c2); Assert.assertEquals(c1 + 2, c3); @@ -458,14 +474,14 @@ public void test_query_sync(String tableName) throws Exception { Object[] properties_value = null; for (int i = 0; i < recordCount; i++) { if (i % 3 == 0) { - properties_value = new Object[] { i + 1, i + 2 }; + properties_value = new Object[] { i + 1, i + 2 }; } else if (i % 3 == 1) { - properties_value = new Object[] { i + 100 + 1, i + 100 + 2 }; + properties_value = new Object[] { i + 100 + 1, i + 100 + 2 }; } else if (i % 3 == 2) { - properties_value = new Object[] { i + 200 + 1, i + 200 + 2 }; + properties_value = new Object[] { i + 200 + 1, i + 200 + 2 }; } - long affectRows = client.insert(tableName, new Object[] { i }, - properties_name, properties_value); + long affectRows = client.insert(tableName, new Object[] { i }, properties_name, + properties_value); Assert.assertEquals(1, affectRows); } @@ -482,9 +498,9 @@ public void test_query_sync(String tableName) throws Exception { for (int i = 0; i < 5; i++) { Assert.assertTrue(result.next()); Map row = result.getRow(); - int c1 = (int)row.get("C1"); - int c2 = (int)row.get("C2"); - int c3 = (int)row.get("C3"); + int c1 = (int) row.get("C1"); + int c2 = (int) row.get("C2"); + int c3 = (int) row.get("C3"); if (c1 % 3 == 0) { Assert.assertEquals(c1 + 1, c2); Assert.assertEquals(c1 + 2, c3); @@ -514,41 +530,37 @@ PRIMARY KEY (`c1`, `c2`), **/ @Test public void test_ttl_query_with_global_index() throws Exception { - String tableName = "test_ttl_timestamp_with_index"; - String rowKey1 = "c1"; - String rowKey2 = "c2"; - String intCol = "c3"; - String intCol2 = "c4"; - String expireCol = "expired_ts"; + String tableName = "test_ttl_timestamp_with_index"; + String rowKey1 = "c1"; + String rowKey2 = "c2"; + String intCol = "c3"; + String intCol2 = "c4"; + String expireCol = "expired_ts"; String prefixKey = "test"; long[] keyIds = { 1L, 2L }; try { // 1. insert records with null expired_ts for (long id : keyIds) { client.insert(tableName).setRowKey(colVal(rowKey1, prefixKey), colVal(rowKey2, id)) - .addMutateColVal(colVal(intCol, id+100)) - .addMutateColVal(colVal(intCol2, id+200)) - .addMutateColVal(colVal(expireCol, null)).execute(); + .addMutateColVal(colVal(intCol, id + 100)) + .addMutateColVal(colVal(intCol2, id + 200)) + .addMutateColVal(colVal(expireCol, null)).execute(); } // 2. query all inserted records - QueryResultSet resultSet = client.query(tableName) - .indexName("idx2") - .setScanRangeColumns(intCol) - .addScanRange(new Object[] {101L}, new Object[] {102L}) - .execute(); + QueryResultSet resultSet = client.query(tableName).indexName("idx2") + .setScanRangeColumns(intCol) + .addScanRange(new Object[] { 101L }, new Object[] { 102L }).execute(); Assert.assertEquals(resultSet.cacheSize(), keyIds.length); // 3. update the expired_ts Timestamp curTs = new Timestamp(System.currentTimeMillis()); - client.update(tableName).setRowKey(colVal(rowKey1, prefixKey), colVal(rowKey2, keyIds[1])) - .addMutateColVal(colVal(expireCol, curTs)).execute(); + client.update(tableName) + .setRowKey(colVal(rowKey1, prefixKey), colVal(rowKey2, keyIds[1])) + .addMutateColVal(colVal(expireCol, curTs)).execute(); // 3. re-query all inserted records, the expired record won't be returned - resultSet = client.query(tableName) - .indexName("idx2") - .setScanRangeColumns(intCol2) - .addScanRange(new Object[] {101L}, new Object[] {102L}) - .execute(); + resultSet = client.query(tableName).indexName("idx2").setScanRangeColumns(intCol2) + .addScanRange(new Object[] { 101L }, new Object[] { 102L }).execute(); Assert.assertEquals(resultSet.cacheSize(), 1); Assert.assertTrue(resultSet.next()); Row row = resultSet.getResultRow(); @@ -558,7 +570,8 @@ public void test_ttl_query_with_global_index() throws Exception { Assert.assertTrue(false); } finally { for (long id : keyIds) { - client.delete(tableName).setRowKey(colVal(rowKey1, prefixKey), colVal(rowKey2, id)).execute(); + client.delete(tableName).setRowKey(colVal(rowKey1, prefixKey), colVal(rowKey2, id)) + .execute(); } } }