Skip to content

Commit

Permalink
[apache#1709] feat(coordinator): Introduce pluggable `ClientConfApply…
Browse files Browse the repository at this point in the history
…Strategy` for `fetchClientConf` rpc (apache#1710)

### What changes were proposed in this pull request?

Introduce pluggable ClientConfApplyManager for fetchClientConf rpc

### Why are the changes needed?

For apache#1709 

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Unit tests
  • Loading branch information
zuston committed May 16, 2024
1 parent de4b261 commit 93f4347
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -29,6 +30,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.MapOutputTrackerMaster;
import org.apache.spark.SparkConf;
Expand Down Expand Up @@ -356,9 +358,16 @@ protected static void fetchAndApplyDynamicConf(SparkConf sparkConf) {
sparkConf.getInt(
RssSparkConfig.RSS_ACCESS_TIMEOUT_MS.key(),
RssSparkConfig.RSS_ACCESS_TIMEOUT_MS.defaultValue().get());
String user;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (Exception e) {
throw new RssException("Errors on getting current user.", e);
}
RssFetchClientConfRequest request =
new RssFetchClientConfRequest(timeoutMs, user, Collections.emptyMap());
for (CoordinatorClient client : coordinatorClients) {
RssFetchClientConfResponse response =
client.fetchClientConf(new RssFetchClientConfRequest(timeoutMs));
RssFetchClientConfResponse response = client.fetchClientConf(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info("Success to get conf from {}", client.getDesc());
RssSparkShuffleUtils.applyDynamicClientConf(sparkConf, response.getClientConf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ public static String generatePartitionKey(String appId, Integer shuffleId, Integ
Constants.KEY_SPLIT_CHAR, appId, String.valueOf(shuffleId), String.valueOf(partition));
}

public static <T> T loadExtension(Class<T> extCls, String clsPackage, Object obj) {
List<T> exts = loadExtensions(extCls, Arrays.asList(clsPackage), obj);
if (exts != null && exts.size() == 1) {
return exts.get(0);
}
throw new IllegalArgumentException("No such extension for " + clsPackage);
}

@SuppressWarnings("unchecked")
public static <T> List<T> loadExtensions(Class<T> extClass, List<String> classes, Object obj) {
if (classes == null || classes.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
*/
public class CoordinatorConf extends RssBaseConf {

public static final ConfigOption<String> COORDINATOR_CLIENT_CONF_APPLY_STRATEGY =
ConfigOptions.key("rss.coordinator.client.confApplyStrategy")
.stringType()
.defaultValue("org.apache.uniffle.coordinator.conf.BypassRssClientConfApplyStrategy")
.withDescription(
"The client conf apply strategy which is used on fetchClientConf rpc interface.");

public static final ConfigOption<String> COORDINATOR_EXCLUDE_NODES_FILE_PATH =
ConfigOptions.key("rss.coordinator.exclude.nodes.file.path")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.uniffle.common.storage.StorageInfoUtils;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
import org.apache.uniffle.coordinator.conf.RssClientConfFetchInfo;
import org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import org.apache.uniffle.proto.CoordinatorServerGrpc;
Expand All @@ -50,6 +50,7 @@
import org.apache.uniffle.proto.RssProtos.ApplicationInfoResponse;
import org.apache.uniffle.proto.RssProtos.CheckServiceAvailableResponse;
import org.apache.uniffle.proto.RssProtos.ClientConfItem;
import org.apache.uniffle.proto.RssProtos.FetchClientConfRequest;
import org.apache.uniffle.proto.RssProtos.FetchClientConfResponse;
import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageRequest;
import org.apache.uniffle.proto.RssProtos.FetchRemoteStorageResponse;
Expand Down Expand Up @@ -297,9 +298,22 @@ public void accessCluster(
responseObserver.onCompleted();
}

/** To be compatible with the older client version. */
@Override
public void fetchClientConf(
Empty empty, StreamObserver<FetchClientConfResponse> responseObserver) {
fetchClientConfImpl(RssClientConfFetchInfo.EMPTY_CLIENT_CONF_FETCH_INFO, responseObserver);
}

@Override
public void fetchClientConfV2(
FetchClientConfRequest request, StreamObserver<FetchClientConfResponse> responseObserver) {
fetchClientConfImpl(RssClientConfFetchInfo.fromProto(request), responseObserver);
}

private void fetchClientConfImpl(
RssClientConfFetchInfo rssClientConfFetchInfo,
StreamObserver<FetchClientConfResponse> responseObserver) {
FetchClientConfResponse response;
FetchClientConfResponse.Builder builder =
FetchClientConfResponse.newBuilder().setStatus(StatusCode.SUCCESS);
Expand All @@ -308,9 +322,9 @@ public void fetchClientConf(
.getCoordinatorConf()
.getBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED);
if (dynamicConfEnabled) {
DynamicClientConfService dynamicClientConfService =
coordinatorServer.getDynamicClientConfService();
for (Map.Entry<String, String> kv : dynamicClientConfService.getRssClientConf().entrySet()) {
Map<String, String> clientConfigs =
coordinatorServer.getClientConfApplyManager().apply(rssClientConfFetchInfo);
for (Map.Entry<String, String> kv : clientConfigs.entrySet()) {
builder.addClientConf(
ClientConfItem.newBuilder().setKey(kv.getKey()).setValue(kv.getValue()).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.coordinator.conf.ClientConf;
import org.apache.uniffle.coordinator.conf.DynamicClientConfService;
import org.apache.uniffle.coordinator.conf.RssClientConfApplyManager;
import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
Expand All @@ -63,7 +64,7 @@ public class CoordinatorServer extends ReconfigurableBase {
private ServerInterface server;
private ClusterManager clusterManager;
private AssignmentStrategy assignmentStrategy;
private DynamicClientConfService dynamicClientConfService;
private RssClientConfApplyManager clientConfApplyManager;
private AccessManager accessManager;
private ApplicationManager applicationManager;
private GRPCMetrics grpcMetrics;
Expand Down Expand Up @@ -137,8 +138,8 @@ public void stopServer() throws Exception {
if (accessManager != null) {
accessManager.close();
}
if (dynamicClientConfService != null) {
dynamicClientConfService.close();
if (clientConfApplyManager != null) {
clientConfApplyManager.close();
}
if (metricReporter != null) {
metricReporter.stop();
Expand Down Expand Up @@ -181,11 +182,15 @@ private void initialization() throws Exception {
new ClusterManagerFactory(coordinatorConf, hadoopConf);

this.clusterManager = clusterManagerFactory.getClusterManager();
this.dynamicClientConfService =

DynamicClientConfService dynamicClientConfService =
new DynamicClientConfService(
coordinatorConf,
hadoopConf,
new Consumer[] {(Consumer<ClientConf>) applicationManager::refreshRemoteStorages});
this.clientConfApplyManager =
new RssClientConfApplyManager(coordinatorConf, dynamicClientConfService);

AssignmentStrategyFactory assignmentStrategyFactory =
new AssignmentStrategyFactory(coordinatorConf, clusterManager);
this.assignmentStrategy = assignmentStrategyFactory.getAssignmentStrategy();
Expand Down Expand Up @@ -255,8 +260,8 @@ public AccessManager getAccessManager() {
return accessManager;
}

public DynamicClientConfService getDynamicClientConfService() {
return dynamicClientConfService;
public RssClientConfApplyManager getClientConfApplyManager() {
return clientConfApplyManager;
}

public GRPCMetrics getGrpcMetrics() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.conf;

import java.util.Map;

public abstract class AbstractRssClientConfApplyStrategy {
protected DynamicClientConfService dynamicClientConfService;

protected AbstractRssClientConfApplyStrategy(DynamicClientConfService dynamicClientConfService) {
this.dynamicClientConfService = dynamicClientConfService;
}

abstract Map<String, String> apply(RssClientConfFetchInfo rssClientConfFetchInfo);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.conf;

import java.util.Map;

public class BypassRssClientConfApplyStrategy extends AbstractRssClientConfApplyStrategy {

public BypassRssClientConfApplyStrategy(DynamicClientConfService dynamicClientConfService) {
super(dynamicClientConfService);
}

@Override
Map<String, String> apply(RssClientConfFetchInfo rssClientConfFetchInfo) {
return dynamicClientConfService.getRssClientConf();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.conf;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;

import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;

import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_CLIENT_CONF_APPLY_STRATEGY;

public class RssClientConfApplyManager implements Closeable {
private final AbstractRssClientConfApplyStrategy strategy;
private final DynamicClientConfService dynamicClientConfService;

public RssClientConfApplyManager(
CoordinatorConf conf, DynamicClientConfService dynamicClientConfService) {
this.dynamicClientConfService = dynamicClientConfService;

String strategyCls = conf.get(COORDINATOR_CLIENT_CONF_APPLY_STRATEGY);
this.strategy =
RssUtils.loadExtension(
AbstractRssClientConfApplyStrategy.class, strategyCls, dynamicClientConfService);
}

public Map<String, String> apply(RssClientConfFetchInfo rssClientConfFetchInfo) {
// to be compatible with the older client version.
if (rssClientConfFetchInfo.isEmpty()) {
return dynamicClientConfService.getRssClientConf();
}
return strategy.apply(rssClientConfFetchInfo);
}

@VisibleForTesting
protected AbstractRssClientConfApplyStrategy getStrategy() {
return strategy;
}

@Override
public void close() throws IOException {
// ignore.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.conf;

import java.util.Map;

import org.apache.uniffle.proto.RssProtos;

public class RssClientConfFetchInfo {
private String user;
private Map<String, String> properties;

public static final RssClientConfFetchInfo EMPTY_CLIENT_CONF_FETCH_INFO =
new RssClientConfFetchInfo(null, null);

public RssClientConfFetchInfo(String user, Map<String, String> properties) {
this.user = user;
this.properties = properties;
}

public String getUser() {
return user;
}

public Map<String, String> getProperties() {
return properties;
}

public boolean isEmpty() {
return user == null && (properties == null || properties.isEmpty());
}

public static RssClientConfFetchInfo fromProto(RssProtos.FetchClientConfRequest request) {
return new RssClientConfFetchInfo(request.getUser(), request.getPropertiesMap());
}
}
Loading

0 comments on commit 93f4347

Please sign in to comment.