Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3296,7 +3296,8 @@ public String getPhyTableNameFromTableGroup(ObTableQueryRequest request, String
public byte[][][] getFirstPartStartKeys(String tableName) throws Exception {
// Check row key element
// getOrRefreshTableEntry() need row key element, we could remove this after we remove rk element
if (this.runningMode != RunningMode.HBASE && this.tableRowKeyElement.containsKey(tableName)) {
if (this.runningMode != RunningMode.HBASE
&& !this.tableRowKeyElement.containsKey(tableName)) {
throw new IllegalArgumentException("Row key element is empty for " + tableName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.alipay.oceanbase.rpc.location.model.partition;

import com.alipay.oceanbase.rpc.exception.ObTablePartitionConsistentException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObColumn;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
import com.alipay.oceanbase.rpc.util.RandomUtil;
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
Expand Down Expand Up @@ -99,6 +101,32 @@ public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] en
boolean endInclusive) {
// close set
try {
// pre-check start and end
// should remove after remove addRowkeyElement
if (start.length != end.length) {
throw new IllegalArgumentException("length of start key and end key is not equal");
}

// check whether partition key is Min or Max, should refactor after remove addRowkeyElement
for (ObPair<ObColumn, List<Integer>> pair : orderedPartRefColumnRowKeyRelations) {
for (int refIdx : pair.getRight()) {
if (start.length <= refIdx) {
throw new IllegalArgumentException("rowkey length is " + start.length
+ ", which is shortest than " + refIdx);
}
if (start[refIdx] instanceof ObObj
&& (((ObObj) start[refIdx]).isMinObj() || ((ObObj) start[refIdx])
.isMaxObj())) {
return completeWorks;
}
if (end[refIdx] instanceof ObObj
&& (((ObObj) end[refIdx]).isMinObj() || ((ObObj) end[refIdx]).isMaxObj())) {
return completeWorks;
}
}
}

// eval partition key
List<Object> startValues = evalRowKeyValues(start);
Object startValue = startValues.get(0);
List<Object> endValues = evalRowKeyValues(end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

import com.alipay.oceanbase.rpc.exception.ObTablePartitionConsistentException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObCollationType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObColumn;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.util.ObHashUtils;
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;

import static com.alipay.oceanbase.rpc.util.RandomUtil.getRandomNum;
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD;
Expand Down Expand Up @@ -90,7 +89,32 @@ public void setPartNum(int partNum) {
public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] end,
boolean endInclusive) {
try {
// eval rowkey
// pre-check start and end
// should remove after remove addRowkeyElement
if (start.length != end.length) {
throw new IllegalArgumentException("length of start key and end key is not equal");
}

// check whether partition key is Min or Max, should refactor after remove addRowkeyElement
for (ObPair<ObColumn, List<Integer>> pair : orderedPartRefColumnRowKeyRelations) {
for (int refIdx : pair.getRight()) {
if (start.length <= refIdx) {
throw new IllegalArgumentException("rowkey length is " + start.length
+ ", which is shortest than " + refIdx);
}
if (start[refIdx] instanceof ObObj
&& (((ObObj) start[refIdx]).isMinObj() || ((ObObj) start[refIdx])
.isMaxObj())) {
return completeWorks;
}
if (end[refIdx] instanceof ObObj
&& (((ObObj) end[refIdx]).isMinObj() || ((ObObj) end[refIdx]).isMaxObj())) {
return completeWorks;
}
}
}

// eval partition key
List<Object> startValues = evalRowKeyValues(start);
List<Object> endValues = evalRowKeyValues(end);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

import com.alipay.oceanbase.rpc.exception.ObTablePartitionConsistentException;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObColumn;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.util.StringUtil;
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
import org.slf4j.Logger;

import java.util.*;

import static com.alipay.oceanbase.rpc.constant.Constants.EMPTY_STRING;
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MAX_PARTITION_ELEMENT;
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MIN_PARTITION_ELEMENT;
import static com.alipay.oceanbase.rpc.protocol.payload.impl.column.ObSimpleColumn.DEFAULT_UTF8MB4_GENERAL_CI;
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD;
import static java.util.Collections.*;
Expand Down Expand Up @@ -139,8 +142,21 @@ protected List<Comparable> initComparableElementByTypes(List<Object> objects,
try {
for (int i = 0; i < objects.size(); i++) {
ObColumn obColumn = obColumns.get(i);
comparableElement.add(obColumn.getObObjType().parseToComparable(objects.get(i),
obColumn.getObCollationType()));
if (objects.get(i) instanceof ObObj) {
// deal with min / max
ObObj obj = (ObObj) objects.get(i);
if (obj.isMinObj()) {
comparableElement.add(MIN_PARTITION_ELEMENT);
} else if (obj.isMaxObj()) {
comparableElement.add(MAX_PARTITION_ELEMENT);
} else {
throw new IllegalArgumentException(String.format(
"failed to cast obj, obj=%s, types=%s", objects, obColumns));
}
} else {
comparableElement.add(obColumn.getObObjType().parseToComparable(objects.get(i),
obColumn.getObCollationType()));
}
}
} catch (Exception e) {
logger.error(LCD.convert("01-00024"), objects, obColumns, e);
Expand Down Expand Up @@ -216,11 +232,24 @@ public List<Object> evalRowKeyValues(Object... rowKey) throws IllegalArgumentExc
// row key is consists of multi column
List<Integer> refIndex = orderedPartRefColumnRowKeyRelation.getRight();
Object[] evalParams = new Object[refIndex.size()];
boolean needEval = true;
for (int j = 0; j < refIndex.size(); j++) {
//TODO where get the type of ref column ?
// TODO where get the type of ref column ?
if (refIndex.size() == 1 && partKey[refIndex.get(j)] instanceof ObObj) {
// set min max into eval values directly
// need refactor after addRowkeyElement has removed
ObObj obj = (ObObj) partKey[refIndex.get(j)];
if (obj.isMaxObj() || obj.isMinObj()) {
evalValues.add(obj);
needEval = false;
break;
}
}
evalParams[j] = partKey[refIndex.get(j)];
}
evalValues.add(orderedPartRefColumnRowKeyRelation.getLeft().evalValue(evalParams));
if (needEval) {
evalValues.add(orderedPartRefColumnRowKeyRelation.getLeft().evalValue(evalParams));
}
}
return evalValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] en
boolean endInclusive) {

// can not detail the border effect so that the range is magnified
int startIdx = getBoundsIdx(start);
int stopIdx = getBoundsIdx(end);
int startIdx = getBoundsIdx(true, start);
int stopIdx = getBoundsIdx(true, end);
List<Long> partIds = new ArrayList<Long>();
for (int i = startIdx; i <= stopIdx; i++) {
partIds.add(this.bounds.get(i).value);
Expand All @@ -223,7 +223,7 @@ public List<Long> getPartIds(Object[] start, boolean startInclusive, Object[] en
@Override
public Long getPartId(Object... rowKey) {
try {
return this.bounds.get(getBoundsIdx(rowKey)).value;
return this.bounds.get(getBoundsIdx(false, rowKey)).value;
} catch (IllegalArgumentException e) {
RUNTIME.error(LCD.convert("01-00025"), e);
throw new IllegalArgumentException(
Expand All @@ -232,7 +232,7 @@ public Long getPartId(Object... rowKey) {

}

public int getBoundsIdx(Object... rowKey) {
public int getBoundsIdx(boolean isScan, Object... rowKey) {
if (rowKey.length != rowKeyElement.size()) {
throw new IllegalArgumentException("row key is consist of " + rowKeyElement
+ "but found" + Arrays.toString(rowKey));
Expand All @@ -248,6 +248,11 @@ public int getBoundsIdx(Object... rowKey) {
int pos = upperBound(this.bounds, new ObComparableKV<ObPartitionKey, Long>(searchKey,
(long) -1));
if (pos >= this.bounds.size()) {
if (isScan) {
// if range is bigger than rangeMax while scanning
// we just scan until last range
return this.bounds.size() - 1;
}
throw new ArrayIndexOutOfBoundsException("Table has no partition for value in "
+ this.getPartExpr());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,18 @@ public void initPartitions() throws Exception {
Object[] start = new Object[startKeySize];
Object[] end = new Object[endKeySize];
for (int i = 0; i < startKeySize; i++) {
start[i] = startKey.getObj(i).getValue();
if (startKey.getObj(i).isMinObj() || startKey.getObj(i).isMaxObj()) {
start[i] = startKey.getObj(i);
} else {
start[i] = startKey.getObj(i).getValue();
}
}

for (int i = 0; i < endKeySize; i++) {
end[i] = endKey.getObj(i).getValue();
if (endKey.getObj(i).isMinObj() || endKey.getObj(i).isMaxObj()) {
end[i] = endKey.getObj(i);
} else {
end[i] = endKey.getObj(i).getValue();
}
}
ObBorderFlag borderFlag = rang.getBorderFlag();
// pairs -> List<Pair<logicId, param>>
Expand Down
17 changes: 17 additions & 0 deletions src/test/java/com/alipay/oceanbase/rpc/ObTableAggregationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ public void testAggregationWithFilter() throws Exception {
ObTableAggregation obtableAggregationWithoutFilter = client
.aggregate("test_partition_aggregation");

obtableAggregationWithoutFilter
.addScanRange(new Object[] { 0L }, new Object[] { 150L });

// test
obtableAggregationWithoutFilter.max("c2");
obtableAggregationWithoutFilter.min("c2");
Expand All @@ -335,6 +338,8 @@ public void testAggregationWithFilter() throws Exception {
ObTableAggregation obtableAggregationWithFilter = client
.aggregate("test_partition_aggregation");

obtableAggregationWithFilter.addScanRange(new Object[] { 0L }, new Object[] { 150L });

// test
obtableAggregationWithFilter.max("c2");
obtableAggregationWithFilter.min("c2");
Expand Down Expand Up @@ -365,6 +370,9 @@ public void testAggregationWithFilter() throws Exception {
ObTableAggregation obtableAggregationWithFilterAndLimit = client
.aggregate("test_partition_aggregation");

obtableAggregationWithFilterAndLimit.addScanRange(new Object[] { 0L },
new Object[] { 150L });

// test
obtableAggregationWithFilterAndLimit.max("c2");
obtableAggregationWithFilterAndLimit.min("c2");
Expand Down Expand Up @@ -412,6 +420,8 @@ public void testAggregationWithEmptyRow() throws Exception {
// aggregate without insert
ObTableAggregation obtableAggregation = client.aggregate("test_partition_aggregation");

obtableAggregation.addScanRange(new Object[] { 0L }, new Object[] { 150L });

// test
obtableAggregation.max("c2");
obtableAggregation.min("c2");
Expand Down Expand Up @@ -449,6 +459,9 @@ public void testAggregationExistNull() throws Exception {
ObTableAggregation obtableAggregationWithoutFilter = client
.aggregate("test_partition_aggregation");

obtableAggregationWithoutFilter
.addScanRange(new Object[] { 0L }, new Object[] { 150L });

// test
obtableAggregationWithoutFilter.max("c1");
obtableAggregationWithoutFilter.min("c1");
Expand Down Expand Up @@ -497,6 +510,8 @@ public void testAggregationWithIllegalColumn() throws Exception {
ObTableAggregation obtableAggregationWithIllegal = client
.aggregate("test_partition_aggregation");

obtableAggregationWithIllegal.addScanRange(new Object[] { 0L }, new Object[] { 150L });

// test illegal column
obtableAggregationWithIllegal.max("c3");

Expand Down Expand Up @@ -584,6 +599,8 @@ public void testAggregationEmptyVal() throws Exception {
ObTableAggregation obtableAggregationWithFilter = client
.aggregate("test_partition_aggregation");

obtableAggregationWithFilter.addScanRange(new Object[] { 0L }, new Object[] { 150L });

// test
obtableAggregationWithFilter.max("c2");
obtableAggregationWithFilter.min("c2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,21 @@ public void testAsyncQuery() throws Exception {
e.printStackTrace();
Assert.assertTrue(false);
}

// Test Min - Max Release after addRowkeyElement is completed
tableQuery = obTableClient.query(TEST_TABLE);
tableQuery
.addScanRange(new Object[] { ObObj.getMin(), "partition".getBytes(), timeStamp },
new Object[] { ObObj.getMax(), "partition".getBytes(), timeStamp });
tableQuery.setBatchSize(1);
QueryResultSet resultSet = tableQuery.asyncExecute();
int resultCount = 0;
while (resultSet.next()) {
Map<String, Object> value = resultSet.getRow();
resultCount += 1;
}
Assert.assertEquals(2, resultCount);

tableQuery = obTableClient.query(TEST_TABLE);
tableQuery.addScanRange(new Object[] { "key1_1".getBytes(), "partition".getBytes(),
timeStamp }, new Object[] { "key1_3".getBytes(), "partition".getBytes(),
Expand Down
Loading