Skip to content

Commit

Permalink
HBASE-27210 Clean up error-prone findings in hbase-endpoint (apache#4646
Browse files Browse the repository at this point in the history
)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
apurtell committed Jul 27, 2022
1 parent 9573466 commit ac8b3a7
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -694,10 +694,9 @@ public <R, S, P extends Message, Q extends Message, T extends Message> double st
public <R, S, P extends Message, Q extends Message, T extends Message> double
std(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
double res = 0d;
double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
res = avgOfSumSq - (avg) * (avg); // variance
double res = avgOfSumSq - avg * avg; // variance
res = Math.pow(res, 0.5);
return res;
}
Expand Down Expand Up @@ -870,14 +869,6 @@ public <R, S, P extends Message, Q extends Message, T extends Message> R median(
}

byte[] getBytesFromResponse(ByteString response) {
ByteBuffer bb = response.asReadOnlyByteBuffer();
bb.rewind();
byte[] bytes;
if (bb.hasArray()) {
bytes = bb.array();
} else {
bytes = response.toByteArray();
}
return bytes;
return response.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) thr
* @return the instance
* @throws IOException Either we couldn't instantiate the method object, or "parseFrom" failed.
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "TypeParameterUnusedInFormals" })
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
public static <T extends Message> T getParsedGenericInstance(Class<?> runtimeClass, int position,
ByteString b) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
Expand All @@ -52,12 +53,12 @@
* aggregate function at a region level. {@link ColumnInterpreter} is used to interpret column
* value. This class is parameterized with the following (these are the types with which the
* {@link ColumnInterpreter} is parameterized, and for more description on these, refer to
* {@link ColumnInterpreter}):
* @param <T> Cell value data type
* @param <S> Promoted data type
* @param <P> PB message that is used to transport initializer specific bytes
* @param <Q> PB message that is used to transport Cell (&lt;T&gt;) instance
* @param <R> PB message that is used to transport Promoted (&lt;S&gt;) instance
* {@link ColumnInterpreter}):<br>
* &lt;T&gt; Cell value data type<br>
* &lt;S&gt; Promoted data type<br>
* &lt;P&gt; PB message that is used to transport initializer specific bytes<br>
* &lt;Q&gt; PB message that is used to transport Cell (&lt;T&gt;) instance<br>
* &lt;R&gt; PB message that is used to transport Promoted (&lt;S&gt;) instance<br>
*/
@InterfaceAudience.Private
public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
Expand Down Expand Up @@ -109,10 +110,7 @@ public void getMax(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.info("Maximum from this region is "
Expand Down Expand Up @@ -162,10 +160,7 @@ public void getMin(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.info("Minimum from this region is "
Expand Down Expand Up @@ -218,10 +213,7 @@ public void getSum(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.debug("Sum from this region is " + env.getRegion().getRegionInfo().getRegionNameAsString()
Expand Down Expand Up @@ -269,10 +261,7 @@ public void getRowNum(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
log.info("Row counter from this region is "
Expand Down Expand Up @@ -333,10 +322,7 @@ public void getAvg(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
done.run(response);
Expand Down Expand Up @@ -399,10 +385,7 @@ public void getStd(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
done.run(response);
Expand Down Expand Up @@ -464,10 +447,7 @@ public void getMedian(RpcController controller, AggregateRequest request,
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
IOUtils.closeQuietly(scanner);
}
}
done.run(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -111,6 +110,7 @@ static Map<byte[], Response> run(final Configuration conf, final String[] args)
return run(conf, arguments.getFirst(), arguments.getSecond(), arguments.getThird());
}

@SuppressWarnings("ModifiedButNotUsed")
public static Map<byte[], Response> run(final Configuration conf, TableName tableName, Scan scan,
Path dir) throws Throwable {
FileSystem fs = dir.getFileSystem(conf);
Expand All @@ -127,7 +127,6 @@ public static Map<byte[], Response> run(final Configuration conf, TableName tabl
table.coprocessorService(ExportProtos.ExportService.class, scan.getStartRow(),
scan.getStopRow(), (ExportProtos.ExportService service) -> {
ServerRpcController controller = new ServerRpcController();
Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR);
CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<>();
service.export(controller, request, rpcCallback);
Expand Down Expand Up @@ -192,7 +191,7 @@ private static SequenceFile.Writer.Option getOutputPath(final Configuration conf

private static List<SequenceFile.Writer.Option> getWriterOptions(final Configuration conf,
final RegionInfo info, final ExportProtos.ExportRequest request) throws IOException {
List<SequenceFile.Writer.Option> rval = new LinkedList<>();
List<SequenceFile.Writer.Option> rval = new ArrayList<>(5);
rval.add(SequenceFile.Writer.keyClass(ImmutableBytesWritable.class));
rval.add(SequenceFile.Writer.valueClass(Result.class));
rval.add(getOutputPath(conf, info, request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TestAsyncAggregationClient {

private static byte[] CQ2 = Bytes.toBytes("CQ2");

private static int COUNT = 1000;
private static long COUNT = 1000;

private static AsyncConnection CONN;

Expand Down Expand Up @@ -141,7 +141,7 @@ public void testMedian() throws InterruptedException, ExecutionException {
long halfSum = COUNT * (COUNT - 1) / 4;
long median = 0L;
long sum = 0L;
for (int i = 0; i < COUNT; i++) {
for (long i = 0; i < COUNT; i++) {
sum += i;
if (sum > halfSum) {
median = i - 1;
Expand All @@ -158,7 +158,7 @@ public void testMedianWithWeight() throws InterruptedException, ExecutionExcepti
LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() / 2;
long median = 0L;
long sum = 0L;
for (int i = 0; i < COUNT; i++) {
for (long i = 0; i < COUNT; i++) {
sum += i * i;
if (sum > halfSum) {
median = i - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testCountController() throws Exception {

// reversed, regular
scanInfo.setReadType(ReadType.STREAM);
counter = doScan(table, scanInfo, counter + 1);
doScan(table, scanInfo, counter + 1);

// make sure we have no priority count
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public void itCreatesConnectionless() throws Throwable {
try {
client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
fail("Expected IOException");
} catch (Throwable e) {
assertTrue(e instanceof IOException);
} catch (IOException e) {
assertTrue(e.getMessage().contains("Connection not initialized"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ public void testAggregationNullResponse() throws Throwable {
builder.build(), ROWS[0], ROWS[ROWS.length - 1],
ColumnAggregationNullResponseSumResponse.getDefaultInstance());

int sumResult = 0;
int expectedResult = 0;
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], ColumnAggregationNullResponseSumResponse> e : results.entrySet()) {
LOG.info(
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < rowSeperator2; i++) {
for (long i = 0; i < rowSeperator2; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand Down Expand Up @@ -171,14 +171,14 @@ public void testAggregationWithReturnValue() throws Throwable {
Table table = util.getConnection().getTable(TEST_TABLE);
Map<byte[], SumResponse> results =
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
int sumResult = 0;
int expectedResult = 0;
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
LOG.info(
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < ROWSIZE; i++) {
for (long i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand All @@ -194,7 +194,7 @@ public void testAggregationWithReturnValue() throws Throwable {
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = rowSeperator1; i < ROWSIZE; i++) {
for (long i = rowSeperator1; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand All @@ -206,14 +206,14 @@ public void testAggregation() throws Throwable {
Table table = util.getConnection().getTable(TEST_TABLE);
Map<byte[], SumResponse> results =
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
int sumResult = 0;
int expectedResult = 0;
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], SumResponse> e : results.entrySet()) {
LOG.info(
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < ROWSIZE; i++) {
for (long i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand All @@ -227,7 +227,7 @@ public void testAggregation() throws Throwable {
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = rowSeperator1; i < ROWSIZE; i++) {
for (long i = rowSeperator1; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand Down Expand Up @@ -266,14 +266,14 @@ public void update(byte[] region, byte[] row,
hasError = true;
}

int sumResult = 0;
int expectedResult = 0;
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], ColumnAggregationWithErrorsSumResponse> e : results.entrySet()) {
LOG.info(
"Got value " + e.getValue().getSum() + " for region " + Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue().getSum();
}
for (int i = 0; i < rowSeperator2; i++) {
for (long i = 0; i < rowSeperator2; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ public void testAggregation() throws Throwable {
Table table = util.getConnection().getTable(TEST_TABLE);
Map<byte[], Long> results =
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
int sumResult = 0;
int expectedResult = 0;
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
LOG.info("Got value " + e.getValue() + " for region " + Bytes.toStringBinary(e.getKey()));
sumResult += e.getValue();
}
for (int i = 0; i < ROWSIZE; i++) {
for (long i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand Down Expand Up @@ -271,6 +271,7 @@ public void testCoprocessorServiceNullResponse() throws Throwable {
String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
ROWS[0], ROWS[ROWS.length - 1],
new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
@Override
public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
throws IOException {
CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ private static final void verifyTable(TableName tableName) throws Throwable {
try {
Map<byte[], Long> results =
sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0], ROWS[ROWS.length - 1]);
int sumResult = 0;
int expectedResult = 0;
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
sumResult += e.getValue();
}
for (int i = 0; i < ROWSIZE; i++) {
for (long i = 0; i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", expectedResult, sumResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ protected void runExportMain(String[] args) throws Throwable {
Export.main(args);
}

/**
* Skip the test which is unrelated to the coprocessor.Export.
*/
@Test
@Ignore
@Override
public void testImport94Table() throws Throwable {
// Skip the test which is unrelated to the coprocessor.Export.
}
}
Loading

0 comments on commit ac8b3a7

Please sign in to comment.