Skip to content

Commit

Permalink
HBASE-28342 Decommissioned hosts should be rejected by the HMaster (a…
Browse files Browse the repository at this point in the history
…pache#5681)

Signed-off by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
aalhour authored and ndimiduk committed Feb 23, 2024
1 parent ea1c057 commit ed7c850
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 23 deletions.
14 changes: 14 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -1709,6 +1709,20 @@ public enum OperationStatusCode {
*/
public final static boolean HBASE_SERVER_USEIP_ENABLED_DEFAULT = false;

/**
* Should the HMaster reject hosts of decommissioned RegionServers, bypass matching their port and
* startcode parts of their ServerName or not? When True, the HMaster will reject a RegionServer's
* request to `reportForDuty` if it's hostname exists in the list of decommissioned RegionServers
* it maintains internally. Added in HBASE-28342.
*/
public final static String REJECT_DECOMMISSIONED_HOSTS_KEY =
"hbase.master.reject.decommissioned.hosts";

/**
* Default value of {@link #REJECT_DECOMMISSIONED_HOSTS_KEY}
*/
public final static boolean REJECT_DECOMMISSIONED_HOSTS_DEFAULT = false;

private HConstants() {
// Can't be instantiated with this ctor.
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class DecommissionedHostRejectedException extends HBaseIOException {
public DecommissionedHostRejectedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ public HMaster(final Configuration conf) throws IOException {
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

// Do we publish the status?

boolean shouldPublish =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
Expand Down Expand Up @@ -967,7 +966,10 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
masterRegion = MasterRegionFactory.create(this);
rsListStorage = new MasterRegionServerList(masterRegion, this);

// Initialize the ServerManager and register it as a configuration observer
this.serverManager = createServerManager(this, rsListStorage);
this.configurationManager.registerObserver(this.serverManager);

if (
!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand All @@ -44,6 +45,7 @@
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
Expand Down Expand Up @@ -88,7 +90,7 @@
* only after the handler is fully enabled and has completed the handling.
*/
@InterfaceAudience.Private
public class ServerManager {
public class ServerManager implements ConfigurationObserver {
public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
"hbase.master.wait.on.regionservers.maxtostart";

Expand Down Expand Up @@ -140,6 +142,9 @@ public class ServerManager {
/** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>();

/** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
private volatile boolean rejectDecommissionedHostsConfig;

/**
* Constructor.
*/
Expand All @@ -152,6 +157,35 @@ public ServerManager(final MasterServices master, RegionServerList storage) {
this.connection = master.getClusterConnection();
this.rpcControllerFactory =
this.connection == null ? null : connection.getRpcControllerFactory();
rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
}

/**
* Implementation of the ConfigurationObserver interface. We are interested in live-loading the
* configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
* @param conf Server configuration instance
*/
@Override
public void onConfigurationChange(Configuration conf) {
final boolean newValue = getRejectDecommissionedHostsConfig(conf);
if (rejectDecommissionedHostsConfig == newValue) {
// no-op
return;
}

LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
rejectDecommissionedHostsConfig, newValue);

rejectDecommissionedHostsConfig = newValue;
}

/**
* Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
* @param conf Configuration instance of the Master
*/
public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
}

/**
Expand Down Expand Up @@ -196,11 +230,14 @@ ServerName regionServerStartup(RegionServerStartupRequest request, int versionNu
final String hostname =
request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());

// Check if the host should be rejected based on it's decommissioned status
checkRejectableDecommissionedStatus(sn);

checkClockSkew(sn, request.getServerCurrentTime());
checkIsDead(sn, "STARTUP");
if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
LOG.warn(
"THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
}
storage.started(sn);
return sn;
Expand Down Expand Up @@ -262,6 +299,42 @@ public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDea
updateLastFlushedSequenceIds(sn, sl);
}

/**
* Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
* to do so, any RegionServer trying to join the cluster will have it's host checked against the
* list of hosts of currently decommissioned servers and potentially get prevented from reporting
* for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
* details.
* @param sn The ServerName to check for
* @throws DecommissionedHostRejectedException if the Master is configured to reject
* decommissioned hosts and this host exists in the
* list of the decommissioned servers
*/
private void checkRejectableDecommissionedStatus(ServerName sn)
throws DecommissionedHostRejectedException {
LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());

// If the Master is not configured to reject decommissioned hosts, return early.
if (!rejectDecommissionedHostsConfig) {
return;
}

// Look for a match for the hostname in the list of decommissioned servers
for (ServerName server : getDrainingServersList()) {
if (Objects.equals(server.getHostname(), sn.getHostname())) {
// Found a match and master is configured to reject decommissioned hosts, throw exception!
LOG.warn(
"Rejecting RegionServer {} from reporting for duty because Master is configured "
+ "to reject decommissioned hosts and this host was marked as such in the past.",
sn.getServerName());
throw new DecommissionedHostRejectedException(String.format(
"Host %s exists in the list of decommissioned servers and Master is configured to "
+ "reject decommissioned hosts",
sn.getHostname()));
}
}
}

/**
* Check is a server of same host and port already exists, if not, or the existed one got a
* smaller start code, record it.
Expand Down Expand Up @@ -607,13 +680,8 @@ public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
* Remove the server from the drain list.
*/
public synchronized boolean removeServerFromDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>
LOG.info("Removing server {} from the draining list.", sn);

if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. "
+ "Removing from draining list anyway, as requested.");
}
// Remove the server from the draining servers lists.
return this.drainingServers.remove(sn);
}
Expand All @@ -623,22 +691,23 @@ public synchronized boolean removeServerFromDrainList(final ServerName sn) {
* @return True if the server is added or the server is already on the drain list.
*/
public synchronized boolean addServerToDrainList(final ServerName sn) {
// Warn if the server (sn) is not online. ServerName is of the form:
// <hostname> , <port> , <startcode>

if (!this.isServerOnline(sn)) {
LOG.warn("Server " + sn + " is not currently online. "
+ "Ignoring request to add it to draining list.");
// If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
// However, we want to add servers even if they're not online if the master is configured
// to reject decommissioned hosts
if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
sn);
return false;
}
// Add the server to the draining servers lists, if it's not already in
// it.

// Add the server to the draining servers lists, if it's not already in it.
if (this.drainingServers.contains(sn)) {
LOG.warn("Server " + sn + " is already in the draining server list."
+ "Ignoring request to add it again.");
LOG.warn(
"Server {} is already in the draining server list. Ignoring request to add it again.", sn);
return true;
}
LOG.info("Server " + sn + " added to draining server list.");

LOG.info("Server {} added to draining server list.", sn);
return this.drainingServers.add(sn);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
Expand Down Expand Up @@ -3073,6 +3074,11 @@ private RegionServerStartupResponse reportForDuty() throws IOException {
LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else if (ioe instanceof DecommissionedHostRejectedException) {
LOG.error(HBaseMarkers.FATAL,
"Master rejected startup because the host is considered decommissioned", ioe);
// Re-throw IOE will cause RS to abort
throw ioe;
} else if (ioe instanceof ServerNotRunningYetException) {
LOG.debug("Master is not running yet");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -30,9 +35,11 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.ServerManager;
Expand Down Expand Up @@ -253,6 +260,72 @@ public void run() {
waitForClusterOnline(master);
}

/**
* Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
* configured to reject decommissioned hosts and when there is a match for the joining
* RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
*/
@Test
public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
throws Exception {
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

// Set the cluster to reject decommissioned hosts
cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);

master = cluster.addMaster();
rs = cluster.addRegionServer();
master.start();
rs.start();
waitForClusterOnline(master);

// Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty
LogCapturer capturer =
new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
.getLogger(HRegionServer.class));

rs2 = cluster.addRegionServer();
master.getMaster().decommissionRegionServers(
Collections.singletonList(rs2.getRegionServer().getServerName()), false);
rs2.start();

// Assert that the second regionserver has aborted
testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));

// Assert that the log messages for DecommissionedHostRejectedException exist in the logs
capturer.stopCapturing();

assertThat(capturer.getOutput(),
containsString("Master rejected startup because the host is considered decommissioned"));

/**
* Assert that the following log message occurred (one line):
* "org.apache.hadoop.hbase.master.DecommissionedHostRejectedException:
* org.apache.hadoop.hbase.master.DecommissionedHostRejectedException: Host localhost exists in
* the list of decommissioned servers and Master is configured to reject decommissioned hosts"
*/
assertThat(Arrays.asList(capturer.getOutput().split("\n")),
hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+ " exists in the list of decommissioned servers and Master is configured to reject"
+ " decommissioned hosts"))));

assertThat(Arrays.asList(capturer.getOutput().split("\n")),
hasItem(
allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
containsString("Unhandled"),
containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+ " exists in the list of decommissioned servers and Master is configured to reject"
+ " decommissioned hosts"))));
}

/**
* Tests region sever reportForDuty with a non-default environment edge
*/
Expand Down

0 comments on commit ed7c850

Please sign in to comment.