Skip to content

Commit

Permalink
Fix OpenTSDB#118 by properly parsing out append responses in a multi …
Browse files Browse the repository at this point in the history
…action

that appear after the last returned response. E.g. if the multi
action had an append -> put -> append, we would throw an IOOBE
and fail to properly parse the append. Thanks @alienth
  • Loading branch information
manolama committed Sep 10, 2015
1 parent f94f67c commit 8db357f
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 9 deletions.
1 change: 1 addition & 0 deletions Makefile
Expand Up @@ -105,6 +105,7 @@ asynchbase_SOURCES := \
src/RegionClient.java \
src/RegionClientStats.java \
src/RegionInfo.java \
src/RegionLocation.java \
src/RegionOfflineException.java \
src/RegionMovedException.java \
src/RegionOpeningException.java \
Expand Down
139 changes: 139 additions & 0 deletions src/HBaseClient.java
Expand Up @@ -2019,6 +2019,145 @@ static Deferred<Object> tooManyAttempts(final HBaseRpc request,
return Deferred.fromError(e);
}

private RegionLocation toRegionLocation(ArrayList<KeyValue> meta_row) {

if (meta_row.isEmpty()) {
throw new TableNotFoundException();
}
String host = null;
int port = -1;
RegionInfo region = null;
byte[] start_key = null;

for (final KeyValue kv : meta_row) {
final byte[] qualifier = kv.qualifier();
if (Arrays.equals(REGIONINFO, qualifier)) {
final byte[][] tmp = new byte[1][]; // Yes, this is ugly.
region = RegionInfo.fromKeyValue(kv, tmp);
if (knownToBeNSREd(region)) {
invalidateRegionCache(region.name(), true, "has marked it as split.");
return null;
}
start_key = tmp[0];
} else if (Arrays.equals(SERVER, qualifier)
&& kv.value() != EMPTY_ARRAY) { // Empty during NSRE.
final byte[] hostport = kv.value();
int colon = hostport.length - 1;
for (/**/; colon > 0 /* Can't be at the beginning */; colon--) {
if (hostport[colon] == ':') {
break;
}
}
if (colon == 0) {
throw BrokenMetaException.badKV(region, "an `info:server' cell"
+ " doesn't contain `:' to separate the `host:port'"
+ Bytes.pretty(hostport), kv);
}
host = getIP(new String(hostport, 0, colon));
try {
port = parsePortNumber(new String(hostport, colon + 1,
hostport.length - colon - 1));
} catch (NumberFormatException e) {
throw BrokenMetaException.badKV(region, "an `info:server' cell"
+ " contains an invalid port: " + e.getMessage() + " in "
+ Bytes.pretty(hostport), kv);
}
}
}
if (start_key == null) {
throw new BrokenMetaException(null, "It didn't contain any"
+ " `info:regioninfo' cell: " + meta_row);
}

final byte[] region_name = region.name();
if (host == null) {
// When there's no `info:server' cell, it typically means that the
// location of this region is about to be updated in META, so we
// consider this as an NSRE.
invalidateRegionCache(region_name, true, "no longer has it assigned.");
return null;
}
return new RegionLocation(region, start_key, host, port);
}

public Deferred<List<RegionLocation>> locateRegions(final byte[] table) {

// But we don't want to do this for hbase:meta, .META. or -ROOT-.
if (Bytes.equals(table, HBASE96_META) || Bytes.equals(table, META) || Bytes.equals(table, ROOT)) {
return Deferred.fromResult(null);
}

final byte[] start = EMPTY_ARRAY;
final byte[] stop = EMPTY_ARRAY;

// Create the scan bounds.
final byte[] meta_start = createRegionSearchKey(table, start);
// In this case, we want the scan to start immediately at the
// first entry, but createRegionSearchKey finds the last entry.
meta_start[meta_start.length - 1] = 0;

// The stop bound is trickier. If the user wants the whole table,
// expressed by passing EMPTY_ARRAY, then we need to append a null
// byte to the table name (thus catching all rows in the desired
// table, but excluding those from others.) If the user specifies
// an explicit stop key, we must leave the table name alone.
final byte[] meta_stop;
if (stop.length == 0) {
meta_stop = createRegionSearchKey(table, stop); // will return "table,,:"
meta_stop[table.length] = 0; // now have "table\0,:"
meta_stop[meta_stop.length - 1] = ','; // now have "table\0,,"
} else {
meta_stop = createRegionSearchKey(table, stop);
}

if (rootregion == null) {
// If we don't know where the root region is, we don't yet know whether
// there is even a -ROOT- region at all (pre HBase 0.95). So we can't
// start scanning meta right away, because we don't yet know whether
// meta is named ".META." or "hbase:meta". So instead we first check
// whether the table exists, which will force us to do a first meta
// lookup (and therefore figure out what the name of meta is).
class RetryLocateRegions implements Callback<Deferred<List<RegionLocation>>, Object> {
public Deferred<List<RegionLocation>> call(final Object unused) {
return locateRegions(table);
}
public String toString() {
return "Retry locateRegions (" + Bytes.pretty(table) + ", "
+ Bytes.pretty(start) + ", " + Bytes.pretty(stop) + ")";
}
}
return ensureTableExists(table).addCallbackDeferring(new RetryLocateRegions());
}

final Scanner meta_scanner = newScanner(has_root ? META : HBASE96_META);
meta_scanner.setStartKey(meta_start);
meta_scanner.setStopKey(meta_stop);

final List<RegionLocation> result = new ArrayList<RegionLocation>();

class LocateRegionsContinue
implements Callback<Deferred<List<RegionLocation>>, ArrayList<ArrayList<KeyValue>>> {
public Deferred<List<RegionLocation>> call(final ArrayList<ArrayList<KeyValue>> results) {
if (results != null && !results.isEmpty()) {
for (ArrayList<KeyValue> meta_row : results) {
RegionLocation region_location = toRegionLocation(meta_row);
if (region_location != null) {
result.add(region_location);
}
}
return meta_scanner.nextRows().addCallbackDeferring(this);
}
return Deferred.fromResult(result);
}

public String toString() {
return "LocateRegionsContinue scanner=" + meta_scanner;
}
}

return meta_scanner.nextRows().addCallbackDeferring(new LocateRegionsContinue());
}

/** @return the rpc timeout timer */
HashedWheelTimer getRpcTimeoutTimer() {
return rpc_timeout_timer;
Expand Down
18 changes: 13 additions & 5 deletions src/MultiAction.java
Expand Up @@ -529,13 +529,15 @@ public int compare(final BatchableRpc a, final BatchableRpc b) {
@Override
Object deserialize(final ChannelBuffer buf, final int cell_size) {
final MultiResponse resp = readProtobuf(buf, MultiResponse.PARSER);
final int responses = resp.getRegionActionResultCount();
final int nrpcs = batch.size();
final Object[] resps = new Object[nrpcs];
int n = 0; // Index in `batch'.
int r = 0; // Index in `regionActionResult' in the PB.
ArrayList<KeyValue> kvs = null;
int kv_index = 0;
while (n < nrpcs) {

while (n < nrpcs && r < responses) {
final RegionActionResult results = resp.getRegionActionResult(r++);
final int nresults = results.getResultOrExceptionCount();
if (results.hasException()) {
Expand Down Expand Up @@ -627,14 +629,20 @@ Object deserialize(final ChannelBuffer buf, final int cell_size) {
} else {
result = SUCCESS;
}
}
}
resps[n++] = result;
}
}
if (n != nrpcs) {
throw new InvalidResponseException("Expected " + nrpcs
+ " results but got " + n,
resp);
// handle trailing appends that don't have a response.
while (n < nrpcs && batch.get(n) instanceof AppendRequest &&
!((AppendRequest)batch.get(n)).returnResult()) {
resps[n++] = SUCCESS;
}
if (n != nrpcs) {
throw new InvalidResponseException("Expected " + nrpcs
+ " results but got " + n, resp);
}
}
return new Response(resps);
}
Expand Down
2 changes: 1 addition & 1 deletion src/RegionInfo.java
Expand Up @@ -44,7 +44,7 @@
/**
* Stores basic information about a region.
*/
final class RegionInfo implements Comparable<RegionInfo> {
public final class RegionInfo implements Comparable<RegionInfo> {

private static final Logger LOG = LoggerFactory.getLogger(RegionInfo.class);

Expand Down
68 changes: 68 additions & 0 deletions src/RegionLocation.java
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
* This file is part of Async HBase.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the StumbleUpon nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.hbase.async;

public final class RegionLocation {

private final RegionInfo region_info;
private final String host;
private final int port;
private final byte[] start_key;

public RegionLocation(RegionInfo region_info, byte[] start_key, String host, int port) {
this.region_info = region_info;
this.start_key = start_key;
this.host = host;
this.port = port;
}

public RegionInfo getRegionInfo() {
return this.region_info;
}

public byte[] startKey() {
return this.start_key;
}

public String getHostname() {
return this.host;
}

public int getPort() {
return this.port;
}


@Override
public String toString() {
return this.getClass().getCanonicalName() +
"{RegionInfo: " + region_info +
", hostport: " + host +
":" + port +
", startKey: " + Bytes.pretty(start_key);
}
}
49 changes: 48 additions & 1 deletion test/TestIntegration.java
Expand Up @@ -34,7 +34,6 @@
import java.util.ArrayList;
import java.util.Collection;

import org.hbase.async.KeyOnlyFilter;
import org.slf4j.Logger;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -259,6 +258,7 @@ public void putReadDeleteAtTimestamp() throws Exception {
@Test
public void appendOnceNoReturnRead() throws Exception {
client.setFlushInterval(FAST_FLUSH);
truncateTable(table);
final double write_time = System.currentTimeMillis();
final AppendRequest append = new AppendRequest(table, "a", family, "q", "val");
final GetRequest get = new GetRequest(table, "a", family, "q");
Expand All @@ -281,6 +281,7 @@ public void appendOnceNoReturnRead() throws Exception {
@Test
public void appendTwiceNoReturnRead() throws Exception {
client.setFlushInterval(FAST_FLUSH);
truncateTable(table);
final double write_time = System.currentTimeMillis();
final AppendRequest append = new AppendRequest(table, "a2", family, "q", "val");
final AppendRequest append2 = new AppendRequest(table, "a2", family, "q", "2ndv");
Expand All @@ -297,6 +298,52 @@ public void appendTwiceNoReturnRead() throws Exception {
final double kvts = kv.timestamp();
assertEquals(write_time, kvts, 5000.0); // Within five seconds.
}

@Test
public void appendTwiceBatchedNoReturnRead() throws Exception {
truncateTable(table);
final double write_time = System.currentTimeMillis();
final AppendRequest append = new AppendRequest(table, "a2", family, "q", "val");
final AppendRequest append2 = new AppendRequest(table, "a2", family, "q", "2ndv");
final GetRequest get = new GetRequest(table, "a2", family, "q");
final ArrayList<Deferred<Object>> deferreds = new ArrayList<Deferred<Object>>(2);
deferreds.add(client.append(append));
deferreds.add(client.append(append2));
Deferred.group(deferreds).join();
final ArrayList<KeyValue> kvs = client.get(get).join();
assertSizeIs(1, kvs);
final KeyValue kv = kvs.get(0);
assertEq("a2", kv.key());
assertEq(family, kv.family());
assertEq("q", kv.qualifier());
assertEq("val2ndv", kv.value());
final double kvts = kv.timestamp();
assertEquals(write_time, kvts, 5000.0); // Within five seconds.
}

@Test
public void appendTwiceBatchedReturnRead() throws Exception {
truncateTable(table);
final double write_time = System.currentTimeMillis();
final AppendRequest append = new AppendRequest(table, "a2", family, "q", "val");
append.returnResult(true);
final AppendRequest append2 = new AppendRequest(table, "a2", family, "q", "2ndv");
append2.returnResult(true);
final GetRequest get = new GetRequest(table, "a2", family, "q");
final ArrayList<Deferred<Object>> deferreds = new ArrayList<Deferred<Object>>(2);
deferreds.add(client.append(append));
deferreds.add(client.append(append2));
Deferred.group(deferreds).join();
final ArrayList<KeyValue> kvs = client.get(get).join();
assertSizeIs(1, kvs);
final KeyValue kv = kvs.get(0);
assertEq("a2", kv.key());
assertEq(family, kv.family());
assertEq("q", kv.qualifier());
assertEq("val2ndv", kv.value());
final double kvts = kv.timestamp();
assertEquals(write_time, kvts, 5000.0); // Within five seconds.
}

/** Basic scan test. */
@Test
Expand Down

0 comments on commit 8db357f

Please sign in to comment.