Skip to content

Commit

Permalink
HBASE-24991 Replace MovedRegionsCleaner with guava cache (apache#2357)
Browse files Browse the repository at this point in the history
Signed-off-by: stack <stack@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
(cherry picked from commit 21e9c8e)

Change-Id: I19a564ad05542c4a616a06c05dd348c8e80f4465
  • Loading branch information
ArthurSXL8 authored and infraio committed Sep 21, 2020
1 parent aa9b04f commit f3f0b04
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ public class HRegionServer extends HasThread implements
// Cache flushing
protected MemStoreFlusher cacheFlusher;

/**
* Used to cache the moved-out regions
*/
private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
TimeUnit.MILLISECONDS).build();

protected HeapMemoryManager hMemManager;

/**
Expand Down Expand Up @@ -487,11 +494,6 @@ public class HRegionServer extends HasThread implements
*/
protected String clusterId;

/**
* Chore to periodically clean the moved region list
*/
private MovedRegionsCleaner movedRegionsCleaner;

// chore for refreshing store files for secondary regions
private StorefileRefresherChore storefileRefresher;

Expand Down Expand Up @@ -1110,10 +1112,6 @@ public void run() {
mobFileCache.shutdown();
}

if (movedRegionsCleaner != null) {
movedRegionsCleaner.stop("Region Server stopping");
}

// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if (this.hMemManager != null) this.hMemManager.stop();
Expand Down Expand Up @@ -2010,13 +2008,24 @@ private void startServices() throws IOException {
Threads.setDaemonThreadRunning(this.procedureResultReporter,
getName() + ".procedureResultReporter", uncaughtExceptionHandler);

if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);
if (this.compactionChecker != null) {
choreService.scheduleChore(compactionChecker);
}
if (this.periodicFlusher != null) {
choreService.scheduleChore(periodicFlusher);
}
if (this.healthCheckChore != null) {
choreService.scheduleChore(healthCheckChore);
}
if (this.nonceManagerChore != null) {
choreService.scheduleChore(nonceManagerChore);
}
if (this.storefileRefresher != null) {
choreService.scheduleChore(storefileRefresher);
}
if (this.fsUtilizationChore != null) {
choreService.scheduleChore(fsUtilizationChore);
}

// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
Expand Down Expand Up @@ -2062,9 +2071,6 @@ private void initializeThreads() throws IOException {
this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
this.leases = new Leases(this.threadWakeFrequency);

// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.create(this);

if (this.nonceManager != null) {
// Create the scheduled chore that cleans up nonces.
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
Expand Down Expand Up @@ -2525,7 +2531,6 @@ protected void stopServiceThreads() {
choreService.cancelChore(periodicFlusher);
choreService.cancelChore(healthCheckChore);
choreService.cancelChore(storefileRefresher);
choreService.cancelChore(movedRegionsCleaner);
choreService.cancelChore(fsUtilizationChore);
// clean up the remaining scheduled chores (in case we missed out any)
choreService.shutdown();
Expand Down Expand Up @@ -3477,12 +3482,10 @@ public ServerNonceManager getNonceManager() {
private static class MovedRegionInfo {
private final ServerName serverName;
private final long seqNum;
private final long ts;

public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
this.serverName = serverName;
this.seqNum = closeSeqNum;
ts = EnvironmentEdgeManager.currentTime();
}

public ServerName getServerName() {
Expand All @@ -3492,18 +3495,12 @@ public ServerName getServerName() {
public long getSeqNum() {
return seqNum;
}

public long getMoveTime() {
return ts;
}
}

// This map will contain all the regions that we closed for a move.
// We add the time it was moved as we don't want to keep information that is too old.
protected Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);

// We need a timeout. If not there is a risk of giving a wrong information: this would double
// the number of network calls instead of reducing them.
/**
* We need a timeout. Without one, there is a risk of giving wrong information: this would double
* the number of network calls instead of reducing them.
*/
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

protected void addToMovedRegions(String encodedName, ServerName destination
Expand All @@ -3514,92 +3511,23 @@ protected void addToMovedRegions(String encodedName, ServerName destination
}
LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
closeSeqNum);
movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
}

void removeFromMovedRegions(String encodedName) {
movedRegions.remove(encodedName);
}

private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
MovedRegionInfo dest = movedRegions.get(encodedRegionName);

long now = EnvironmentEdgeManager.currentTime();
if (dest != null) {
if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
return dest;
} else {
movedRegions.remove(encodedRegionName);
}
}

return null;
movedRegionInfoCache.invalidate(encodedName);
}

/**
* Remove the expired entries from the moved regions list.
*/
protected void cleanMovedRegions() {
final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();

while (it.hasNext()){
Map.Entry<String, MovedRegionInfo> e = it.next();
if (e.getValue().getMoveTime() < cutOff) {
it.remove();
}
}
@VisibleForTesting
public MovedRegionInfo getMovedRegion(String encodedRegionName) {
return movedRegionInfoCache.getIfPresent(encodedRegionName);
}

/*
* Use this to allow tests to override and schedule more frequently.
*/

protected int movedRegionCleanerPeriod() {
@VisibleForTesting
public int movedRegionCacheExpiredTime() {
return TIMEOUT_REGION_MOVED;
}

/**
* Scheduled chore that cleans the region server's moved-regions list.
* <p>
* Each run simply delegates to {@code HRegionServer#cleanMovedRegions()}. The chore period is
* taken from {@code HRegionServer#movedRegionCleanerPeriod()} at construction time. The class
* also implements {@link Stoppable} by forwarding to the stoppable it was created with.
*/
protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
// Region server whose moved-regions list is cleaned on every chore run.
private HRegionServer regionServer;
// Stop flag holder; also handed to ScheduledChore as its stopper.
Stoppable stoppable;

/**
* @param regionServer server to clean; also supplies the chore period and appears in the name
* @param stoppable stop flag passed to the superclass and used by stop/isStopped below
*/
private MovedRegionsCleaner(
HRegionServer regionServer, Stoppable stoppable){
super("MovedRegionsCleaner for region " + regionServer, stoppable,
regionServer.movedRegionCleanerPeriod());
this.regionServer = regionServer;
this.stoppable = stoppable;
}

/**
* Builds a cleaner for the given region server, backed by a private stop flag.
* @param rs region server whose moved-regions list should be cleaned
* @return a new cleaner; this method does not schedule it
*/
static MovedRegionsCleaner create(HRegionServer rs){
// Minimal Stoppable: a volatile flag that is set on stop and never reset.
Stoppable stoppable = new Stoppable() {
private volatile boolean isStopped = false;
@Override public void stop(String why) { isStopped = true;}
@Override public boolean isStopped() {return isStopped;}
};

return new MovedRegionsCleaner(rs, stoppable);
}

/** Runs one cleaning pass by delegating to the region server. */
@Override
protected void chore() {
regionServer.cleanMovedRegions();
}

/** Forwards the stop request to the wrapped stoppable. */
@Override
public void stop(String why) {
stoppable.stop(why);
}

/** @return whether the wrapped stoppable has been stopped */
@Override
public boolean isStopped() {
return stoppable.isStopped();
}
}

private String getMyEphemeralNodePath() {
return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.io.IOException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Verifies that a region server records a region it has moved away in its moved-region cache
 * and that the record is no longer returned once the cache expiry time has elapsed.
 */
@Category({ MiscTests.class, MediumTests.class })
public class TestMovedRegionCache {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMovedRegionCache.class);

  /** How long to wait for the test table to become available. */
  private static final long TABLE_AVAILABLE_TIMEOUT_MS = 30_000;

  /** How long to wait for a moved region to come online on its target server. */
  private static final long REGION_MOVE_TIMEOUT_MS = 2000;

  @Rule
  public TestName name = new TestName();

  private HBaseTestingUtility util;
  private MiniZooKeeperCluster zkCluster;
  private HRegionServer source;
  private HRegionServer dest;
  private RegionInfo movedRegionInfo;

  /**
   * Starts a ZooKeeper mini cluster and a two-region-server HBase mini cluster, creates a
   * single-region table named after the running test method, and moves that region onto
   * {@link #source} so every test starts from a known placement.
   */
  @Before
  public void setup() throws Exception {
    util = new HBaseTestingUtility();
    zkCluster = util.startMiniZKCluster();
    StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
    MiniHBaseCluster cluster = util.startMiniHBaseCluster(option);
    source = cluster.getRegionServer(0);
    dest = cluster.getRegionServer(1);
    assertEquals(2, cluster.getRegionServerThreads().size());

    TableName tableName = TableName.valueOf(name.getMethodName());
    util.createTable(tableName, Bytes.toBytes("cf"));
    util.waitTableAvailable(tableName, TABLE_AVAILABLE_TIMEOUT_MS);

    // The table is expected to have exactly one region; getOnlyElement fails otherwise.
    movedRegionInfo = Iterables.getOnlyElement(cluster.getRegions(tableName)).getRegionInfo();
    util.getAdmin().move(movedRegionInfo.getEncodedNameAsBytes(), source.getServerName());
    waitUntilOnline(source);
  }

  /**
   * Shuts down the HBase mini cluster and then the ZooKeeper mini cluster.
   * <p>
   * The ZooKeeper shutdown sits in a {@code finally} block so that it still runs when shutting
   * down the HBase cluster throws; otherwise the ZooKeeper cluster would be leaked.
   */
  @After
  public void after() throws Exception {
    try {
      // util is assigned first in setup(), but guard anyway in case setup() failed early.
      if (util != null) {
        util.shutdownMiniCluster();
      }
    } finally {
      if (zkCluster != null) {
        zkCluster.shutdown();
      }
    }
  }

  /**
   * Moves the region from {@link #source} to {@link #dest}, checks that the source server now
   * has a moved-region entry for it, then sleeps for the full expiry interval reported by the
   * source server and checks that the entry is gone.
   */
  @Test
  public void testMovedRegionsCache() throws IOException, InterruptedException {
    util.getAdmin().move(movedRegionInfo.getEncodedNameAsBytes(), dest.getServerName());
    waitUntilOnline(dest);

    String encodedName = movedRegionInfo.getEncodedName();
    assertNotNull("Moved region NOT in the cache!", source.getMovedRegion(encodedName));

    // The entry was written before this sleep began, so it must have expired afterwards.
    Thread.sleep(source.movedRegionCacheExpiredTime());
    assertNull("Expired moved region exist in the cache!", source.getMovedRegion(encodedName));
  }

  /**
   * Blocks until {@code server} reports the test region as online, or the wait times out.
   * @param server region server expected to host the test region
   * @throws IOException if the predicate evaluation fails
   */
  private void waitUntilOnline(final HRegionServer server) throws IOException {
    util.waitFor(REGION_MOVE_TIMEOUT_MS, new Waiter.Predicate<IOException>() {
      @Override
      public boolean evaluate() throws IOException {
        return server.getOnlineRegion(movedRegionInfo.getRegionName()) != null;
      }
    });
  }
}
Loading

0 comments on commit f3f0b04

Please sign in to comment.