Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ public void calculateContinuousFailure(String tableName, String errorMsg) throws
tableName, runtimeContinuousFailureCeiling, errorMsg);
getOrRefreshTableEntry(tableName, true, isTableEntryRefreshIntervalWait(), true);
failures.set(0);
} else {
logger.warn("error msg: {}, current continues failure count: {}", errorMsg, failures);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.alipay.oceanbase.rpc.bolt.protocol;

import com.alipay.oceanbase.rpc.exception.ObTableRoutingWrongException;
import com.alipay.oceanbase.rpc.protocol.packet.ObRpcPacketHeader;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
Expand Down Expand Up @@ -104,6 +105,16 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
return new ObTableLSOpResult();
}
}, //
OB_TABLE_API_MOVE(Pcodes.OB_TABLE_API_MOVE) {
/**
* New payload.
*/
@Override
public ObPayload newPayload(ObRpcPacketHeader header) {
throw new ObTableRoutingWrongException("Receive rerouting response packet. " +
"Java client is not supported and need to Refresh table router entry");
}
}, //
OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) {
/*
* New payload.
Expand Down Expand Up @@ -148,6 +159,9 @@ public static ObTablePacketCode valueOf(short value) {
return OB_TABLE_API_DIRECT_LOAD;
case Pcodes.OB_TABLE_API_LS_EXECUTE:
return OB_TABLE_API_LS_EXECUTE;
case Pcodes.OB_TABLE_API_MOVE:
throw new ObTableRoutingWrongException("Receive rerouting response packet. " +
"Java client is not supported and need to Refresh table router entry");
case Pcodes.OB_ERROR_PACKET:
return OB_ERROR_PACKET;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import com.alipay.oceanbase.rpc.bolt.protocol.ObTablePacket;
import com.alipay.oceanbase.rpc.bolt.protocol.ObTablePacketCode;
import com.alipay.oceanbase.rpc.exception.ExceptionUtil;
import com.alipay.oceanbase.rpc.exception.ObTableLoginException;
import com.alipay.oceanbase.rpc.exception.ObTableRoutingWrongException;
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.protocol.packet.ObCompressType;
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.Credentialable;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
Expand All @@ -35,6 +33,9 @@
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;

import static com.alipay.oceanbase.rpc.protocol.packet.ObCompressType.INVALID_COMPRESSOR;
import static com.alipay.oceanbase.rpc.protocol.packet.ObCompressType.NONE_COMPRESSOR;

public class ObTableRemoting extends BaseRemoting {

private static final Logger logger = TableClientLoggerFactory.getLogger(ObTableRemoting.class);
Expand Down Expand Up @@ -93,9 +94,14 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
try {
// decode packet header first
response.decodePacketHeader();

ObCompressType compressType = response.getHeader().getObCompressType();
if (compressType != INVALID_COMPRESSOR && compressType != NONE_COMPRESSOR) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"Rpc Result is compressed. Java Client is not supported. msg:" + response.getMessage());
logger.warn(errMessage);
throw new FeatureNotSupportedException(errMessage);
}
ByteBuf buf = response.getPacketContentBuf();

// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ public interface Pcodes {
int OB_TABLE_API_EXECUTE_QUERY_SYNC = 0x1106;

int OB_TABLE_API_DIRECT_LOAD = 0x1123;
int OB_TABLE_API_MOVE = 0x1124;
int OB_TABLE_API_LS_EXECUTE = 0x1125;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
protected ObTableQuery tableQuery;
protected long operationTimeout = -1;
protected String tableName;
// use to store the TableEntry Key:
// primary index or local index: key is primary table name
// global index: key is index table name (be like: __idx_<data_table_id>_<index_name>)
protected String indexTableName;
protected ObTableEntityType entityType;
private Map<Long, ObPair<Long, ObTableParam>> expectant; // Map<logicId, ObPair<logicId, param>>
private List<String> cacheProperties = new LinkedList<String>();
Expand Down Expand Up @@ -431,6 +435,16 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

/*
* Get index table name.
*/
public String getIndexTableName() { return indexTableName; }

/*
* Set index table name.
*/
public void setIndexTableName(String indexTableName) { this.indexTableName = indexTableName; }

/*
* Get entity type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
route.setBlackList(failedServerList);
}
subObTable = client
.getTable(tableName, partIdWithIndex.getLeft(), needRefreshTableEntry,
.getTable(indexTableName, partIdWithIndex.getLeft(), needRefreshTableEntry,
client.isTableEntryRefreshIntervalWait(), route).getRight()
.getObTable();
}
}
result = subObTable.execute(request);
client.resetExecuteContinuousFailureCount(tableName);
client.resetExecuteContinuousFailureCount(indexTableName);
break;
} catch (Exception e) {
if (client.isOdpMode()) {
Expand All @@ -92,18 +92,18 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
logger
.warn(
"tablename:{} stream query execute while meet Exception needing retry, errorCode: {}, errorMsg: {}, try times {}",
tableName, ((ObTableException) e).getErrorCode(),
indexTableName, ((ObTableException) e).getErrorCode(),
e.getMessage(), tryTimes);
} else if (e instanceof IllegalArgumentException) {
logger
.warn(
"tablename:{} stream query execute while meet Exception needing retry, try times {}, errorMsg: {}",
tableName, tryTimes, e.getMessage());
indexTableName, tryTimes, e.getMessage());
} else {
logger
.warn(
"tablename:{} stream query execute while meet Exception needing retry, try times {}",
tableName, tryTimes, e);
indexTableName, tryTimes, e);
}
} else {
throw e;
Expand All @@ -113,7 +113,7 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
if ((tryTimes - 1) < client.getRuntimeRetryTimes()) {
logger.warn(
"tablename:{} partition id:{} retry when replica not readable: {}",
tableName, partIdWithIndex.getLeft(), e.getMessage(), e);
indexTableName, partIdWithIndex.getLeft(), e.getMessage(), e);
if (failedServerList == null) {
failedServerList = new HashSet<String>();
}
Expand All @@ -122,7 +122,7 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
logger
.warn(
"tablename:{} partition id:{} exhaust retry when replica not readable: {}",
tableName, partIdWithIndex.getLeft(), e.getMessage(), e);
indexTableName, partIdWithIndex.getLeft(), e.getMessage(), e);
throw e;
}
} else if (e instanceof ObTableException
Expand All @@ -131,21 +131,21 @@ protected ObTableQueryResult execute(ObPair<Long, ObTableParam> partIdWithIndex,
logger
.warn(
"tablename:{} partition id:{} stream query refresh table while meet Exception needing refresh, errorCode: {}",
tableName, partIdWithIndex.getLeft(),
indexTableName, partIdWithIndex.getLeft(),
((ObTableException) e).getErrorCode(), e);
if (client.isRetryOnChangeMasterTimes()
&& (tryTimes - 1) < client.getRuntimeRetryTimes()) {
logger
.warn(
"tablename:{} partition id:{} stream query retry while meet Exception needing refresh, errorCode: {} , retry times {}",
tableName, partIdWithIndex.getLeft(),
indexTableName, partIdWithIndex.getLeft(),
((ObTableException) e).getErrorCode(), tryTimes, e);
} else {
client.calculateContinuousFailure(tableName, e.getMessage());
client.calculateContinuousFailure(indexTableName, e.getMessage());
throw e;
}
} else {
client.calculateContinuousFailure(tableName, e.getMessage());
client.calculateContinuousFailure(indexTableName, e.getMessage());
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
try {
if (needRefreshTableEntry) {
subObTable = client
.getTable(tableName, new Long[] { partIdWithObTable.getLeft() }, true,
.getTable(indexTableName, new Long[] { partIdWithObTable.getLeft() }, true,
client.isTableEntryRefreshIntervalWait()).getRight().getObTable();
}
result = subObTable.execute(streamRequest);
client.resetExecuteContinuousFailureCount(tableName);
client.resetExecuteContinuousFailureCount(indexTableName);
break;
} catch (Exception e) {
if (e instanceof ObTableException
Expand All @@ -89,11 +89,11 @@ protected ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTableParam> partId
"stream query retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}",
((ObTableException) e).getErrorCode(), tryTimes);
} else {
client.calculateContinuousFailure(tableName, e.getMessage());
client.calculateContinuousFailure(indexTableName, e.getMessage());
throw e;
}
} else {
client.calculateContinuousFailure(tableName, e.getMessage());
client.calculateContinuousFailure(indexTableName, e.getMessage());
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
public abstract class AbstractTableQueryImpl extends AbstractTableQuery {

protected ObTableQuery tableQuery;
// TableEntry key
protected String indexTableName;

/*
* Select.
Expand Down Expand Up @@ -181,4 +183,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
this.tableQuery.setMaxResultSize(maxResultSize);
return this;
}

public String getIndexTableName() {
return indexTableName;
}

public void setIndexTableName(String indexTableName) {
this.indexTableName = indexTableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ public class ObTableClientQueryAsyncImpl extends AbstractTableQueryImpl {

public ObTableClientQueryAsyncImpl(String tableName, ObTableClient client) {
this.tableName = tableName;
this.indexTableName = tableName;
this.obTableClient = client;
this.tableQuery = new ObTableQuery();
}

public ObTableClientQueryAsyncImpl(String tableName, ObTableQuery tableQuery,
ObTableClient client) {
this.tableName = tableName;
this.indexTableName = tableName;
this.obTableClient = client;
this.tableQuery = tableQuery;
}
Expand Down Expand Up @@ -112,6 +114,7 @@ public ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType
obTableClientQueryASyncStreamResult.setTableQuery(tableQuery);
obTableClientQueryASyncStreamResult.setEntityType(entityType);
obTableClientQueryASyncStreamResult.setTableName(tableName);
obTableClientQueryASyncStreamResult.setIndexTableName(indexTableName);
obTableClientQueryASyncStreamResult.setExpectant(partitionObTables);
obTableClientQueryASyncStreamResult.setOperationTimeout(operationTimeout);
obTableClientQueryASyncStreamResult.setClient(obTableClient);
Expand All @@ -134,6 +137,7 @@ public ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType
obTableClientQueryASyncStreamResult.setTableQuery(tableQuery);
obTableClientQueryASyncStreamResult.setEntityType(entityType);
obTableClientQueryASyncStreamResult.setTableName(tableName);
obTableClientQueryASyncStreamResult.setIndexTableName(indexTableName);
obTableClientQueryASyncStreamResult.setExpectant(partitionObTables);
obTableClientQueryASyncStreamResult.setOperationTimeout(operationTimeout);
obTableClientQueryASyncStreamResult.setClient(obTableClient);
Expand All @@ -151,7 +155,6 @@ public ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType

public Map<Long, ObPair<Long, ObTableParam>> getPartitions() throws Exception {
String indexName = tableQuery.getIndexName();
String indexTableName = tableName;
if (!this.obTableClient.isOdpMode()) {
indexTableName = obTableClient.getIndexTableName(tableName, indexName,
tableQuery.getScanRangeColumns());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void addAggregation(ObTableAggregationType aggType, String aggColumn) {
*/
public ObTableClientQueryImpl() {
this.tableName = null;
this.indexTableName = null;
this.obTableClient = null;
this.tableQuery = new ObTableQuery();
this.rowKey = null;
Expand All @@ -64,6 +65,7 @@ public ObTableClientQueryImpl() {
*/
public ObTableClientQueryImpl(String tableName, ObTableClient client) {
this.tableName = tableName;
this.indexTableName = tableName;
this.obTableClient = client;
this.tableQuery = new ObTableQuery();
this.rowKey = null;
Expand All @@ -74,6 +76,7 @@ public ObTableClientQueryImpl(String tableName, ObTableClient client) {
*/
public ObTableClientQueryImpl(String tableName, ObTableQuery tableQuery, ObTableClient client) {
this.tableName = tableName;
this.indexTableName = tableName;
this.obTableClient = client;
this.tableQuery = tableQuery;
this.rowKey = null;
Expand Down Expand Up @@ -144,7 +147,6 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception {
obTableClient.getOdpTable())));
} else {
String indexName = tableQuery.getIndexName();
String indexTableName = tableName;
if (!this.obTableClient.isOdpMode()) {
indexTableName = obTableClient.getIndexTableName(tableName, indexName,
tableQuery.getScanRangeColumns());
Expand Down Expand Up @@ -196,6 +198,7 @@ public ObTableClientQueryStreamResult executeInternal() throws Exception {
obTableClientQueryStreamResult.setTableQuery(tableQuery);
obTableClientQueryStreamResult.setEntityType(entityType);
obTableClientQueryStreamResult.setTableName(tableName);
obTableClientQueryStreamResult.setIndexTableName(indexTableName);
obTableClientQueryStreamResult.setExpectant(partitionObTables);
obTableClientQueryStreamResult.setClient(obTableClient);
obTableClientQueryStreamResult.setOperationTimeout(operationTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public void test_query_in_global_index_table(String tableName) throws Exception
Assert.assertEquals(resultSet4.cacheSize(), recordCount);
count = 0;
while (resultSet4.next()) {
Map<String, Object> row = resultSet2.getRow();
Map<String, Object> row = resultSet4.getRow();
int c1 = (int) row.get("C1");
int c2 = (int) row.get("C2");
int c3 = (int) row.get("C3");
Expand Down