HBASE-22285 A normalizer which merges small size regions with adjacent regions (apache#978)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: stack <stack@apache.org>
mnpoonia authored and Guanghao Zhang committed Mar 11, 2020
1 parent a13a16e commit 0665573
Showing 5 changed files with 677 additions and 146 deletions.
11 changes: 11 additions & 0 deletions hbase-common/src/main/resources/hbase-default.xml
@@ -630,6 +630,17 @@ possible configurations would overwhelm and obscure the important.
<value>300000</value>
<description>Period at which the region normalizer runs in the Master.</description>
</property>
<property>
<name>hbase.normalizer.min.region.count</name>
<value>3</value>
<description>The minimum number of regions a table must have before the normalizer will consider it for normalization.</description>
</property>
<property>
<name>hbase.normalizer.min.region.merge.age</name>
<value>3</value>
<description>The minimum age of a region, in days, before it is considered for merge while
normalizing.</description>
</property>
<property>
<name>hbase.regions.slop</name>
<value>0.001</value>
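The two settings above are plain integers read through the Hadoop Configuration API. Below is a minimal sketch of reading (or overriding) them with the defaults added in this commit; the class name and the override values are illustrative only and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class NormalizerConfigExample {
  public static void main(String[] args) {
    // Picks up hbase-default.xml plus any hbase-site.xml overrides on the classpath.
    Configuration conf = HBaseConfiguration.create();

    // In a real deployment these overrides would live in hbase-site.xml; the values
    // below are arbitrary and only illustrate the new keys.
    conf.setInt("hbase.normalizer.min.region.count", 5);
    conf.setInt("hbase.normalizer.min.region.merge.age", 7);

    // Same defaults (3 and 3) as the MergeNormalizer constructor uses when reading them.
    int minRegionCount = conf.getInt("hbase.normalizer.min.region.count", 3);
    int minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3);
    System.out.println("min region count = " + minRegionCount
        + ", min region merge age (days) = " + minRegionAge);
  }
}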
@@ -0,0 +1,213 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xiaomi.infra.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;

@InterfaceAudience.Private
public abstract class AbstractRegionNormalizer implements RegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionNormalizer.class);
protected MasterServices masterServices;
protected MasterRpcServices masterRpcServices;

/**
* Set the master service.
* @param masterServices inject instance of MasterServices
*/
@Override
public void setMasterServices(MasterServices masterServices) {
this.masterServices = masterServices;
}

@Override
public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
this.masterRpcServices = masterRpcServices;
}

/**
* @param hri the region to look up
* @return size of the region in MB, or -1 if the region is not found
*/
protected long getRegionSize(RegionInfo hri) {
ServerName sn =
masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
RegionMetrics regionLoad =
masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
if (regionLoad == null) {
LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
return -1;
}
return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
}

protected boolean isMergeEnabled() {
boolean mergeEnabled = true;
try {
mergeEnabled = masterRpcServices
.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE))
.getEnabled();
} catch (ServiceException e) {
LOG.warn("Unable to determine whether merge is enabled", e);
}
return mergeEnabled;
}

protected boolean isSplitEnabled() {
boolean splitEnabled = true;
try {
splitEnabled = masterRpcServices
.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT))
.getEnabled();
} catch (ServiceException se) {
LOG.warn("Unable to determine whether split is enabled", se);
}
return splitEnabled;
}

/**
* @param tableRegions regions of the table to normalize; all entries must belong to the same table
* @return average region size in MB, taking the table's normalizer target region size or target
*         region count into account when configured
* @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
*/
protected double getAverageRegionSize(List<RegionInfo> tableRegions) {
long totalSizeMb = 0;
int actualRegionCnt = 0;
for (RegionInfo hri : tableRegions) {
long regionSize = getRegionSize(hri);
// skip regions whose size is unknown (-1) or rounds down to 0 MB when averaging the size.
if (regionSize > 0) {
actualRegionCnt++;
totalSizeMb += regionSize;
}
}
TableName table = tableRegions.get(0).getTable();
int targetRegionCount = -1;
long targetRegionSize = -1;
try {
TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
if (tableDescriptor != null) {
targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
LOG.debug("Table {}: target region count is {}, target region size is {}", table,
targetRegionCount, targetRegionSize);
}
} catch (IOException e) {
LOG.warn(
"cannot get the target number and target size of table {}, they will be default value -1.",
table, e);
}

double avgRegionSize;
if (targetRegionSize > 0) {
avgRegionSize = targetRegionSize;
} else if (targetRegionCount > 0) {
avgRegionSize = totalSizeMb / (double) targetRegionCount;
} else {
avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
}

LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
totalSizeMb, avgRegionSize);
return avgRegionSize;
}

/**
* Computes the merge plans that should be executed for this table to converge the average region
* size towards the target average or target region count
* @param table table to normalize
* @return list of merge normalization plans
*/
protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
List<NormalizationPlan> plans = new ArrayList<>();
List<RegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
double avgRegionSize = getAverageRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}.\n Computing normalization plan for table: {}, "
+ "number of regions: {}",
table, avgRegionSize, table, tableRegions.size());

int candidateIdx = 0;
while (candidateIdx < tableRegions.size() - 1) {
RegionInfo hri = tableRegions.get(candidateIdx);
long regionSize = getRegionSize(hri);
RegionInfo hri2 = tableRegions.get(candidateIdx + 1);
long regionSize2 = getRegionSize(hri2);
if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
// note: the caller (e.g. MergeNormalizer) additionally requires at least one of the two
// regions to be older than the configured minimum merge age
plans.add(new MergeNormalizationPlan(hri, hri2));
candidateIdx++;
} else {
LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table,
regionSize);
}
candidateIdx++;
}
return plans;
}

/**
* Computes the split plans that should be executed for this table to converge average region size
* towards target average or target region count
* @param table table to normalize
* @return list of split normalization plans
*/
protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
List<NormalizationPlan> plans = new ArrayList<>();
List<RegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
double avgRegionSize = getAverageRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}", table, avgRegionSize);

int candidateIdx = 0;
while (candidateIdx < tableRegions.size()) {
RegionInfo hri = tableRegions.get(candidateIdx);
long regionSize = getRegionSize(hri);
// if the region is more than twice the average size, split it; split is a
// higher priority normalization action than merge.
if (regionSize > 2 * avgRegionSize) {
LOG.info("Table {}, large region {} has size {}, more than twice avg size, splitting",
table, hri.getRegionNameAsString(), regionSize);
plans.add(new SplitNormalizationPlan(hri, null));
}
candidateIdx++;
}
return plans;
}
}
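Taken together, the helpers above implement two rules: getAverageRegionSize() prefers the table's target region size, then the size implied by the target region count, then the plain average of known region sizes; getMergeNormalizationPlan() then pairs adjacent regions whose combined size stays below that average, while getSplitNormalizationPlan() flags regions larger than twice it. A self-contained sketch of those selection rules over plain numbers (no HBase types; all names here are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.List;

public class NormalizationPlanSketch {

  /**
   * Mirrors getMergeNormalizationPlan(): walk the regions in order and pair two adjacent
   * regions whenever their combined size stays below the average; a region that has just
   * been paired is skipped so it is not merged twice in the same pass.
   */
  static List<int[]> mergePairs(long[] sizesMb, double avgMb) {
    List<int[]> pairs = new ArrayList<>();
    int i = 0;
    while (i < sizesMb.length - 1) {
      if (sizesMb[i] >= 0 && sizesMb[i + 1] >= 0 && sizesMb[i] + sizesMb[i + 1] < avgMb) {
        pairs.add(new int[] { i, i + 1 });
        i++; // skip the second member of the new pair
      }
      i++;
    }
    return pairs;
  }

  /** Mirrors getSplitNormalizationPlan(): a region qualifies for split once it is more
   *  than twice the average size. */
  static List<Integer> splitCandidates(long[] sizesMb, double avgMb) {
    List<Integer> candidates = new ArrayList<>();
    for (int i = 0; i < sizesMb.length; i++) {
      if (sizesMb[i] > 2 * avgMb) {
        candidates.add(i);
      }
    }
    return candidates;
  }

  public static void main(String[] args) {
    // The example sizes from the MergeNormalizer javadoc: 52 MB over 10 regions -> 5.2 MB average
    // (no target region size or target region count configured).
    long[] sizesMb = { 1, 2, 3, 4, 10, 10, 10, 5, 4, 3 };
    double avgMb = 5.2;
    for (int[] p : mergePairs(sizesMb, avgMb)) {
      System.out.println("merge regions at index " + p[0] + " and " + p[1]); // prints 0 and 1 only
    }
    System.out.println("split candidates: " + splitCandidates(sizesMb, avgMb)); // empty list here
  }
}

With a table-level normalizer target region size above 7 MB configured, the same pairing rule would also pick up the 3-4 and 4-3 neighbours from this example, since each pair sums to 7 MB.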
@@ -0,0 +1,143 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of a merge-only region normalizer. Logic in use:
* <ol>
* <li>get all regions of a given table
* <li>get the average size S of the regions (from the total size of store files reported in
* RegionLoad)
* <li>a region R1 and its neighbour R2 are merged if R1 + R2 &lt; S, and all such adjacent pairs
* are returned to be merged
* <li>otherwise, no action is performed
* </ol>
* <p>
* Since the split policy already takes care of splitting oversized regions, this normalizer only
* provides a way to merge regions that are too small. It differs from
* {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer}, which performs both
* splits and merges to drive regions towards the average size: here we only merge regions (older
* than a configured age) and rely on the split policy for splits. The size that regions are
* compared against is the table's normalizer target region size if set, otherwise the size
* implied by the target region count, otherwise the actual average. For example, consider regions
* with sizes 1,2,3,4,10,10,10,5,4,3 MB and a minimum merge age of 0 days: with no target region
* count or size configured, the average is 5.2 MB, so only the adjacent pair 1-2 (combined size
* 3 MB) is merged in the first run. To get the best results from this normalizer, the target
* region size should theoretically be set between 0.5 and 0.75 of the configured maximum file
* size. If the minimum merge age is set to 3 days, any plan whose two regions are both newer than
* 3 days is discarded; a plan is kept if at least one of its regions is old enough to be merged.
* </p>
*/

@InterfaceAudience.Private
public class MergeNormalizer extends AbstractRegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);

private int minRegionCount;
private int minRegionAge;
private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];

public MergeNormalizer() {
Configuration conf = HBaseConfiguration.create();
minRegionCount = conf.getInt("hbase.normalizer.min.region.count", 3);
minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3);
}

@Override
public void planSkipped(RegionInfo hri, NormalizationPlan.PlanType type) {
skippedCount[type.ordinal()]++;
}

@Override
public long getSkippedCount(NormalizationPlan.PlanType type) {
return skippedCount[type.ordinal()];
}

@Override
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
List<NormalizationPlan> plans = new ArrayList<>();
if (!shouldNormalize(table)) {
return null;
}
// keep only those merge plans where at least one of the two regions is older than minRegionAge days
List<NormalizationPlan> normalizationPlans = getMergeNormalizationPlan(table);
for (NormalizationPlan plan : normalizationPlans) {
if (plan instanceof MergeNormalizationPlan) {
RegionInfo hri = ((MergeNormalizationPlan) plan).getFirstRegion();
RegionInfo hri2 = ((MergeNormalizationPlan) plan).getSecondRegion();
if (isOldEnoughToMerge(hri) || isOldEnoughToMerge(hri2)) {
plans.add(plan);
} else {
LOG.debug("Skipping region {} and {} as they are both new", hri.getEncodedName(),
hri2.getEncodedName());
}
}
}
if (plans.isEmpty()) {
LOG.debug("No normalization needed, regions look good for table: {}", table);
return null;
}
return plans;
}

private boolean isOldEnoughToMerge(RegionInfo hri) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
Timestamp hriTime = new Timestamp(hri.getRegionId());
boolean isOld =
new Timestamp(hriTime.getTime() + TimeUnit.DAYS.toMillis(minRegionAge))
.before(currentTime);
return isOld;
}

private boolean shouldNormalize(TableName table) {
boolean normalize = false;
if (table == null || table.isSystemTable()) {
LOG.debug("Normalization of system table {} isn't allowed", table);
} else if (!isMergeEnabled()) {
LOG.debug("Merge disabled for table: {}", table);
} else {
List<RegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
if (tableRegions == null || tableRegions.size() < minRegionCount) {
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
LOG.debug(
"Table {} has {} regions, required min number of regions for normalizer to run is {} , "
+ "not running normalizer",
table, nrRegions, minRegionCount);
} else {
normalize = true;
}
}
return normalize;
}
}
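The age gate in isOldEnoughToMerge() treats RegionInfo.getRegionId() as the region's creation time in epoch milliseconds and computePlanForTable() keeps a merge plan only if at least one of its two regions is older than hbase.normalizer.min.region.merge.age days. A minimal sketch of the same check in isolation; the class and method names here are illustrative, not part of the patch.

import java.util.concurrent.TimeUnit;

public class RegionAgeSketch {
  /**
   * Mirrors isOldEnoughToMerge(): a region qualifies once its creation time plus the
   * configured minimum merge age lies in the past. In the class above the creation time
   * comes from RegionInfo.getRegionId(), which it treats as epoch milliseconds.
   */
  static boolean oldEnoughToMerge(long creationMillis, int minAgeDays, long nowMillis) {
    return creationMillis + TimeUnit.DAYS.toMillis(minAgeDays) < nowMillis;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    int minAgeDays = 3; // default of hbase.normalizer.min.region.merge.age
    long fourDaysAgo = now - TimeUnit.DAYS.toMillis(4);
    long oneDayAgo = now - TimeUnit.DAYS.toMillis(1);
    System.out.println(oldEnoughToMerge(fourDaysAgo, minAgeDays, now)); // true
    System.out.println(oldEnoughToMerge(oneDayAgo, minAgeDays, now));   // false
    // computePlanForTable keeps a merge plan as long as at least one of its two
    // regions passes this check.
  }
}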