From c9c22fedaa5763dc91b03d905f8a033f8b074643 Mon Sep 17 00:00:00 2001 From: Aman Poonia Date: Fri, 10 Jan 2020 01:19:49 +0530 Subject: [PATCH] HBASE-22285 A normalizer which merges small size regions with adjacent regions (#978) Signed-off-by: Viraj Jasani Signed-off-by: stack --- .../src/main/resources/hbase-default.xml | 11 + .../normalizer/AbstractRegionNormalizer.java | 213 ++++++++++++++ .../master/normalizer/MergeNormalizer.java | 143 ++++++++++ .../normalizer/SimpleRegionNormalizer.java | 186 +++--------- .../normalizer/TestMergeNormalizer.java | 270 ++++++++++++++++++ 5 files changed, 677 insertions(+), 146 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index a7d6898b2114..56fa7678c226 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -630,6 +630,17 @@ possible configurations would overwhelm and obscure the important. 300000 Period at which the region normalizer runs in the Master. + + hbase.normalizer.min.region.count + 3 + configure the minimum number of regions + + + hbase.normalizer.min.region.merge.age + 3 + configure the minimum age in days for region before it is considered for merge while + normalizing + hbase.regions.slop 0.001 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java new file mode 100644 index 000000000000..36742a074e8e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/AbstractRegionNormalizer.java @@ -0,0 +1,213 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.normalizer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.MasterSwitchType; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; + +@InterfaceAudience.Private +public abstract class AbstractRegionNormalizer implements RegionNormalizer { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionNormalizer.class); + protected MasterServices masterServices; + protected MasterRpcServices masterRpcServices; + + /** + * Set the master service. + * @param masterServices inject instance of MasterServices + */ + @Override + public void setMasterServices(MasterServices masterServices) { + this.masterServices = masterServices; + } + + @Override + public void setMasterRpcServices(MasterRpcServices masterRpcServices) { + this.masterRpcServices = masterRpcServices; + } + + /** + * @param hri regioninfo + * @return size of region in MB and if region is not found than -1 + */ + protected long getRegionSize(RegionInfo hri) { + ServerName sn = + masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri); + RegionMetrics regionLoad = + masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName()); + if (regionLoad == null) { + LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString()); + return -1; + } + return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE); + } + + protected boolean isMergeEnabled() { + boolean mergeEnabled = true; + try { + mergeEnabled = masterRpcServices + .isSplitOrMergeEnabled(null, + RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)) + .getEnabled(); + } catch (ServiceException e) { + LOG.warn("Unable to determine whether merge is enabled", e); + } + return mergeEnabled; + } + + protected boolean isSplitEnabled() { + boolean splitEnabled = true; + try { + splitEnabled = masterRpcServices + .isSplitOrMergeEnabled(null, + RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)) + .getEnabled(); + } catch (ServiceException se) { + LOG.warn("Unable to determine whether split is enabled", se); + } + return splitEnabled; + } + + /** + * @param tableRegions regions of table to normalize + * @return average region size depending on + * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount() + * Also make sure tableRegions contains regions of the same table + */ + protected double getAverageRegionSize(List tableRegions) { + long totalSizeMb = 0; + int acutalRegionCnt = 0; + for (RegionInfo hri : tableRegions) { + long regionSize = getRegionSize(hri); + // don't consider regions that are in bytes for averaging the size. + if (regionSize > 0) { + acutalRegionCnt++; + totalSizeMb += regionSize; + } + } + TableName table = tableRegions.get(0).getTable(); + int targetRegionCount = -1; + long targetRegionSize = -1; + try { + TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table); + if (tableDescriptor != null) { + targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount(); + targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize(); + LOG.debug("Table {}: target region count is {}, target region size is {}", table, + targetRegionCount, targetRegionSize); + } + } catch (IOException e) { + LOG.warn( + "cannot get the target number and target size of table {}, they will be default value -1.", + table, e); + } + + double avgRegionSize; + if (targetRegionSize > 0) { + avgRegionSize = targetRegionSize; + } else if (targetRegionCount > 0) { + avgRegionSize = totalSizeMb / (double) targetRegionCount; + } else { + avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt; + } + + LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table, + totalSizeMb, avgRegionSize); + return avgRegionSize; + } + + /** + * Computes the merge plans that should be executed for this table to converge average region + * towards target average or target region count + * @param table table to normalize + * @return list of merge normalization plans + */ + protected List getMergeNormalizationPlan(TableName table) { + List plans = new ArrayList<>(); + List tableRegions = + masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table); + double avgRegionSize = getAverageRegionSize(tableRegions); + LOG.debug("Table {}, average region size: {}.\n Computing normalization plan for table: {}, " + + "number of regions: {}", + table, avgRegionSize, table, tableRegions.size()); + + int candidateIdx = 0; + while (candidateIdx < tableRegions.size() - 1) { + RegionInfo hri = tableRegions.get(candidateIdx); + long regionSize = getRegionSize(hri); + RegionInfo hri2 = tableRegions.get(candidateIdx + 1); + long regionSize2 = getRegionSize(hri2); + if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) { + // atleast one of the two regions should be older than MIN_REGION_DURATION days + plans.add(new MergeNormalizationPlan(hri, hri2)); + candidateIdx++; + } else { + LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionNameAsString(), table, + regionSize); + } + candidateIdx++; + } + return plans; + } + + /** + * Computes the split plans that should be executed for this table to converge average region size + * towards target average or target region count + * @param table table to normalize + * @return list of split normalization plans + */ + protected List getSplitNormalizationPlan(TableName table) { + List plans = new ArrayList<>(); + List tableRegions = + masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table); + double avgRegionSize = getAverageRegionSize(tableRegions); + LOG.debug("Table {}, average region size: {}", table, avgRegionSize); + + int candidateIdx = 0; + while (candidateIdx < tableRegions.size()) { + RegionInfo hri = tableRegions.get(candidateIdx); + long regionSize = getRegionSize(hri); + // if the region is > 2 times larger than average, we split it, split + // is more high priority normalization action than merge. + if (regionSize > 2 * avgRegionSize) { + LOG.info("Table {}, large region {} has size {}, more than twice avg size, splitting", + table, hri.getRegionNameAsString(), regionSize); + plans.add(new SplitNormalizationPlan(hri, null)); + } + candidateIdx++; + } + return plans; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java new file mode 100644 index 000000000000..444c27c28681 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizer.java @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.normalizer; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of MergeNormalizer Logic in use: + *
    + *
  1. get all regions of a given table + *
  2. get avg size S of each region (by total size of store files reported in RegionLoad) + *
  3. two regions R1 and its neighbour R2 are merged, if R1 + R2 < S, and all such regions are + * returned to be merged + *
  4. Otherwise, no action is performed + *
+ *

+ * Considering the split policy takes care of splitting region we also want a way to merge when + * regions are too small. It is little different than what + * {@link org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer} does. Instead of doing + * splits and merge both to achieve average region size in cluster for a table. We only merge + * regions(older than defined age) and rely on Split policy for region splits. The goal of this + * normalizer is to merge small regions to make size of regions close to average size (which is + * either average size or depends on either target region size or count in that order). Consider + * region with size 1,2,3,4,10,10,10,5,4,3. If minimum merge age is set to 0 days this algorithm + * will find the average size as 7.2 assuming we haven't provided target region count or size. Now + * we will find all those adjacent region which if merged doesn't exceed the average size. so we + * will merge 1-2, 3-4, 4,3 in our first run. To get best results from this normalizer theoretically + * we should set target region size between 0.5 to 0.75 of configured maximum file size. If we set + * min merge age as 3 we create plan as above and see if we have a plan which has both regions as + * new(age less than 3) we discard such plans and we consider the regions even if one of the region + * is old enough to be merged. + *

+ */ + +@InterfaceAudience.Private +public class MergeNormalizer extends AbstractRegionNormalizer { + private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class); + + private int minRegionCount; + private int minRegionAge; + private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length]; + + public MergeNormalizer() { + Configuration conf = HBaseConfiguration.create(); + minRegionCount = conf.getInt("hbase.normalizer.min.region.count", 3); + minRegionAge = conf.getInt("hbase.normalizer.min.region.merge.age", 3); + } + + @Override + public void planSkipped(RegionInfo hri, NormalizationPlan.PlanType type) { + skippedCount[type.ordinal()]++; + } + + @Override + public long getSkippedCount(NormalizationPlan.PlanType type) { + return skippedCount[type.ordinal()]; + } + + @Override + public List computePlanForTable(TableName table) throws HBaseIOException { + List plans = new ArrayList<>(); + if (!shouldNormalize(table)) { + return null; + } + // at least one of the two regions should be older than MIN_REGION_AGE days + List normalizationPlans = getMergeNormalizationPlan(table); + for (NormalizationPlan plan : normalizationPlans) { + if (plan instanceof MergeNormalizationPlan) { + RegionInfo hri = ((MergeNormalizationPlan) plan).getFirstRegion(); + RegionInfo hri2 = ((MergeNormalizationPlan) plan).getSecondRegion(); + if (isOldEnoughToMerge(hri) || isOldEnoughToMerge(hri2)) { + plans.add(plan); + } else { + LOG.debug("Skipping region {} and {} as they are both new", hri.getEncodedName(), + hri2.getEncodedName()); + } + } + } + if (plans.isEmpty()) { + LOG.debug("No normalization needed, regions look good for table: {}", table); + return null; + } + return plans; + } + + private boolean isOldEnoughToMerge(RegionInfo hri) { + Timestamp currentTime = new Timestamp(System.currentTimeMillis()); + Timestamp hriTime = new Timestamp(hri.getRegionId()); + boolean isOld = + new Timestamp(hriTime.getTime() + TimeUnit.DAYS.toMillis(minRegionAge)) + .before(currentTime); + return isOld; + } + + private boolean shouldNormalize(TableName table) { + boolean normalize = false; + if (table == null || table.isSystemTable()) { + LOG.debug("Normalization of system table {} isn't allowed", table); + } else if (!isMergeEnabled()) { + LOG.debug("Merge disabled for table: {}", table); + } else { + List tableRegions = + masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table); + if (tableRegions == null || tableRegions.size() < minRegionCount) { + int nrRegions = tableRegions == null ? 0 : tableRegions.size(); + LOG.debug( + "Table {} has {} regions, required min number of regions for normalizer to run is {} , " + + "not running normalizer", + table, nrRegions, minRegionCount); + } else { + normalize = true; + } + } + return normalize; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java index 8d566f081fb4..bd90f5b76c33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java @@ -18,68 +18,44 @@ */ package org.apache.hadoop.hbase.master.normalizer; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.RegionMetrics; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Size; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.master.MasterRpcServices; -import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; - /** - * Simple implementation of region normalizer. - * - * Logic in use: - * - *
    - *
  1. Get all regions of a given table - *
  2. Get avg size S of each region (by total size of store files reported in RegionMetrics) - *
  3. Seek every single region one by one. If a region R0 is bigger than S * 2, it is - * kindly requested to split. Thereon evaluate the next region R1 - *
  4. Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. - * Thereon evaluate the next region R2 - *
  5. Otherwise, R1 is evaluated + * Simple implementation of region normalizer. Logic in use: + *
      + *
    1. Get all regions of a given table + *
    2. Get avg size S of each region (by total size of store files reported in RegionMetrics) + *
    3. Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly + * requested to split. Thereon evaluate the next region R1 + *
    4. Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon + * evaluate the next region R2 + *
    5. Otherwise, R1 is evaluated *
    *

    - * Region sizes are coarse and approximate on the order of megabytes. Additionally, - * "empty" regions (less than 1MB, with the previous note) are not merged away. This - * is by design to prevent normalization from undoing the pre-splitting of a table. + * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions + * (less than 1MB, with the previous note) are not merged away. This is by design to prevent + * normalization from undoing the pre-splitting of a table. */ @InterfaceAudience.Private -public class SimpleRegionNormalizer implements RegionNormalizer { +public class SimpleRegionNormalizer extends AbstractRegionNormalizer { private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class); - private static final int MIN_REGION_COUNT = 3; - private MasterServices masterServices; - private MasterRpcServices masterRpcServices; + private int minRegionCount; private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length]; - /** - * Set the master service. - * @param masterServices inject instance of MasterServices - */ - @Override - public void setMasterServices(MasterServices masterServices) { - this.masterServices = masterServices; - } - - @Override - public void setMasterRpcServices(MasterRpcServices masterRpcServices) { - this.masterRpcServices = masterRpcServices; + public SimpleRegionNormalizer() { + minRegionCount = HBaseConfiguration.create().getInt("hbase.normalizer.min.region.count", 3); } @Override @@ -115,138 +91,56 @@ public int compare(NormalizationPlan plan1, NormalizationPlan plan2) { private Comparator planComparator = new PlanComparator(); /** - * Computes next most "urgent" normalization action on the table. - * Action may be either a split, or a merge, or no action. - * + * Computes next most "urgent" normalization action on the table. Action may be either a split, or + * a merge, or no action. * @param table table to normalize * @return normalization plan to execute */ @Override public List computePlanForTable(TableName table) throws HBaseIOException { if (table == null || table.isSystemTable()) { - LOG.debug("Normalization of system table " + table + " isn't allowed"); + LOG.debug("Normalization of system table {} isn't allowed", table); return null; } - boolean splitEnabled = true, mergeEnabled = true; - try { - splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null, - RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled(); - } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) { - LOG.debug("Unable to determine whether split is enabled", e); - } - try { - mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null, - RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled(); - } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) { - LOG.debug("Unable to determine whether merge is enabled", e); - } + boolean splitEnabled = isSplitEnabled(); + boolean mergeEnabled = isMergeEnabled(); if (!mergeEnabled && !splitEnabled) { - LOG.debug("Both split and merge are disabled for table: " + table); + LOG.debug("Both split and merge are disabled for table: {}", table); return null; } List plans = new ArrayList<>(); - List tableRegions = masterServices.getAssignmentManager().getRegionStates(). - getRegionsOfTable(table); + List tableRegions = + masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table); - //TODO: should we make min number of regions a config param? - if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) { + if (tableRegions == null || tableRegions.size() < minRegionCount) { int nrRegions = tableRegions == null ? 0 : tableRegions.size(); - LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number" - + " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer"); + LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run is " + + "{}, not running normalizer", + table, nrRegions, minRegionCount); return null; } - LOG.debug("Computing normalization plan for table: " + table + - ", number of regions: " + tableRegions.size()); + LOG.debug("Computing normalization plan for table: {}, number of regions: {}", table, + tableRegions.size()); - long totalSizeMb = 0; - int acutalRegionCnt = 0; - - for (int i = 0; i < tableRegions.size(); i++) { - RegionInfo hri = tableRegions.get(i); - long regionSize = getRegionSize(hri); - if (regionSize > 0) { - acutalRegionCnt++; - totalSizeMb += regionSize; - } - } - int targetRegionCount = -1; - long targetRegionSize = -1; - try { - TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table); - if(tableDescriptor != null) { - targetRegionCount = - tableDescriptor.getNormalizerTargetRegionCount(); - targetRegionSize = - tableDescriptor.getNormalizerTargetRegionSize(); - LOG.debug("Table {}: target region count is {}, target region size is {}", table, - targetRegionCount, targetRegionSize); + if (splitEnabled) { + List splitPlans = getSplitNormalizationPlan(table); + if (splitPlans != null) { + plans.addAll(splitPlans); } - } catch (IOException e) { - LOG.warn( - "cannot get the target number and target size of table {}, they will be default value -1.", - table); - } - - double avgRegionSize; - if (targetRegionSize > 0) { - avgRegionSize = targetRegionSize; - } else if (targetRegionCount > 0) { - avgRegionSize = totalSizeMb / (double) targetRegionCount; - } else { - avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt; } - LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb); - LOG.debug("Table " + table + ", average region size: " + avgRegionSize); - - int candidateIdx = 0; - while (candidateIdx < tableRegions.size()) { - RegionInfo hri = tableRegions.get(candidateIdx); - long regionSize = getRegionSize(hri); - // if the region is > 2 times larger than average, we split it, split - // is more high priority normalization action than merge. - if (regionSize > 2 * avgRegionSize) { - if (splitEnabled) { - LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size " - + regionSize + ", more than twice avg size, splitting"); - plans.add(new SplitNormalizationPlan(hri, null)); - } - } else { - if (candidateIdx == tableRegions.size()-1) { - break; - } - if (mergeEnabled) { - RegionInfo hri2 = tableRegions.get(candidateIdx+1); - long regionSize2 = getRegionSize(hri2); - if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) { - LOG.info("Table " + table + ", small region size: " + regionSize - + " plus its neighbor size: " + regionSize2 - + ", less than the avg size " + avgRegionSize + ", merging them"); - plans.add(new MergeNormalizationPlan(hri, hri2)); - candidateIdx++; - } - } + if (mergeEnabled) { + List mergePlans = getMergeNormalizationPlan(table); + if (mergePlans != null) { + plans.addAll(mergePlans); } - candidateIdx++; } if (plans.isEmpty()) { - LOG.debug("No normalization needed, regions look good for table: " + table); + LOG.debug("No normalization needed, regions look good for table: {}", table); return null; } Collections.sort(plans, planComparator); return plans; } - - private long getRegionSize(RegionInfo hri) { - ServerName sn = masterServices.getAssignmentManager().getRegionStates(). - getRegionServerOfRegion(hri); - RegionMetrics regionLoad = masterServices.getServerManager().getLoad(sn). - getRegionMetrics().get(hri.getRegionName()); - if (regionLoad == null) { - LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad"); - return -1; - } - return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java new file mode 100644 index 000000000000..0d74255d7fdd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java @@ -0,0 +1,270 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.normalizer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.when; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Size; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestMergeNormalizer { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMergeNormalizer.class); + + private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class); + + private static RegionNormalizer normalizer; + + // mocks + private static MasterServices masterServices; + private static MasterRpcServices masterRpcServices; + + @BeforeClass + public static void beforeAllTests() throws Exception { + normalizer = new MergeNormalizer(); + } + + @Test + public void testNoNormalizationForMetaTable() throws HBaseIOException { + TableName testTable = TableName.META_TABLE_NAME; + List hris = new ArrayList<>(); + Map regionSizes = new HashMap<>(); + + setupMocksForNormalizer(regionSizes, hris); + List plans = normalizer.computePlanForTable(testTable); + assertNull(plans); + } + + @Test + public void testNoNormalizationIfTooFewRegions() throws HBaseIOException { + TableName testTable = TableName.valueOf("testSplitOfSmallRegion"); + List hris = new ArrayList<>(); + Map regionSizes = new HashMap<>(); + + RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa")) + .setEndKey(Bytes.toBytes("bbb")).build(); + regionSizes.put(hri1.getRegionName(), 10); + + RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb")) + .setEndKey(Bytes.toBytes("ccc")).build(); + hris.add(hri2); + regionSizes.put(hri2.getRegionName(), 15); + + setupMocksForNormalizer(regionSizes, hris); + List plans = normalizer.computePlanForTable(testTable); + assertNull(plans); + } + + @Test + public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException { + TableName testTable = TableName.valueOf("testSplitOfSmallRegion"); + List hris = new ArrayList<>(); + Map regionSizes = new HashMap<>(); + + RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa")) + .setEndKey(Bytes.toBytes("bbb")).build(); + hris.add(hri1); + regionSizes.put(hri1.getRegionName(), 10); + + RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb")) + .setEndKey(Bytes.toBytes("ccc")).build(); + hris.add(hri2); + regionSizes.put(hri2.getRegionName(), 15); + + RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc")) + .setEndKey(Bytes.toBytes("ddd")).build(); + hris.add(hri3); + regionSizes.put(hri3.getRegionName(), 8); + + RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd")) + .setEndKey(Bytes.toBytes("eee")).build(); + hris.add(hri4); + regionSizes.put(hri4.getRegionName(), 10); + + setupMocksForNormalizer(regionSizes, hris); + List plans = normalizer.computePlanForTable(testTable); + assertNull(plans); + } + + @Test + public void testMergeOfSmallRegions() throws HBaseIOException { + TableName testTable = TableName.valueOf("testMergeOfSmallRegions"); + List hris = new ArrayList<>(); + Map regionSizes = new HashMap<>(); + + Timestamp currentTime = new Timestamp(System.currentTimeMillis()); + Timestamp threedaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3)); + + RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa")) + .setEndKey(Bytes.toBytes("bbb")).setRegionId(threedaysBefore.getTime()).build(); + hris.add(hri1); + regionSizes.put(hri1.getRegionName(), 15); + + RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb")) + .setEndKey(Bytes.toBytes("ccc")).setRegionId(threedaysBefore.getTime()).build(); + hris.add(hri2); + regionSizes.put(hri2.getRegionName(), 5); + + RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc")) + .setEndKey(Bytes.toBytes("ddd")).setRegionId(threedaysBefore.getTime()).build(); + hris.add(hri3); + regionSizes.put(hri3.getRegionName(), 5); + + RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd")) + .setEndKey(Bytes.toBytes("eee")).setRegionId(threedaysBefore.getTime()).build(); + hris.add(hri4); + regionSizes.put(hri4.getRegionName(), 15); + + RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee")) + .setEndKey(Bytes.toBytes("fff")).build(); + hris.add(hri5); + regionSizes.put(hri5.getRegionName(), 16); + + RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff")) + .setEndKey(Bytes.toBytes("ggg")).setRegionId(threedaysBefore.getTime()).build(); + hris.add(hri6); + regionSizes.put(hri6.getRegionName(), 0); + + RegionInfo hri7 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ggg")) + .setEndKey(Bytes.toBytes("hhh")).build(); + hris.add(hri7); + regionSizes.put(hri7.getRegionName(), 0); + + setupMocksForNormalizer(regionSizes, hris); + List plans = normalizer.computePlanForTable(testTable); + + NormalizationPlan plan = plans.get(0); + assertTrue(plan instanceof MergeNormalizationPlan); + assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion()); + assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion()); + + // to check last 0 sized regions are merged + plan = plans.get(1); + assertEquals(hri6, ((MergeNormalizationPlan) plan).getFirstRegion()); + assertEquals(hri7, ((MergeNormalizationPlan) plan).getSecondRegion()); + } + + @Test + public void testMergeOfNewSmallRegions() throws HBaseIOException { + TableName testTable = TableName.valueOf("testMergeOfNewSmallRegions"); + List hris = new ArrayList<>(); + Map regionSizes = new HashMap<>(); + + RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa")) + .setEndKey(Bytes.toBytes("bbb")).build(); + hris.add(hri1); + regionSizes.put(hri1.getRegionName(), 15); + + RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb")) + .setEndKey(Bytes.toBytes("ccc")).build(); + hris.add(hri2); + regionSizes.put(hri2.getRegionName(), 5); + + RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc")) + .setEndKey(Bytes.toBytes("ddd")).build(); + hris.add(hri3); + regionSizes.put(hri3.getRegionName(), 16); + + RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd")) + .setEndKey(Bytes.toBytes("eee")).build(); + hris.add(hri4); + regionSizes.put(hri4.getRegionName(), 15); + + RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee")) + .setEndKey(Bytes.toBytes("fff")).build(); + hris.add(hri4); + regionSizes.put(hri5.getRegionName(), 5); + + setupMocksForNormalizer(regionSizes, hris); + List plans = normalizer.computePlanForTable(testTable); + + assertNull(plans); + } + + @SuppressWarnings("MockitoCast") + protected void setupMocksForNormalizer(Map regionSizes, + List RegionInfo) { + masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS); + masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS); + + // for simplicity all regions are assumed to be on one server; doesn't matter to us + ServerName sn = ServerName.valueOf("localhost", 0, 1L); + when(masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(any())) + .thenReturn(RegionInfo); + when(masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(any())) + .thenReturn(sn); + + for (Map.Entry region : regionSizes.entrySet()) { + RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class); + when(regionLoad.getRegionName()).thenReturn(region.getKey()); + when(regionLoad.getStoreFileSize()) + .thenReturn(new Size(region.getValue(), Size.Unit.MEGABYTE)); + + // this is possibly broken with jdk9, unclear if false positive or not + // suppress it for now, fix it when we get to running tests on 9 + // see: http://errorprone.info/bugpattern/MockitoCast + when((Object) masterServices.getServerManager().getLoad(sn).getRegionMetrics() + .get(region.getKey())).thenReturn(regionLoad); + } + try { + when(masterRpcServices.isSplitOrMergeEnabled(any(), any())).thenReturn( + MasterProtos.IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build()); + } catch (ServiceException se) { + LOG.debug("error setting isSplitOrMergeEnabled switch", se); + } + + normalizer.setMasterServices(masterServices); + normalizer.setMasterRpcServices(masterRpcServices); + } +}