[apache#1711][FOLLOWUP] feat(coordinator): refactor the reconfigurable conf (apache#1741)

Refactor the reconfigurable configuration for the coordinator side.

Follow-up to apache#1711.

Does this PR introduce any user-facing change? No.

How was this patch tested? Unit tests.
zuston committed May 29, 2024
1 parent 8e6d2a9 commit 52ccda9
Showing 10 changed files with 18 additions and 267 deletions.
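
At a glance, this refactor swaps the push-based Reconfigurable / ReconfigurableBase callbacks for a pull-based ReconfigurableConfManager: each component registers the options it cares about and reads the current value at the point of use. The sketch below condenses that pattern from the file diffs that follow; the wrapper class and method names are hypothetical, while the ReconfigurableConfManager calls and the COORDINATOR_SHUFFLE_NODES_MAX key are taken from this commit.

    // Minimal sketch of the new pull-based pattern (illustrative only).
    public class ExampleComponent {
      // Handle that always reflects the latest value of the registered option.
      private final ReconfigurableConfManager.Reconfigurable<Integer> shuffleNodesMax;

      public ExampleComponent(CoordinatorConf conf) {
        // Register once; no reconfigure()/isPropertyReconfigurable() overrides are needed.
        this.shuffleNodesMax =
            ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
      }

      public int currentShuffleNodesMax() {
        // Read the up-to-date value wherever it is needed.
        return shuffleNodesMax.get();
      }
    }

The server entry points pair this with a single ReconfigurableConfManager.init(coordinatorConf, configFile) call at startup, replacing the old coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile) handshake.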

Two of the 10 changed files were deleted; their contents are not shown in this view.

AccessManager.java
@@ -26,16 +26,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.config.Reconfigurable;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.access.checker.AccessChecker;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;

public class AccessManager implements Reconfigurable {
public class AccessManager {

private static final Logger LOG = LoggerFactory.getLogger(AccessManager.class);

@@ -110,22 +108,4 @@ public void close() throws IOException {
checker.close();
}
}

public boolean isPropertyReconfigurable(String property) {
for (AccessChecker checker : accessCheckers) {
if (checker instanceof Reconfigurable
&& ((Reconfigurable) checker).isPropertyReconfigurable(property)) {
return true;
}
}
return false;
}

public void reconfigure(RssConf conf) {
for (AccessChecker checker : accessCheckers) {
if (checker instanceof Reconfigurable) {
((Reconfigurable) checker).reconfigure(conf);
}
}
}
}
ClusterManager.java
@@ -21,9 +21,7 @@
import java.util.List;
import java.util.Set;

import org.apache.uniffle.common.config.Reconfigurable;

public interface ClusterManager extends Closeable, Reconfigurable {
public interface ClusterManager extends Closeable {

/**
* Add a server to the cluster.
CoordinatorServer.java
@@ -26,8 +26,7 @@
import picocli.CommandLine;

import org.apache.uniffle.common.Arguments;
import org.apache.uniffle.common.config.ReconfigurableBase;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.metrics.JvmMetrics;
@@ -55,7 +54,7 @@
import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE;

/** The main entrance of coordinator service */
public class CoordinatorServer extends ReconfigurableBase {
public class CoordinatorServer {

private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServer.class);

@@ -72,7 +71,6 @@ public class CoordinatorServer extends ReconfigurableBase {
private String id;

public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
super(coordinatorConf);
this.coordinatorConf = coordinatorConf;
try {
initialization();
@@ -91,8 +89,7 @@ public static void main(String[] args) throws Exception {

// Load configuration from config files
final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);

coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
ReconfigurableConfManager.init(coordinatorConf, configFile);

// Start the coordinator service
final CoordinatorServer coordinatorServer = new CoordinatorServer(coordinatorConf);
@@ -102,7 +99,6 @@ public static void main(String[] args) throws Exception {
}

public void start() throws Exception {
startReconfigureThread();
jettyServer.start();
server.start();
if (metricReporter != null) {
@@ -145,7 +141,6 @@ public void stopServer() throws Exception {
metricReporter.stop();
LOG.info("Metric Reporter Stopped!");
}
stopReconfigureThread();
SecurityContextFactory.get().getSecurityContext().close();
server.stop();
}
@@ -272,27 +267,4 @@ public GRPCMetrics getGrpcMetrics() {
protected void blockUntilShutdown() throws InterruptedException {
server.blockUntilShutdown();
}

@Override
public void reconfigure(RssConf conf) {
clusterManager.reconfigure(conf);
accessManager.reconfigure(conf);
}

@Override
public boolean isPropertyReconfigurable(String property) {
if (clusterManager.isPropertyReconfigurable(property)) {
return true;
}
if (accessManager.isPropertyReconfigurable(property)) {
return true;
}
return false;
}

@Override
public RssConf reloadConfiguration() {
return new CoordinatorConf(
coordinatorConf.getString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, ""));
}
}
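
With ReconfigurableBase gone, CoordinatorServer no longer manages its own reconfigure thread. A condensed before/after of the bootstrap changes shown above (the code lines are from this diff, the comments are added):

    // Before: stash the config file name and rely on ReconfigurableBase, which required
    // startReconfigureThread()/stopReconfigureThread() plus the reconfigure(),
    // isPropertyReconfigurable() and reloadConfiguration() overrides.
    coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);

    // After: initialize the shared manager once; nothing else to wire up.
    ReconfigurableConfManager.init(coordinatorConf, configFile);
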
SimpleClusterManager.java
@@ -48,8 +48,8 @@
import org.apache.uniffle.client.impl.grpc.ShuffleServerInternalGrpcClient;
import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
import org.apache.uniffle.client.request.RssDecommissionRequest;
import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.InvalidRequestException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
@@ -72,7 +72,7 @@ public class SimpleClusterManager implements ClusterManager {
private Map<String, Set<ServerNode>> tagToNodes = JavaUtils.newConcurrentMap();
private AtomicLong excludeLastModify = new AtomicLong(0L);
private long heartbeatTimeout;
private volatile int shuffleNodesMax;
private ReconfigurableConfManager.Reconfigurable<Integer> shuffleNodesMax;
private ScheduledExecutorService scheduledExecutorService;
private ScheduledExecutorService checkNodesExecutorService;
private FileSystem hadoopFileSystem;
@@ -86,7 +86,8 @@ public class SimpleClusterManager implements ClusterManager {
private boolean readyForServe = false;

public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws Exception {
this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
this.shuffleNodesMax =
ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
// the thread for checking if shuffle server report heartbeat in time
scheduledExecutorService =
@@ -311,7 +312,7 @@ public void clear() {

@Override
public int getShuffleNodesMax() {
return shuffleNodesMax;
return shuffleNodesMax.get();
}

@Override
@@ -392,18 +393,4 @@ public void setStartupSilentPeriodEnabled(boolean startupSilentPeriodEnabled) {
public Map<String, ServerNode> getServers() {
return servers;
}

@Override
public void reconfigure(RssConf conf) {
int nodeMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
if (nodeMax != shuffleNodesMax) {
LOG.warn("Coordinator update new shuffleNodesMax {}", nodeMax);
shuffleNodesMax = nodeMax;
}
}

@Override
public boolean isPropertyReconfigurable(String property) {
return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
}
}
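
Note that the ClusterManager contract is unchanged: getShuffleNodesMax() still returns a plain int, so callers are unaffected. Only the backing field changes, as in this condensed excerpt of the hunks above:

    // A handle instead of a manually updated volatile int.
    private ReconfigurableConfManager.Reconfigurable<Integer> shuffleNodesMax;

    @Override
    public int getShuffleNodesMax() {
      // Always reflects the latest configured value, which is why the
      // reconfigure()/isPropertyReconfigurable() overrides above could be removed.
      return shuffleNodesMax.get();
    }
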
AccessClusterLoadChecker.java
@@ -24,9 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.Reconfigurable;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.ClusterManager;
@@ -42,15 +41,16 @@
* AccessClusterLoadChecker use the cluster load metrics including memory and healthy to filter and
* count available nodes numbers and reject if the number do not reach the threshold.
*/
public class AccessClusterLoadChecker extends AbstractAccessChecker implements Reconfigurable {
public class AccessClusterLoadChecker extends AbstractAccessChecker {

private static final Logger LOG = LoggerFactory.getLogger(AccessClusterLoadChecker.class);

private final ClusterManager clusterManager;
private final double memoryPercentThreshold;
// The hard constraint number of available shuffle servers
private final int availableServerNumThreshold;
private volatile int defaultRequiredShuffleServerNumber;
private volatile ReconfigurableConfManager.Reconfigurable<Integer>
defaultRequiredShuffleServerNumber;

public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
super(accessManager);
@@ -61,7 +61,7 @@ public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
this.availableServerNumThreshold =
conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD, -1);
this.defaultRequiredShuffleServerNumber =
conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
ReconfigurableConfManager.register(conf, CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
}

@Override
@@ -85,7 +85,7 @@ public AccessCheckResult check(AccessInfo accessInfo) {
if (availableServerNumThreshold == -1) {
String requiredNodesNumRaw =
accessInfo.getExtraProperties().get(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM);
int requiredNodesNum = defaultRequiredShuffleServerNumber;
int requiredNodesNum = defaultRequiredShuffleServerNumber.get();
if (StringUtils.isNotEmpty(requiredNodesNumRaw)
&& Integer.parseInt(requiredNodesNumRaw) > 0) {
requiredNodesNum = Integer.parseInt(requiredNodesNumRaw);
@@ -126,18 +126,4 @@ public int getAvailableServerNumThreshold() {
}

public void close() {}

@Override
public void reconfigure(RssConf conf) {
int nodeMax = conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
if (nodeMax != defaultRequiredShuffleServerNumber) {
LOG.warn("Coordinator update new defaultRequiredShuffleServerNumber {}.", nodeMax);
defaultRequiredShuffleServerNumber = nodeMax;
}
}

@Override
public boolean isPropertyReconfigurable(String property) {
return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
}
}
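
The checker's precedence rule is preserved: a positive ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM supplied with the access request still overrides the (now reconfigurable) cluster-wide default. Condensed from the check() hunk above, with the surrounding method elided:

    // The default comes from the reconfigurable handle...
    int requiredNodesNum = defaultRequiredShuffleServerNumber.get();
    String requiredNodesNumRaw =
        accessInfo.getExtraProperties().get(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM);
    if (StringUtils.isNotEmpty(requiredNodesNumRaw) && Integer.parseInt(requiredNodesNumRaw) > 0) {
      // ...but an explicit per-access value still wins.
      requiredNodesNum = Integer.parseInt(requiredNodesNumRaw);
    }
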
CoordinatorTestServer.java (test entry point)
@@ -20,7 +20,7 @@
import picocli.CommandLine;

import org.apache.uniffle.common.Arguments;
import org.apache.uniffle.common.config.ReconfigurableBase;
import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
@@ -50,8 +50,7 @@ public static void main(String[] args) throws Exception {

// Load configuration from config files
final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);

coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
ReconfigurableConfManager.init(coordinatorConf, configFile);

// Start the coordinator service
final CoordinatorTestServer coordinatorServer = new CoordinatorTestServer(coordinatorConf);