Skip to content

Commit

Permalink
HBASE-26878 TableInputFormatBase should cache RegionSizeCalculator (apache#4271)
Browse files Browse the repository at this point in the history

Signed-off-by: Andrew Purtell <apurtell@apache.org>
  • Loading branch information
bbeaudreault authored and apurtell committed Mar 24, 2022
1 parent ce5c7a4 commit 83d5a9e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public abstract class TableInputFormatBase
private TableRecordReader tableRecordReader = null;
/** The underlying {@link Connection} of the table. */
private Connection connection;
/** Used to generate splits based on region size. */
private RegionSizeCalculator regionSizeCalculator;


/** The reverse DNS lookup cache mapping: IPAddress => HostName */
Expand Down Expand Up @@ -288,8 +290,11 @@ public List<InputSplit> getSplits(JobContext context) throws IOException {
* @throws IOException throws IOException
*/
private List<InputSplit> oneInputSplitPerRegion() throws IOException {
RegionSizeCalculator sizeCalculator =
createRegionSizeCalculator(getRegionLocator(), getAdmin());
if (regionSizeCalculator == null) {
// Initialize here rather than with the other resources because this involves
// a full scan of meta, which can be heavy. We might as well only do it if/when necessary.
regionSizeCalculator = createRegionSizeCalculator(getRegionLocator(), getAdmin());
}

TableName tableName = getTable().getName();

Expand All @@ -302,7 +307,7 @@ private List<InputSplit> oneInputSplitPerRegion() throws IOException {
throw new IOException("Expecting at least one region.");
}
List<InputSplit> splits = new ArrayList<>(1);
long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
long regionSize = regionSizeCalculator.getRegionSize(regLoc.getRegion().getRegionName());
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
Expand Down Expand Up @@ -343,9 +348,9 @@ private List<InputSplit> oneInputSplitPerRegion() throws IOException {
String regionLocation;
regionLocation = reverseDNS(regionAddress);

byte[] regionName = location.getRegionInfo().getRegionName();
String encodedRegionName = location.getRegionInfo().getEncodedName();
long regionSize = sizeCalculator.getRegionSize(regionName);
byte[] regionName = location.getRegion().getRegionName();
String encodedRegionName = location.getRegion().getEncodedName();
long regionSize = regionSizeCalculator.getRegionSize(regionName);
// In the table input format for single table we do not need to
// store the scan object in table split because it can be memory intensive and redundant
// information to what is already stored in conf SCAN. See HBASE-25212
Expand Down Expand Up @@ -597,6 +602,7 @@ protected void initializeTable(Connection connection, TableName tableName) throw
this.regionLocator = connection.getRegionLocator(tableName);
this.admin = connection.getAdmin();
this.connection = connection;
this.regionSizeCalculator = null;
}

@InterfaceAudience.Private
Expand Down Expand Up @@ -664,6 +670,7 @@ protected void closeTable() throws IOException {
table = null;
regionLocator = null;
connection = null;
regionSizeCalculator = null;
}

private void close(Closeable... closables) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
Expand Down Expand Up @@ -55,6 +54,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

Expand All @@ -65,6 +65,34 @@ public class TestTableInputFormatBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTableInputFormatBase.class);

/**
 * Checks that a {@code RegionSizeCalculator} is created once per initialization of the input
 * format and then reused by subsequent {@code getSplits} calls, rather than being rebuilt on
 * every call.
 *
 * @throws IOException if initializing the format or computing splits fails
 */
@Test
public void testReuseRegionSizeCalculator() throws IOException {
  Configuration configuration = HBaseConfiguration.create();
  configuration.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
    ConnectionForMergeTesting.class.getName());
  configuration.set(TableInputFormat.INPUT_TABLE, "testTable");
  configuration.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);

  JobContext jobContext = mock(JobContext.class);
  when(jobContext.getConfiguration()).thenReturn(configuration);

  // Spy on the format so calls to createRegionSizeCalculator can be counted.
  TableInputFormat inputFormat = Mockito.spy(new TableInputFormatForMergeTesting());
  inputFormat.setConf(configuration);

  // Two rounds: each round initializes explicitly and then asks for splits twice.
  // Initializing up front means the table is already set when getSplits runs; otherwise
  // (per the original note, via a close-on-finish style flag -- NOTE(review): confirm the
  // exact flag name) every getSplits call would re-initialize everything.
  // The second round's initialize is what forces a new RegionSizeCalculator to be created.
  final int rounds = 2;
  for (int round = 0; round < rounds; round++) {
    inputFormat.initialize(jobContext);
    inputFormat.getSplits(jobContext);
    inputFormat.getSplits(jobContext);
  }

  // Four getSplits calls in total, but only one calculator per initialization: 2 creations.
  Mockito.verify(inputFormat, Mockito.times(2))
    .createRegionSizeCalculator(Mockito.any(), Mockito.any());
}

@Test
public void testTableInputFormatBaseReverseDNSForIPv6()
throws UnknownHostException {
Expand Down

0 comments on commit 83d5a9e

Please sign in to comment.