Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 53 additions & 17 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package com.alipay.oceanbase.rpc;

import com.alibaba.fastjson.JSON;
import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp;
import com.alipay.oceanbase.rpc.constant.Constants;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.batch.QueryByBatch;
import com.alipay.oceanbase.rpc.location.LocationUtil;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.location.model.*;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.location.model.partition.ObPartIdCalculator;
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.mutation.*;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
Expand Down Expand Up @@ -910,8 +913,8 @@ public void syncRefreshMetadata() throws Exception {
* @param scanRangeColumns columns that need to be scaned
* @return the real table name
*/
public String getIndexTableName(final String dataTableName, final String indexName, List<String> scanRangeColumns)
throws Exception {
public String getIndexTableName(final String dataTableName, final String indexName,
List<String> scanRangeColumns) throws Exception {
String indexTableName = dataTableName;
if (indexName != null && !indexName.equals("PRIMARY")) {
String tmpTableName = constructIndexTableName(dataTableName, indexName);
Expand All @@ -925,27 +928,29 @@ public String getIndexTableName(final String dataTableName, final String indexNa
if (indexInfo.getIndexType().isGlobalIndex()) {
indexTableName = tmpTableName;
if (scanRangeColumns.isEmpty()) {
throw new ObTableException("query by global index need add all index keys in order");
throw new ObTableException(
"query by global index need add all index keys in order");
} else {
addRowKeyElement(indexTableName, scanRangeColumns.toArray(new String[scanRangeColumns.size()]));
addRowKeyElement(indexTableName,
scanRangeColumns.toArray(new String[scanRangeColumns.size()]));
}
}
}
return indexTableName;
}

public String constructIndexTableName(final String dataTableName, final String indexName)
throws Exception {
throws Exception {
// construct index table name
TableEntry entry = tableLocations.get(dataTableName);
Long dataTableId = null;
try {
if (entry == null) {
ObServerAddr addr = serverRoster.getServer(serverAddressPriorityTimeout,
serverAddressCachingTimeout);
serverAddressCachingTimeout);
dataTableId = LocationUtil.getTableIdFromRemote(addr, sysUA,
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, tenantName,
database, dataTableName);
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout, tenantName,
database, dataTableName);
} else {
dataTableId = entry.getTableId();
}
Expand All @@ -957,7 +962,7 @@ public String constructIndexTableName(final String dataTableName, final String i
}

public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String indexTableName)
throws Exception {
throws Exception {
ObIndexInfo indexInfo = indexinfos.get(indexName);
if (indexInfo != null) {
return indexInfo;
Expand All @@ -968,8 +973,8 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String in
boolean acquired = lock.tryLock(tableEntryRefreshLockTimeout, TimeUnit.MILLISECONDS);
if (!acquired) {
String errMsg = "try to lock index infos refreshing timeout " + "dataSource:"
+ dataSourceName + " ,indexName:" + indexName + " , timeout:"
+ tableEntryRefreshLockTimeout + ".";
+ dataSourceName + " ,indexName:" + indexName + " , timeout:"
+ tableEntryRefreshLockTimeout + ".";
RUNTIME.error(errMsg);
throw new ObTableEntryRefreshException(errMsg);
}
Expand All @@ -979,21 +984,21 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String in
return indexInfo;
} else {
logger.info("index info is not exist, create new index info, indexName: {}",
indexName);
indexName);
int serverSize = serverRoster.getMembers().size();
int refreshTryTimes = tableEntryRefreshTryTimes > serverSize ? serverSize
: tableEntryRefreshTryTimes;
: tableEntryRefreshTryTimes;
for (int i = 0; i < refreshTryTimes; i++) {
ObServerAddr serverAddr = serverRoster.getServer(serverAddressPriorityTimeout,
serverAddressCachingTimeout);
serverAddressCachingTimeout);
indexInfo = getIndexInfoFromRemote(serverAddr, sysUA,
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
indexTableName);
tableEntryAcquireConnectTimeout, tableEntryAcquireSocketTimeout,
indexTableName);
if (indexInfo != null) {
indexinfos.put(indexName, indexInfo);
} else {
RUNTIME.error("get index info from remote is null, index name: {}",
indexName);
indexName);
}
}
return indexInfo;
Expand Down Expand Up @@ -2265,6 +2270,27 @@ public ObPayload mutationWithFilter(final TableQuery tableQuery, final Object[]
final List<ObNewRange> keyRanges,
final ObTableOperation operation, final boolean withResult)
throws Exception {
return mutationWithFilter(tableQuery, rowKey, keyRanges, operation, withResult, false,
false);
}

/**
* execute mutation with filter
* @param tableQuery table query
* @param rowKey row key which want to mutate
* @param keyRanges scan range
* @param operation table operation
* @param withResult whether to bring back result
* @param checkAndExecute whether execute check and execute instead of query and mutate
* @param checkExists whether to check exists or not
* @return execute result
* @throws Exception exception
*/
public ObPayload mutationWithFilter(final TableQuery tableQuery, final Object[] rowKey,
final List<ObNewRange> keyRanges,
final ObTableOperation operation, final boolean withResult,
final boolean checkAndExecute, final boolean checkExists)
throws Exception {
final long start = System.currentTimeMillis();
if (tableQuery != null && tableQuery.getObTableQuery().getKeyRanges().isEmpty()) {
// fill a whole range if no range is added explicitly.
Expand All @@ -2287,6 +2313,8 @@ public ObPayload execute(ObPair<Long, ObTableParam> obPair) throws Exception {
request.setTableId(tableParam.getTableId());
// partId/tabletId
request.setPartitionId(tableParam.getPartitionId());
request.getTableQueryAndMutate().setIsCheckAndExecute(checkAndExecute);
request.getTableQueryAndMutate().setIsCheckNoExists(!checkExists);
ObPayload result = obTable.execute(request);
String endpoint = obTable.getIp() + ":" + obTable.getPort();
MonitorUtil.info(request, database, tableQuery.getTableName(), "QUERY_AND_MUTATE",
Expand Down Expand Up @@ -2517,6 +2545,14 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
return request;
}

/**
* checkAndInsUp.
*/
public CheckAndInsUp checkAndInsUp(String tableName, ObTableFilter filter,
InsertOrUpdate insUp, boolean checkExists) {
return new CheckAndInsUp(this, tableName, filter, insUp, checkExists);
}

/**
* Set full username
* @param fullUserName user name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableDirectLoadResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableLSOpResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryResult;
Expand Down Expand Up @@ -94,6 +95,15 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
return new ObTableDirectLoadResult();
}
}, //
OB_TABLE_API_LS_EXECUTE(Pcodes.OB_TABLE_API_LS_EXECUTE) {
/**
* New payload.
*/
@Override
public ObPayload newPayload(ObRpcPacketHeader header) {
return new ObTableLSOpResult();
}
}, //
OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) {
/*
* New payload.
Expand Down Expand Up @@ -136,6 +146,8 @@ public static ObTablePacketCode valueOf(short value) {
return OB_TABLE_API_EXECUTE_QUERY_SYNC;
case Pcodes.OB_TABLE_API_DIRECT_LOAD:
return OB_TABLE_API_DIRECT_LOAD;
case Pcodes.OB_TABLE_API_LS_EXECUTE:
return OB_TABLE_API_LS_EXECUTE;
case Pcodes.OB_ERROR_PACKET:
return OB_ERROR_PACKET;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*-
* #%L
* OBKV Table Client Framework
* %%
* Copyright (C) 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.checkandmutate;

import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.ObTableException;
import com.alipay.oceanbase.rpc.filter.ObTableFilter;
import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
import com.alipay.oceanbase.rpc.table.api.Table;
import com.alipay.oceanbase.rpc.table.api.TableQuery;

import java.util.ArrayList;
import java.util.List;

public class CheckAndInsUp {
private Table client;
private String tableName;
private ObTableFilter filter;
private InsertOrUpdate insUp;
boolean checkExists = true;

public CheckAndInsUp(ObTableFilter filter, InsertOrUpdate insUp, boolean check_exists)
throws IllegalArgumentException {
this.filter = filter;
this.insUp = insUp;
this.checkExists = check_exists;
this.tableName = null;
this.client = null;
}

public CheckAndInsUp(Table client, String tableName, ObTableFilter filter,
InsertOrUpdate insUp, boolean check_exists)
throws IllegalArgumentException {
this.client = client;
this.tableName = tableName;
this.filter = filter;
this.insUp = insUp;
this.checkExists = check_exists;
}

public Object[] getRowKey() {
return insUp.getRowKey();
}

public ObTableFilter getFilter() {
return filter;
}

public InsertOrUpdate getInsUp() {
return insUp;
}

public boolean isCheckExists() {
return checkExists;
}

public MutationResult execute() throws Exception {
if (null == tableName) {
throw new ObTableException("table name is null");
} else if (null == client) {
throw new ObTableException("client is null");
} else if (!(client instanceof ObTableClient)) {
throw new ObTableException("the client must be table clinet");
}

TableQuery query = client.query(tableName);
query.setFilter(filter);
Object[] rowKey = getRowKey();
List<ObNewRange> ranges = new ArrayList<>();
ObNewRange range = new ObNewRange();
range.setStartKey(ObRowKey.getInstance(insUp.getRowKey()));
range.setEndKey(ObRowKey.getInstance(insUp.getRowKey()));
ranges.add(range);
query.getObTableQuery().setKeyRanges(ranges);
ObTableOperation operation = ObTableOperation.getInstance(ObTableOperationType.INSERT_OR_UPDATE,
insUp.getRowKey(), insUp.getColumns(), insUp.getValues());

return new MutationResult(((ObTableClient)client).mutationWithFilter(query, rowKey, ranges, operation, false, true, checkExists));
}
}
19 changes: 10 additions & 9 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ public class LocationUtil {

private static final String OB_VERSION_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ OB_VERSION() AS CLUSTER_VERSION;";

private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table " +
"where table_name = ?";
private static final String PROXY_INDEX_INFO_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ data_table_id, table_id, index_type FROM oceanbase.__all_virtual_table "
+ "where table_name = ?";

private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema " +
"where tenant_name = ? and database_name = ? and table_name = ? limit 1";
private static final String PROXY_TABLE_ID_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ table_id from oceanbase.__all_virtual_proxy_schema "
+ "where tenant_name = ? and database_name = ? and table_name = ? limit 1";

private static final String OB_TENANT_EXIST_SQL = "SELECT /*+READ_CONSISTENCY(WEAK)*/ tenant_id from __all_tenant where tenant_name = ?;";

Expand Down Expand Up @@ -688,8 +688,9 @@ public static TableEntry getTableEntryLocationFromRemote(Connection connection,
}

public static Long getTableIdFromRemote(ObServerAddr obServerAddr, ObUserAuth sysUA,
long connectTimeout, long socketTimeout, String tenantName,
String databaseName, String tableName) throws ObTableEntryRefreshException {
long connectTimeout, long socketTimeout,
String tenantName, String databaseName, String tableName)
throws ObTableEntryRefreshException {
Long tableId = null;
Connection connection = null;
PreparedStatement ps = null;
Expand All @@ -706,11 +707,11 @@ public static Long getTableIdFromRemote(ObServerAddr obServerAddr, ObUserAuth sy
tableId = rs.getLong("table_id");
} else {
throw new ObTableEntryRefreshException("fail to get " + tableName
+ " table_id from remote");
+ " table_id from remote");
}
} catch (Exception e) {
throw new ObTableEntryRefreshException("fail to get " + tableName
+ " table_id from remote", e);
+ " table_id from remote", e);
} finally {
try {
if (null != rs) {
Expand All @@ -729,7 +730,7 @@ public static Long getTableIdFromRemote(ObServerAddr obServerAddr, ObUserAuth sy
public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUserAuth sysUA,
long connectTimeout, long socketTimeout,
String indexTableName)
throws ObTableEntryRefreshException {
throws ObTableEntryRefreshException {
ObIndexInfo indexInfo = null;
Connection connection = null;
PreparedStatement ps = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ public void setIndexType(ObIndexType indexType) {
@Override
public String toString() {
return "ObIndexInfo{" + "dataTableId=" + dataTableId + ", indexTableId=" + indexTableId
+ ", indexTableName=" + indexTableName + ", indexType=" + indexType + '}';
+ ", indexTableName=" + indexTableName + ", indexType=" + indexType + '}';
}
}
Loading