Skip to content

Commit

Permalink
Setup a testing endpoint for Rightsizing (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaitlynp1206 authored and Kaitlyn Paglia committed Aug 12, 2021
1 parent a875b17 commit b382625
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
Expand Down Expand Up @@ -171,6 +172,13 @@ public final class CruiseControlParametersConfig {
public static final String DEFAULT_TOPIC_CONFIGURATION_PARAMETERS_CLASS = TopicConfigurationParameters.class.getName();
public static final String TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC = "The class for parameters of a topic configuration request.";

/**
* <code>rightsize.parameters.class</code>
*/
public static final String RIGHTSIZE_PARAMETERS_CLASS_CONFIG = "rightsize.parameters.class";
public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName();
public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request.";

private CruiseControlParametersConfig() {
}

Expand Down Expand Up @@ -280,6 +288,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_TOPIC_CONFIGURATION_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC);
TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC)
.define(RIGHTSIZE_PARAMETERS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_PARAMETERS_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest;
Expand Down Expand Up @@ -173,6 +174,13 @@ public final class CruiseControlRequestConfig {
public static final String DEFAULT_TOPIC_CONFIGURATION_REQUEST_CLASS = TopicConfigurationRequest.class.getName();
public static final String TOPIC_CONFIGURATION_REQUEST_CLASS_DOC = "The class to handle a topic configuration request.";

/**
* <code>rightsize.request.class</code>
*/
public static final String RIGHTSIZE_REQUEST_CLASS_CONFIG = "rightsize.request.class";
public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName();
public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request.";

private CruiseControlRequestConfig() {
}

Expand Down Expand Up @@ -282,6 +290,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_TOPIC_CONFIGURATION_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
TOPIC_CONFIGURATION_REQUEST_CLASS_DOC);
TOPIC_CONFIGURATION_REQUEST_CLASS_DOC)
.define(RIGHTSIZE_REQUEST_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_REQUEST_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public enum CruiseControlEndPoint implements EndPoint {
REVIEW_BOARD(CRUISE_CONTROL_MONITOR),
ADMIN(CRUISE_CONTROL_ADMIN),
REVIEW(CRUISE_CONTROL_ADMIN),
TOPIC_CONFIGURATION(KAFKA_ADMIN);
TOPIC_CONFIGURATION(KAFKA_ADMIN),
RIGHTSIZE(KAFKA_ADMIN);

private static final List<CruiseControlEndPoint> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));
private static final List<CruiseControlEndPoint> GET_ENDPOINTS = Arrays.asList(BOOTSTRAP,
Expand All @@ -55,7 +56,8 @@ public enum CruiseControlEndPoint implements EndPoint {
DEMOTE_BROKER,
ADMIN,
REVIEW,
TOPIC_CONFIGURATION);
TOPIC_CONFIGURATION,
RIGHTSIZE);

private final EndpointType _endpointType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public final class KafkaCruiseControlServletUtils {
RequestParameterWrapper topicConfiguration = new RequestParameterWrapper(TOPIC_CONFIGURATION_PARAMETERS_CLASS_CONFIG,
TOPIC_CONFIGURATION_PARAMETER_OBJECT_CONFIG,
TOPIC_CONFIGURATION_REQUEST_CLASS_CONFIG);
RequestParameterWrapper rightsize = new RequestParameterWrapper(RIGHTSIZE_PARAMETERS_CLASS_CONFIG,
RIGHTSIZE_PARAMETER_OBJECT_CONFIG,
RIGHTSIZE_REQUEST_CLASS_CONFIG);

requestParameterConfigs.put(BOOTSTRAP, bootstrap);
requestParameterConfigs.put(TRAIN, train);
Expand All @@ -132,6 +135,7 @@ public final class KafkaCruiseControlServletUtils {
requestParameterConfigs.put(REVIEW, review);
requestParameterConfigs.put(REVIEW_BOARD, reviewBoard);
requestParameterConfigs.put(TOPIC_CONFIGURATION, topicConfiguration);
requestParameterConfigs.put(RIGHTSIZE, rightsize);

REQUEST_PARAMETER_CONFIGS = Collections.unmodifiableMap(requestParameterConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.handler.sync;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionRecommendation;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus;
import com.linkedin.kafka.cruisecontrol.common.Utils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.detector.Provisioner;
import com.linkedin.kafka.cruisecontrol.detector.ProvisionerState;
import com.linkedin.kafka.cruisecontrol.detector.RightsizeOptions;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.response.RightsizeResult;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.RIGHTSIZE_PARAMETER_OBJECT_CONFIG;


public class RightsizeRequest extends AbstractSyncRequest {
private static final String RECOMMENDER_UP = "Recommender-Under-Provisioned";
protected KafkaCruiseControl _kafkaCruiseControl;
protected RightsizeParameters _parameters;

public RightsizeRequest() {
super();
}

@Override
protected RightsizeResult handle() {
KafkaCruiseControlConfig config = _kafkaCruiseControl.config();
Map<String, Object> overrideConfigs;
overrideConfigs = Collections.singletonMap(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl);
Provisioner provisioner = config.getConfiguredInstance(AnomalyDetectorConfig.PROVISIONER_CLASS_CONFIG,
Provisioner.class,
overrideConfigs);
Supplier<Set<String>> topicNameSupplier = () -> _kafkaCruiseControl.kafkaCluster().topics();
Set<String> topicNamesMatchedWithPattern = Utils.getTopicNamesMatchedWithPattern(_parameters.topic(), topicNameSupplier);
String topicName;
if (topicNamesMatchedWithPattern.size() > 1) {
throw new IllegalArgumentException(String.format("The RightsizeEndpoint does not support provisioning for multiple topics {%s}.",
String.join(" ,", topicNamesMatchedWithPattern)));
} else {
topicName = topicNamesMatchedWithPattern.iterator().next();
}
ProvisionRecommendation recommendation =
new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED).numBrokers(_parameters.numBrokersToAdd())
.numPartitions(_parameters.partitionCount())
.topic(topicName)
.build();
Map<String, ProvisionRecommendation> provisionRecommendation;
provisionRecommendation = Collections.singletonMap(RECOMMENDER_UP, recommendation);

ProvisionerState provisionerState = provisioner.rightsize(provisionRecommendation, new RightsizeOptions());

return new RightsizeResult(_parameters.numBrokersToAdd(), _parameters.partitionCount(), topicName, provisionerState, config);
}

@Override
public RightsizeParameters parameters() {
return _parameters;
}

@Override
public String name() {
return RightsizeRequest.class.getSimpleName();
}

@Override
public void configure(Map<String, ?> configs) {
super.configure(configs);
_kafkaCruiseControl = _servlet.asyncKafkaCruiseControl();
_parameters = (RightsizeParameters) validateNotNull(configs.get(RIGHTSIZE_PARAMETER_OBJECT_CONFIG),
"Parameter configuration is missing from the request.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.writeErrorResponse;
import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;


/**
* The util class for Kafka Cruise Control parameters.
*/
Expand Down Expand Up @@ -138,6 +137,8 @@ public final class ParameterUtils {
public static final String TOPIC_BY_REPLICATION_FACTOR = "topic_by_replication_factor";
public static final String NO_REASON_PROVIDED = "No reason provided";
public static final String DO_AS = "doAs";
public static final String NUM_BROKERS_TO_ADD = "num_brokers_to_add";
public static final String PARTITION_COUNT = "partition_count";

public static final String STOP_PROPOSAL_PARAMETER_OBJECT_CONFIG = "stop.proposal.parameter.object";
public static final String BOOTSTRAP_PARAMETER_OBJECT_CONFIG = "bootstrap.parameter.object";
Expand All @@ -158,6 +159,7 @@ public final class ParameterUtils {
public static final String ADMIN_PARAMETER_OBJECT_CONFIG = "admin.parameter.object";
public static final String REVIEW_PARAMETER_OBJECT_CONFIG = "review.parameter.object";
public static final String TOPIC_CONFIGURATION_PARAMETER_OBJECT_CONFIG = "topic.configuration.parameter.object";
public static final String RIGHTSIZE_PARAMETER_OBJECT_CONFIG = "rightsize.parameter.object";

private ParameterUtils() {
}
Expand Down Expand Up @@ -921,6 +923,44 @@ static int partitionBoundary(HttpServletRequest request, boolean isUpperBound) {
return Integer.parseInt(boundaries[isUpperBound ? 1 : 0]);
}

/**
* Get the {@link #NUM_BROKERS_TO_ADD} from the request.
*
* Default: -1
* @return The value of {@link #NUM_BROKERS_TO_ADD} parameter.
* @throws UserRequestException if the number of brokers to add is not a positive integer.
*/
static int numBrokersToAdd(HttpServletRequest request) {
String parameterString = caseSensitiveParameterName(request.getParameterMap(), NUM_BROKERS_TO_ADD);
if (parameterString == null) {
return -1;
}
int numBrokersToAdd = Integer.parseInt(request.getParameter(parameterString));
if (numBrokersToAdd <= 0) {
throw new UserRequestException("The requested number of brokers to add must be positive (Requested: " + numBrokersToAdd + ").");
}
return numBrokersToAdd;
}

/**
* Get the {@link #PARTITION_COUNT} from the request.
*
* Default: -1
* @return The value of {@link #PARTITION_COUNT} parameter.
* @throws UserRequestException if the targeted partition count is not a positive integer.
*/
static Integer partitionCount(HttpServletRequest request) {
String parameterString = caseSensitiveParameterName(request.getParameterMap(), PARTITION_COUNT);
if (parameterString == null) {
return -1;
}
int targetPartitionCount = Integer.parseInt(request.getParameter(parameterString));
if (targetPartitionCount <= 0) {
throw new UserRequestException("The requested targeted partition count must be positive (Requested: " + targetPartitionCount + ").");
}
return targetPartitionCount;
}

static Set<Integer> brokerIds(HttpServletRequest request, boolean isOptional) throws UnsupportedEncodingException {
Set<Integer> brokerIds = parseParamToIntegerSet(request, BROKER_ID_PARAM);
if (!isOptional && brokerIds.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.parameters;

import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Pattern;

import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.NUM_BROKERS_TO_ADD;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.PARTITION_COUNT;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.TOPIC_PARAM;


public class RightsizeParameters extends AbstractParameters {
protected static final SortedSet<String> CASE_INSENSITIVE_PARAMETER_NAMES;
static {
SortedSet<String> validParameterNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
validParameterNames.add(NUM_BROKERS_TO_ADD);
validParameterNames.add(PARTITION_COUNT);
validParameterNames.add(TOPIC_PARAM);
validParameterNames.addAll(KafkaOptimizationParameters.CASE_INSENSITIVE_PARAMETER_NAMES);
CASE_INSENSITIVE_PARAMETER_NAMES = Collections.unmodifiableSortedSet(validParameterNames);
}
protected int _numBrokersToAdd;
protected int _partitionCount;
protected Pattern _topic;

public RightsizeParameters() {
super();
}

@Override
protected void initParameters() throws UnsupportedEncodingException {
super.initParameters();
_numBrokersToAdd = ParameterUtils.numBrokersToAdd(_request);
_partitionCount = ParameterUtils.partitionCount(_request);
_topic = ParameterUtils.topic(_request);
}

public int numBrokersToAdd() {
return _numBrokersToAdd;
}

public int partitionCount() {
return _partitionCount;
}

public Pattern topic() {
return _topic;
}

@Override
public SortedSet<String> caseInsensitiveParameterNames() {
return CASE_INSENSITIVE_PARAMETER_NAMES;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.response;

import com.google.gson.Gson;
import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.ProvisionerState;
import java.util.HashMap;
import java.util.Map;


@JsonResponseClass
public class RightsizeResult extends AbstractCruiseControlResponse {
@JsonResponseField
protected static final String NUM_BROKERS_TO_ADD = "num_brokers_to_add";
@JsonResponseField
protected static final String PARTITION_COUNT = "partition_count";
@JsonResponseField
protected static final String TOPIC = "topic";
@JsonResponseField
protected static final String PROVISION_STATE = "provision_state";
@JsonResponseField
protected static final String PROVISION_SUMMARY = "provision_summary";

protected final int _numBrokersToAdd;
protected final int _partitionCount;
protected String _topic;
protected String _provisionState;
protected String _provisionSummary;

public RightsizeResult(int numBrokersToAdd,
int partitionCount,
String topic,
ProvisionerState provisionerState,
KafkaCruiseControlConfig config) {
super(config);

_numBrokersToAdd = numBrokersToAdd;
_partitionCount = partitionCount;
_topic = topic;
_provisionState = provisionerState.state().toString();
_provisionSummary = provisionerState.summary();
}

protected String getJSONString() {
Map<String, Object> jsonStructure = new HashMap<>(5);
jsonStructure.put(NUM_BROKERS_TO_ADD, _numBrokersToAdd);
jsonStructure.put(PARTITION_COUNT, _partitionCount);
if (_topic != null && !_topic.isEmpty()) {
jsonStructure.put(TOPIC, _topic);
}
if (_provisionState != null && !_provisionState.isEmpty()) {
jsonStructure.put(PROVISION_STATE, _provisionState);
}
if (_provisionSummary != null && !_provisionSummary.isEmpty()) {
jsonStructure.put(PROVISION_SUMMARY, _provisionSummary);
}
Gson gson = new Gson();
return gson.toJson(jsonStructure);
}

@Override
protected void discardIrrelevantAndCacheRelevant(CruiseControlParameters parameters) {
_cachedResponse = getJSONString();
// Discard irrelevant response.
_topic = null;
_provisionState = null;
_provisionSummary = null;
}
}
Loading

0 comments on commit b382625

Please sign in to comment.