Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import com.alipay.oceanbase.rpc.location.model.*;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.location.model.partition.ObPartIdCalculator;
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionInfo;
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.mutation.*;
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
Expand Down Expand Up @@ -763,7 +763,7 @@ public void calculateContinuousFailure(String tableName, String errorMsg) throws
if (failures.incrementAndGet() > runtimeContinuousFailureCeiling) {
logger.warn("refresh table entry {} while execute failed times exceeded {}, msg: {}",
tableName, runtimeContinuousFailureCeiling, errorMsg);
getOrRefreshTableEntry(tableName, true, isTableEntryRefreshIntervalWait());
getOrRefreshTableEntry(tableName, true, isTableEntryRefreshIntervalWait(), true);
failures.set(0);
}
}
Expand Down Expand Up @@ -1021,6 +1021,21 @@ public ObIndexInfo getOrRefreshIndexInfo(final String indexName, final String in
*/
public TableEntry getOrRefreshTableEntry(final String tableName, final boolean refresh,
final boolean waitForRefresh) throws Exception {
return getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
}

/**
* Get or refresh table entry.
* @param tableName table name
* @param refresh is re-fresh
* @param waitForRefresh wait re-fresh
* @param fetchAll fetch all data from server if needed
* @return this
* @throws Exception if fail
*/
public TableEntry getOrRefreshTableEntry(final String tableName, final boolean refresh,
final boolean waitForRefresh, boolean fetchAll)
throws Exception {

TableEntry tableEntry = tableLocations.get(tableName);
// attempt the cached data and try best to avoid lock
Expand All @@ -1035,7 +1050,9 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
punishInterval = Math.min(punishInterval, tableEntryRefreshIntervalCeiling);
long current = System.currentTimeMillis();
long interval = current - tableEntry.getRefreshTimeMills();
if (interval < punishInterval) {
long fetchAllInterval = current - tableEntry.getRefreshAllTimeMills();
if ((fetchAll && (fetchAllInterval < punishInterval))
|| (!fetchAll && (interval < punishInterval))) {
if (waitForRefresh) {
long toHoldTime = punishInterval - interval;
logger
Expand Down Expand Up @@ -1079,13 +1096,17 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r

if (tableEntry != null) {
// the server roster is ordered by priority
long interval = (long) (tableEntryRefreshIntervalBase * Math.pow(2,
long punishInterval = (long) (tableEntryRefreshIntervalBase * Math.pow(2,
-serverRoster.getMaxPriority()));
interval = interval <= tableEntryRefreshIntervalCeiling ? interval
punishInterval = punishInterval <= tableEntryRefreshIntervalCeiling ? punishInterval
: tableEntryRefreshIntervalCeiling;
// control refresh frequency less than 100 milli second
// just in case of connecting to OB Server failed or change master
if (((System.currentTimeMillis() - tableEntry.getRefreshTimeMills())) < interval) {
long interval = System.currentTimeMillis() - tableEntry.getRefreshTimeMills();
long fetchAllInterval = System.currentTimeMillis()
- tableEntry.getRefreshAllTimeMills();
if ((fetchAll && (fetchAllInterval < punishInterval))
|| (!fetchAll && (interval < punishInterval))) {
return tableEntry;
}
}
Expand All @@ -1108,7 +1129,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r

for (int i = 0; i < refreshTryTimes; i++) {
try {
return refreshTableEntry(tableEntry, tableName);
return refreshTableEntry(tableEntry, tableName, fetchAll);
} catch (ObTableNotExistException e) {
RUNTIME.error("getOrRefreshTableEntry meet exception", e);
throw e;
Expand Down Expand Up @@ -1159,12 +1180,24 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
*/
private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName)
throws ObTableEntryRefreshException {
return refreshTableEntry(tableEntry, tableName, false);
}

/**
* 刷新 table entry 元数据
* @param tableEntry
* @param tableName
* @param fetchAll
* @return
* @throws ObTableEntryRefreshException
*/
private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName, boolean fetchAll)
throws ObTableEntryRefreshException {
TableEntryKey tableEntryKey = new TableEntryKey(clusterName, tenantName, database,
tableName);
try {

// if table entry is exist we just need to refresh table locations
if (tableEntry != null) {
if (tableEntry != null && !fetchAll) {
tableEntry = loadTableEntryLocationWithPriority(serverRoster, //
tableEntryKey,//
tableEntry,//
Expand Down Expand Up @@ -1216,6 +1249,9 @@ private TableEntry refreshTableEntry(TableEntry tableEntry, String tableName)
tableEntry), e);
}
tableLocations.put(tableName, tableEntry);
if (fetchAll) {
tableEntry.setRefreshAllTimeMills(System.currentTimeMillis());
}
tableEntryRefreshContinuousFailureCount.set(0);
if (logger.isInfoEnabled()) {
logger.info(
Expand Down Expand Up @@ -1292,7 +1328,9 @@ private ObPair<Long, ReplicaLocation> getPartitionReplica(TableEntry tableEntry,
private ReplicaLocation getPartitionLocation(TableEntry tableEntry, long partId,
ObServerRoute route) {
if (ObGlobal.obVsnMajor() >= 4 && tableEntry.isPartitionTable()) {
long TabletId = tableEntry.getPartitionInfo().getPartTabletIdMap().get(partId);
ObPartitionInfo partInfo = tableEntry.getPartitionInfo();
Map<Long, Long> tabletIdMap = partInfo.getPartTabletIdMap();
long TabletId = tabletIdMap.get(partId);
return tableEntry.getPartitionEntry().getPartitionLocationWithTabletId(TabletId)
.getReplica(route);
} else {
Expand Down Expand Up @@ -1330,7 +1368,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, Object[] rowKey, bo
throws Exception {
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh);

long partId = getPartition(tableEntry, rowKey);
long partId = getPartition(tableEntry, rowKey); // partition id in 3.x, origin partId in 4.x

return getTable(tableName, tableEntry, partId, waitForRefresh, route);
}
Expand Down Expand Up @@ -1389,7 +1427,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, List<ObNewRange> ke
/**
* get addr by pardId
* @param tableName table want to get
* @param partId partId where table located
* @param partId partId where table located (partition id in 3.x)
* @param refresh whether to refresh
* @param waitForRefresh whether wait for refresh
* @param route ObServer route
Expand All @@ -1407,7 +1445,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, long partId, boolea
* get addr from table entry by pardId
* @param tableName table want to get
* @param tableEntry tableEntry
* @param partId partId where table located
* @param partId partId where table located (partition id in 3.x)
* @param waitForRefresh whether wait for refresh
* @param route ObServer route
* @return ObPair of partId and table
Expand Down Expand Up @@ -1445,6 +1483,7 @@ public ObPair<Long, ObTableParam> getTable(String tableName, TableEntry tableEnt
}

ObTableParam param = new ObTableParam(obTable);
param.setPartId(partId); // used in getTable(), 4.x may change the origin partId
if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
long logicID = partId;
if (tableEntry.getPartitionInfo() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class TableEntry {
private Long replicaNum = Constants.OB_INVALID_ID;
private ObPartitionInfo partitionInfo = null;
private volatile long refreshTimeMills;
private volatile long refreshAllTimeMills;
private Map<String, Integer> rowKeyElement = null;

// table location
Expand Down Expand Up @@ -143,13 +144,27 @@ public long getRefreshTimeMills() {
return refreshTimeMills;
}

/*
* Get refresh time mills.
*/
public long getRefreshAllTimeMills() {
return refreshAllTimeMills;
}

/*
* Set refresh time mills.
*/
public void setRefreshTimeMills(long refreshTimeMills) {
this.refreshTimeMills = refreshTimeMills;
}

/*
* Set refresh all time mills.
*/
public void setRefreshAllTimeMills(long refreshAllTimeMills) {
this.refreshAllTimeMills = refreshAllTimeMills;
}

public Map<String, Integer> getRowKeyElement() {
return rowKeyElement;
}
Expand Down Expand Up @@ -222,8 +237,9 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
public String toString() {
return "TableEntry{" + "tableId=" + tableId + ", partitionNum=" + partitionNum
+ ", replicaNum=" + replicaNum + ", partitionInfo=" + partitionInfo
+ ", refreshTimeMills=" + refreshTimeMills + ", rowKeyElement=" + rowKeyElement
+ ", tableLocation=" + tableLocation + ", tableEntryKey=" + tableEntryKey
+ ", partitionEntry=" + partitionEntry + '}';
+ ", refreshTimeMills=" + refreshTimeMills + ", refreshAllTimeMills="
+ refreshAllTimeMills + ", rowKeyElement=" + rowKeyElement + ", tableLocation="
+ tableLocation + ", tableEntryKey=" + tableEntryKey + ", partitionEntry="
+ partitionEntry + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public void addTabletOperation(ObTableTabletOp tabletOperation) {
this.tabletOperations.add(tabletOperation);
int length = this.tabletOperations.size();
if (length == 1 && tabletOperation.isSameType()) {
setIsSameType(true);
return;
setIsSameType(true);
return;
}

if (isSameType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ public void partitionExecute(ObTableOperationResult[] results,
ObTableParam tableParam = partitionOperation.getValue().getLeft();
long tableId = tableParam.getTableId();
long partId = tableParam.getPartitionId();
long originPartId = tableParam.getPartId();
ObTable subObTable = tableParam.getObTable();
List<ObPair<Integer, ObTableOperation>> subOperationWithIndexList = partitionOperation
.getValue().getRight();
Expand Down Expand Up @@ -318,7 +319,8 @@ public void partitionExecute(ObTableOperationResult[] results,
if (obTableClient.isOdpMode()) {
subObTable = obTableClient.getOdpTable();
} else {
// 重试时重新 getTable
// getTable() when we need retry
// we should use partIdx to get table
if (tryTimes > 1) {
if (route == null) {
route = obTableClient.getRoute(batchOperation.isReadOnly());
Expand All @@ -327,7 +329,7 @@ public void partitionExecute(ObTableOperationResult[] results,
route.setBlackList(failedServerList);
}
subObTable = obTableClient
.getTable(tableName, partId, needRefreshTableEntry,
.getTable(tableName, originPartId, needRefreshTableEntry,
obTableClient.isTableEntryRefreshIntervalWait(), route).getRight()
.getObTable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public void partitionExecute(ObTableOperationResult[] results,
ObTableParam tableParam = partitionOperation.getValue().getLeft();
long tableId = tableParam.getTableId();
long partId = tableParam.getPartitionId();
long originPartId = tableParam.getPartId();
ObTable subObTable = tableParam.getObTable();
List<ObPair<Integer, ObTableSingleOp>> subOperationWithIndexList = partitionOperation
.getValue().getRight();
Expand Down Expand Up @@ -310,7 +311,7 @@ public void partitionExecute(ObTableOperationResult[] results,
if (failedServerList != null) {
route.setBlackList(failedServerList);
}
subObTable = obTableClient.getTable(tableName, partId, needRefreshTableEntry,
subObTable = obTableClient.getTable(tableName, originPartId, needRefreshTableEntry,
obTableClient.isTableEntryRefreshIntervalWait(), route).
getRight().getObTable();
}
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/table/ObTableParam.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class ObTableParam {
private ObTable obTable;
private long tableId = OB_INVALID_ID;
private long partitionId = INVALID_TABLET_ID; // partition id in 3.x aka tablet id in 4.x
private long partId = INVALID_TABLET_ID; // origin part id in 4.x, can be used when retry

/*
* constructor
Expand Down Expand Up @@ -98,4 +99,18 @@ public long getPartitionId() {
public void setPartitionId(long partitionId) {
this.partitionId = partitionId;
}

/*
* Get partId (partition id in 3.x, originPartId in 4.x)
*/
public long getPartId() {
return this.partId;
}

/*
* Set partId
*/
public void setPartId(long partId) {
this.partId = partId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ public void testAggregationWithBigint() throws Exception {
Assert.assertTrue(((ObTableException) e).getMessage().contains(
"[OB_DATA_OUT_OF_RANGE][Out of range value for column 'sum(c3)' at row 0]"));
} finally {
client.delete("test_aggregation", "first_row");
client.delete("test_aggregation", "second_row");
client.delete("test_aggregation", "third_row");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public void testSingleCheckInsUp() throws Exception {
public void testBatchCheckInsUpMutPart() throws Exception {
if (!isVersionSupported()) {
System.out.println("current version is not supported, current version: "
+ ObGlobal.OB_VERSION);
+ ObGlobal.OB_VERSION);
return;
}
// insert two record in different partition
Expand Down Expand Up @@ -359,7 +359,7 @@ public void testNonPartCheckAndInsUp() throws Exception {
String tableName = "test_bigint_table";
if (!isVersionSupported()) {
System.out.println("current version is not supported, current version: "
+ ObGlobal.OB_VERSION);
+ ObGlobal.OB_VERSION);
return;
}
// pre-clean data
Expand All @@ -371,7 +371,8 @@ public void testNonPartCheckAndInsUp() throws Exception {
insertOrUpdate1.setRowKey(row(colVal("c1", 1L)));
insertOrUpdate1.addMutateRow(row(colVal("c2", 100L)));
ObTableFilter filter1 = compareVal(ObCompareOp.GE, "c2", 100L);
CheckAndInsUp checkAndInsUp1 = client.checkAndInsUp(tableName, filter1, insertOrUpdate1, false);
CheckAndInsUp checkAndInsUp1 = client.checkAndInsUp(tableName, filter1,
insertOrUpdate1, false);
MutationResult result1 = checkAndInsUp1.execute();
Assert.assertEquals(1, result1.getAffectedRows());

Expand Down