From a875b17ec919fcb52d490e9b4d69a50a4ec44776 Mon Sep 17 00:00:00 2001 From: kaitlynp1206 Date: Tue, 27 Jul 2021 17:50:02 -0400 Subject: [PATCH 1/5] Create a provisioner state for provision tracking (#4) Co-authored-by: Kaitlyn Paglia --- .../analyzer/ProvisionRecommendation.java | 2 + .../detector/GoalViolationDetector.java | 8 +- .../detector/NoopProvisioner.java | 6 + .../cruisecontrol/detector/Provisioner.java | 21 ++++ .../detector/ProvisionerState.java | 107 ++++++++++++++++++ .../detector/ProvisionerUtils.java | 40 +++++++ .../detector/RightsizeOptions.java | 19 ++++ .../detector/ProvisionerStateTest.java | 38 +++++++ 8 files changed, 237 insertions(+), 4 deletions(-) create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerState.java create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerUtils.java create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/RightsizeOptions.java create mode 100644 cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerStateTest.java diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/ProvisionRecommendation.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/ProvisionRecommendation.java index 7945ef172..6c0c986d5 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/ProvisionRecommendation.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/ProvisionRecommendation.java @@ -39,6 +39,8 @@ public final class ProvisionRecommendation { protected final int _numBrokers; protected final int _numRacks; protected final int _numDisks; + //TODO: Update the ProvisionRecommendation to provide a set of for batch provisioning of partitions + //See Issue #1650 protected final int _numPartitions; // If the resource is partition, the name of the topic must be specified. protected final String _topic; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java index c1bee2f81..431e8638f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java @@ -227,10 +227,10 @@ public void run() { } _provisionResponse = provisionResponse; if (_isProvisionerEnabled) { - // Right-size the cluster (if needed) - boolean isRightsized = _provisioner.rightsize(_provisionResponse.recommendationByRecommender()); - if (isRightsized) { - LOG.info("Actions have been taken on the cluster towards rightsizing."); + // Rightsize the cluster (if needed) + ProvisionerState isRightsized = _provisioner.rightsize(_provisionResponse.recommendationByRecommender(), new RightsizeOptions()); + if (isRightsized != null) { + LOG.info(isRightsized.toString()); } } Map> violatedGoalsByFixability = goalViolations.violatedGoalsByFixability(); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopProvisioner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopProvisioner.java index 95042b9d8..c3ca295a0 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopProvisioner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/NoopProvisioner.java @@ -13,6 +13,12 @@ */ public class NoopProvisioner implements Provisioner { @Override + public ProvisionerState rightsize(Map recommendationByRecommender, RightsizeOptions rightsizeOptions) { + return null; + } + + @Override + @Deprecated public boolean rightsize(Map recommendationByRecommender) { return false; } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/Provisioner.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/Provisioner.java index 663f66ec6..553e5ba04 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/Provisioner.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/Provisioner.java @@ -32,6 +32,27 @@ public interface Provisioner extends CruiseControlConfigurable { * * @param recommendationByRecommender Provision recommendations provided by corresponding recommenders. * @return {@code true} if actions have been taken on the cluster towards rightsizing, {@code false} otherwise. + * @deprecated Will be removed in a future release -- please use {@link #rightsize(Map, RightsizeOptions)} */ + @Deprecated boolean rightsize(Map recommendationByRecommender); + + /** + * Rightsize the cluster using the given constraints. All given recommendations are expected to share the same {@link ProvisionStatus}. + * Implementations of this function are expected to be non-blocking -- i.e. starts the rightsizing, but does not block until the completion. + * + *
    + *
  • For {@link ProvisionStatus#UNDER_PROVISIONED} clusters, each recommender (e.g. goal name) indicates requested resources + * (e.g. number of brokers) along with relevant constraints (e.g. racks for which brokers should not be added). Typically, aggregating + * different recommendations for the same resource type requires considering the maximum value over all recommendations.
  • + *
  • For {@link ProvisionStatus#OVER_PROVISIONED} clusters, each recommender (e.g. goal name) indicates resources that can be + * released (e.g. number of brokers) along with relevant constraints (e.g. expected broker capacity). Typically, aggregating + * different recommendations for the same resource type requires considering the minimum value over all recommendations.
  • + *
+ * + * @param recommendationByRecommender Provision recommendations provided by corresponding recommenders. + * @param rightsizeOptions Rightsize options to take into account when rightsizing the cluster. + * @return {@link ProvisionerState} of actions taken on the cluster towards rightsizing or null if no actions were taken. + */ + ProvisionerState rightsize(Map recommendationByRecommender, RightsizeOptions rightsizeOptions); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerState.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerState.java new file mode 100644 index 000000000..3cecbe98c --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerState.java @@ -0,0 +1,107 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.detector; + +import com.linkedin.cruisecontrol.common.utils.Utils; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + + +/** + * A class to indicate how a provisioning action is handled + */ +public class ProvisionerState { + private static final Map> VALID_TRANSFER = new HashMap<>(); + private State _state; + private String _summary; + private final long _createdMs; + private long _updatedMs; + + static { + VALID_TRANSFER.put(State.IN_PROGRESS, new HashSet<>(Collections.singleton(State.IN_PROGRESS))); + VALID_TRANSFER.put(State.COMPLETED_WITH_ERROR, new HashSet<>(Arrays.asList(State.IN_PROGRESS, State.COMPLETED_WITH_ERROR))); + VALID_TRANSFER.put(State.COMPLETED, new HashSet<>(Arrays.asList(State.IN_PROGRESS, State.COMPLETED_WITH_ERROR, State.COMPLETED))); + } + + public ProvisionerState(State state, String summary) { + _state = state; + _summary = Utils.validateNotNull(summary, "ProvisionerState summary cannot be null."); + _createdMs = System.currentTimeMillis(); + _updatedMs = _createdMs; + } + + /** + * Check if the state transfer is possible. + * @param targetState the state to transfer to. + * @return True if the transfer is valid, false otherwise. + */ + public boolean canTransferToState(ProvisionerState.State targetState) { + return VALID_TRANSFER.get(_state).contains(targetState); + } + + /** + * @return The state of the provisioning action. + */ + public State state() { + return _state; + } + + /** + * @return The summary of the provisioning action status. + */ + public String summary() { + return _summary; + } + + /** + * @return The time the provisioner state was created in milliseconds. + */ + public long createdMs() { + return _createdMs; + } + + /** + * @return The status update time of the provision state in milliseconds. + */ + public long updatedMs() { + return _updatedMs; + } + + /** + * Update the state and summary of the provisioning action + * + * @param state The new state of the provisioning action. + * @param summary The new summary of the provisioning action status. + * @throws IllegalArgumentException if the summary is null. + * @throws IllegalStateException if the target state is not a valid target state. + */ + public void update(State state, String summary) { + if (canTransferToState(state)) { + _state = state; + _summary = Utils.validateNotNull(summary, "ProvisionerState summary cannot be null."); + _updatedMs = System.currentTimeMillis(); + } else { + throw new IllegalStateException("Cannot set the provisioner state from " + _state.toString() + " to " + state.toString() + + ". The valid target states are " + Collections.unmodifiableSet(VALID_TRANSFER.get(_state))); + } + } + + public enum State { + COMPLETED, COMPLETED_WITH_ERROR, IN_PROGRESS + } + + @Override + public String toString() { + return String.format("ProvisionerState:{state: %s, summary: %s, createdMs: %d, updated: %d}", + _state.toString(), + _summary, + _createdMs, + _updatedMs); + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerUtils.java new file mode 100644 index 000000000..88d1ab3ec --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerUtils.java @@ -0,0 +1,40 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.detector; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; + +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeIncreasePartitionCount; + +/** + * A util class for provisions. + */ +public final class ProvisionerUtils { + + private ProvisionerUtils() { + } + + /** + * Determine the status of increasing the partition count + * + * @param adminClient AdminClient to handle partition count increases. + * @param topicToAddPartitions Existing topic to add more partitions if needed + * @return The state {@link ProvisionerState.State#COMPLETED} when true, {@link ProvisionerState.State#COMPLETED_WITH_ERROR} when false + */ + public static ProvisionerState partitionIncreaseStatus(AdminClient adminClient, NewTopic topicToAddPartitions) { + if (maybeIncreasePartitionCount(adminClient, topicToAddPartitions)) { + String summary = String.format("Provisioning the partition count to increase to %d for the topic %s was successfully completed.", + topicToAddPartitions.numPartitions(), + topicToAddPartitions.name()); + return new ProvisionerState(ProvisionerState.State.COMPLETED, summary); + } else { + String summary = String.format("Provisioning the partition count to increase to %d for the topic %s was rejected.", + topicToAddPartitions.numPartitions(), + topicToAddPartitions.name()); + return new ProvisionerState(ProvisionerState.State.COMPLETED_WITH_ERROR, summary); + } + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/RightsizeOptions.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/RightsizeOptions.java new file mode 100644 index 000000000..c346051b8 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/RightsizeOptions.java @@ -0,0 +1,19 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.detector; + +import java.util.Map; +import org.apache.kafka.common.annotation.InterfaceStability; + + +/** + * A class to indicate options intended to be used during application of a {@link Provisioner#rightsize(Map, RightsizeOptions)}. + */ +@InterfaceStability.Evolving +public final class RightsizeOptions { + public RightsizeOptions() { + + } +} diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerStateTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerStateTest.java new file mode 100644 index 000000000..cbd009779 --- /dev/null +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerStateTest.java @@ -0,0 +1,38 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.detector; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + + +public class ProvisionerStateTest { + @Test + public void testProvisionerStateValidUpdate() { + ProvisionerState.State originalState = ProvisionerState.State.COMPLETED; + String originalSummary = "Test summary."; + ProvisionerState provisionerState = new ProvisionerState(originalState, originalSummary); + + ProvisionerState.State updatedState = ProvisionerState.State.IN_PROGRESS; + String updatedSummary = "Test summary."; + provisionerState.update(updatedState, updatedSummary); + + assertEquals(updatedState, provisionerState.state()); + assertEquals(updatedSummary, provisionerState.summary()); + } + + @Test + public void testProvisionerStateInvalidUpdateThrowsException() { + ProvisionerState.State originalState = ProvisionerState.State.IN_PROGRESS; + String originalSummary = "Test summary."; + ProvisionerState provisionerState = new ProvisionerState(originalState, originalSummary); + ProvisionerState.State updatedState = ProvisionerState.State.COMPLETED_WITH_ERROR; + + assertThrows(IllegalArgumentException.class, () -> provisionerState.update(originalState, null)); + assertThrows(IllegalStateException.class, () -> provisionerState.update(updatedState, originalSummary)); + } +} From 8230a0513495b9e97afa6917ab22fd59c1deff8c Mon Sep 17 00:00:00 2001 From: kaitlynp1206 Date: Fri, 6 Aug 2021 18:50:14 -0400 Subject: [PATCH 2/5] Setup a testing endpoint for Rightsizing (#5) --- .../CruiseControlParametersConfig.java | 15 +++- .../constants/CruiseControlRequestConfig.java | 15 +++- .../servlet/CruiseControlEndPoint.java | 6 +- .../KafkaCruiseControlServletUtils.java | 4 + .../handler/sync/RightsizeRequest.java | 84 +++++++++++++++++++ .../servlet/parameters/ParameterUtils.java | 43 +++++++++- .../parameters/RightsizeParameters.java | 60 +++++++++++++ .../servlet/response/RightsizeResult.java | 76 +++++++++++++++++ cruise-control/src/yaml/base.yaml | 2 + .../src/yaml/endpoints/rightsize.yaml | 63 ++++++++++++++ .../src/yaml/responses/rightsizeResult.yaml | 25 ++++++ docs/wiki/User Guide/REST-APIs.md | 19 ++++- 12 files changed, 406 insertions(+), 6 deletions(-) create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RightsizeParameters.java create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java create mode 100644 cruise-control/src/yaml/endpoints/rightsize.yaml create mode 100644 cruise-control/src/yaml/responses/rightsizeResult.yaml diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java index 3eecaee83..5af3ec975 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlParametersConfig.java @@ -19,6 +19,7 @@ import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters; import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters; @@ -171,6 +172,13 @@ public final class CruiseControlParametersConfig { public static final String DEFAULT_TOPIC_CONFIGURATION_PARAMETERS_CLASS = TopicConfigurationParameters.class.getName(); public static final String TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC = "The class for parameters of a topic configuration request."; + /** + * rightsize.parameters.class + */ + public static final String RIGHTSIZE_PARAMETERS_CLASS_CONFIG = "rightsize.parameters.class"; + public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName(); + public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request."; + private CruiseControlParametersConfig() { } @@ -280,6 +288,11 @@ public static ConfigDef define(ConfigDef configDef) { ConfigDef.Type.CLASS, DEFAULT_TOPIC_CONFIGURATION_PARAMETERS_CLASS, ConfigDef.Importance.MEDIUM, - TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC); + TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC) + .define(RIGHTSIZE_PARAMETERS_CLASS_CONFIG, + ConfigDef.Type.CLASS, + DEFAULT_RIGHTSIZE_PARAMETERS_CLASS, + ConfigDef.Importance.MEDIUM, + RIGHTSIZE_PARAMETERS_CLASS_DOC); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java index 4891c2e1a..a879e4bf6 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/CruiseControlRequestConfig.java @@ -13,6 +13,7 @@ import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest; import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest; import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest; +import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest; import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest; import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest; import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest; @@ -173,6 +174,13 @@ public final class CruiseControlRequestConfig { public static final String DEFAULT_TOPIC_CONFIGURATION_REQUEST_CLASS = TopicConfigurationRequest.class.getName(); public static final String TOPIC_CONFIGURATION_REQUEST_CLASS_DOC = "The class to handle a topic configuration request."; + /** + * rightsize.request.class + */ + public static final String RIGHTSIZE_REQUEST_CLASS_CONFIG = "rightsize.request.class"; + public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName(); + public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request."; + private CruiseControlRequestConfig() { } @@ -282,6 +290,11 @@ public static ConfigDef define(ConfigDef configDef) { ConfigDef.Type.CLASS, DEFAULT_TOPIC_CONFIGURATION_REQUEST_CLASS, ConfigDef.Importance.MEDIUM, - TOPIC_CONFIGURATION_REQUEST_CLASS_DOC); + TOPIC_CONFIGURATION_REQUEST_CLASS_DOC) + .define(RIGHTSIZE_REQUEST_CLASS_CONFIG, + ConfigDef.Type.CLASS, + DEFAULT_RIGHTSIZE_REQUEST_CLASS, + ConfigDef.Importance.MEDIUM, + RIGHTSIZE_REQUEST_CLASS_DOC); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java index 8c0338393..bec1bee96 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/CruiseControlEndPoint.java @@ -33,7 +33,8 @@ public enum CruiseControlEndPoint implements EndPoint { REVIEW_BOARD(CRUISE_CONTROL_MONITOR), ADMIN(CRUISE_CONTROL_ADMIN), REVIEW(CRUISE_CONTROL_ADMIN), - TOPIC_CONFIGURATION(KAFKA_ADMIN); + TOPIC_CONFIGURATION(KAFKA_ADMIN), + RIGHTSIZE(KAFKA_ADMIN); private static final List CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values())); private static final List GET_ENDPOINTS = Arrays.asList(BOOTSTRAP, @@ -55,7 +56,8 @@ public enum CruiseControlEndPoint implements EndPoint { DEMOTE_BROKER, ADMIN, REVIEW, - TOPIC_CONFIGURATION); + TOPIC_CONFIGURATION, + RIGHTSIZE); private final EndpointType _endpointType; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java index 5fafb636e..9e0702d40 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/KafkaCruiseControlServletUtils.java @@ -111,6 +111,9 @@ public final class KafkaCruiseControlServletUtils { RequestParameterWrapper topicConfiguration = new RequestParameterWrapper(TOPIC_CONFIGURATION_PARAMETERS_CLASS_CONFIG, TOPIC_CONFIGURATION_PARAMETER_OBJECT_CONFIG, TOPIC_CONFIGURATION_REQUEST_CLASS_CONFIG); + RequestParameterWrapper rightsize = new RequestParameterWrapper(RIGHTSIZE_PARAMETERS_CLASS_CONFIG, + RIGHTSIZE_PARAMETER_OBJECT_CONFIG, + RIGHTSIZE_REQUEST_CLASS_CONFIG); requestParameterConfigs.put(BOOTSTRAP, bootstrap); requestParameterConfigs.put(TRAIN, train); @@ -132,6 +135,7 @@ public final class KafkaCruiseControlServletUtils { requestParameterConfigs.put(REVIEW, review); requestParameterConfigs.put(REVIEW_BOARD, reviewBoard); requestParameterConfigs.put(TOPIC_CONFIGURATION, topicConfiguration); + requestParameterConfigs.put(RIGHTSIZE, rightsize); REQUEST_PARAMETER_CONFIGS = Collections.unmodifiableMap(requestParameterConfigs); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java new file mode 100644 index 000000000..178c327ba --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java @@ -0,0 +1,84 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.servlet.handler.sync; + +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionRecommendation; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus; +import com.linkedin.kafka.cruisecontrol.common.Utils; +import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; +import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig; +import com.linkedin.kafka.cruisecontrol.detector.Provisioner; +import com.linkedin.kafka.cruisecontrol.detector.ProvisionerState; +import com.linkedin.kafka.cruisecontrol.detector.RightsizeOptions; +import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters; +import com.linkedin.kafka.cruisecontrol.servlet.response.RightsizeResult; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull; +import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG; +import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.RIGHTSIZE_PARAMETER_OBJECT_CONFIG; + + +public class RightsizeRequest extends AbstractSyncRequest { + private static final String RECOMMENDER_UP = "Recommender-Under-Provisioned"; + protected KafkaCruiseControl _kafkaCruiseControl; + protected RightsizeParameters _parameters; + + public RightsizeRequest() { + super(); + } + + @Override + protected RightsizeResult handle() { + KafkaCruiseControlConfig config = _kafkaCruiseControl.config(); + Map overrideConfigs; + overrideConfigs = Collections.singletonMap(KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, _kafkaCruiseControl); + Provisioner provisioner = config.getConfiguredInstance(AnomalyDetectorConfig.PROVISIONER_CLASS_CONFIG, + Provisioner.class, + overrideConfigs); + Supplier> topicNameSupplier = () -> _kafkaCruiseControl.kafkaCluster().topics(); + Set topicNamesMatchedWithPattern = Utils.getTopicNamesMatchedWithPattern(_parameters.topic(), topicNameSupplier); + String topicName; + if (topicNamesMatchedWithPattern.size() > 1) { + throw new IllegalArgumentException(String.format("The RightsizeEndpoint does not support provisioning for multiple topics {%s}.", + String.join(" ,", topicNamesMatchedWithPattern))); + } else { + topicName = topicNamesMatchedWithPattern.iterator().next(); + } + ProvisionRecommendation recommendation = + new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED).numBrokers(_parameters.numBrokersToAdd()) + .numPartitions(_parameters.partitionCount()) + .topic(topicName) + .build(); + Map provisionRecommendation; + provisionRecommendation = Collections.singletonMap(RECOMMENDER_UP, recommendation); + + ProvisionerState provisionerState = provisioner.rightsize(provisionRecommendation, new RightsizeOptions()); + + return new RightsizeResult(_parameters.numBrokersToAdd(), _parameters.partitionCount(), topicName, provisionerState, config); + } + + @Override + public RightsizeParameters parameters() { + return _parameters; + } + + @Override + public String name() { + return RightsizeRequest.class.getSimpleName(); + } + + @Override + public void configure(Map configs) { + super.configure(configs); + _kafkaCruiseControl = _servlet.asyncKafkaCruiseControl(); + _parameters = (RightsizeParameters) validateNotNull(configs.get(RIGHTSIZE_PARAMETER_OBJECT_CONFIG), + "Parameter configuration is missing from the request."); + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java index d0987c1e6..8bd2fd012 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/ParameterUtils.java @@ -8,6 +8,7 @@ import com.linkedin.cruisecontrol.detector.AnomalyType; import com.linkedin.cruisecontrol.servlet.EndPoint; import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionRecommendation; import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal; import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal; import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal; @@ -59,7 +60,6 @@ import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.writeErrorResponse; import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; - /** * The util class for Kafka Cruise Control parameters. */ @@ -138,6 +138,8 @@ public final class ParameterUtils { public static final String TOPIC_BY_REPLICATION_FACTOR = "topic_by_replication_factor"; public static final String NO_REASON_PROVIDED = "No reason provided"; public static final String DO_AS = "doAs"; + public static final String NUM_BROKERS_TO_ADD = "num_brokers_to_add"; + public static final String PARTITION_COUNT = "partition_count"; public static final String STOP_PROPOSAL_PARAMETER_OBJECT_CONFIG = "stop.proposal.parameter.object"; public static final String BOOTSTRAP_PARAMETER_OBJECT_CONFIG = "bootstrap.parameter.object"; @@ -158,6 +160,7 @@ public final class ParameterUtils { public static final String ADMIN_PARAMETER_OBJECT_CONFIG = "admin.parameter.object"; public static final String REVIEW_PARAMETER_OBJECT_CONFIG = "review.parameter.object"; public static final String TOPIC_CONFIGURATION_PARAMETER_OBJECT_CONFIG = "topic.configuration.parameter.object"; + public static final String RIGHTSIZE_PARAMETER_OBJECT_CONFIG = "rightsize.parameter.object"; private ParameterUtils() { } @@ -921,6 +924,44 @@ static int partitionBoundary(HttpServletRequest request, boolean isUpperBound) { return Integer.parseInt(boundaries[isUpperBound ? 1 : 0]); } + /** + * Get the {@link #NUM_BROKERS_TO_ADD} from the request. + * + * Default: {@link ProvisionRecommendation#DEFAULT_OPTIONAL_INT} + * @return The value of {@link #NUM_BROKERS_TO_ADD} parameter. + * @throws UserRequestException if the number of brokers to add is not a positive integer. + */ + static int numBrokersToAdd(HttpServletRequest request) { + String parameterString = caseSensitiveParameterName(request.getParameterMap(), NUM_BROKERS_TO_ADD); + if (parameterString == null) { + return ProvisionRecommendation.DEFAULT_OPTIONAL_INT; + } + int numBrokersToAdd = Integer.parseInt(request.getParameter(parameterString)); + if (numBrokersToAdd <= 0) { + throw new UserRequestException("The requested number of brokers to add must be positive (Requested: " + numBrokersToAdd + ")."); + } + return numBrokersToAdd; + } + + /** + * Get the {@link #PARTITION_COUNT} from the request. + * + * Default: {@link ProvisionRecommendation#DEFAULT_OPTIONAL_INT} + * @return The value of {@link #PARTITION_COUNT} parameter. + * @throws UserRequestException if the targeted partition count is not a positive integer. + */ + static int partitionCount(HttpServletRequest request) { + String parameterString = caseSensitiveParameterName(request.getParameterMap(), PARTITION_COUNT); + if (parameterString == null) { + return ProvisionRecommendation.DEFAULT_OPTIONAL_INT; + } + int targetPartitionCount = Integer.parseInt(request.getParameter(parameterString)); + if (targetPartitionCount <= 0) { + throw new UserRequestException("The requested targeted partition count must be positive (Requested: " + targetPartitionCount + ")."); + } + return targetPartitionCount; + } + static Set brokerIds(HttpServletRequest request, boolean isOptional) throws UnsupportedEncodingException { Set brokerIds = parseParamToIntegerSet(request, BROKER_ID_PARAM); if (!isOptional && brokerIds.isEmpty()) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RightsizeParameters.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RightsizeParameters.java new file mode 100644 index 000000000..2a61ba1a5 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/parameters/RightsizeParameters.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.servlet.parameters; + +import java.io.UnsupportedEncodingException; +import java.util.Collections; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.regex.Pattern; + +import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.NUM_BROKERS_TO_ADD; +import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.PARTITION_COUNT; +import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.TOPIC_PARAM; + + +public class RightsizeParameters extends AbstractParameters { + protected static final SortedSet CASE_INSENSITIVE_PARAMETER_NAMES; + static { + SortedSet validParameterNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + validParameterNames.add(NUM_BROKERS_TO_ADD); + validParameterNames.add(PARTITION_COUNT); + validParameterNames.add(TOPIC_PARAM); + validParameterNames.addAll(AbstractParameters.CASE_INSENSITIVE_PARAMETER_NAMES); + CASE_INSENSITIVE_PARAMETER_NAMES = Collections.unmodifiableSortedSet(validParameterNames); + } + protected int _numBrokersToAdd; + protected int _partitionCount; + protected Pattern _topic; + + public RightsizeParameters() { + super(); + } + + @Override + protected void initParameters() throws UnsupportedEncodingException { + super.initParameters(); + _numBrokersToAdd = ParameterUtils.numBrokersToAdd(_request); + _partitionCount = ParameterUtils.partitionCount(_request); + _topic = ParameterUtils.topic(_request); + } + + public int numBrokersToAdd() { + return _numBrokersToAdd; + } + + public int partitionCount() { + return _partitionCount; + } + + public Pattern topic() { + return _topic; + } + + @Override + public SortedSet caseInsensitiveParameterNames() { + return CASE_INSENSITIVE_PARAMETER_NAMES; + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java new file mode 100644 index 000000000..13fe2b683 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java @@ -0,0 +1,76 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.servlet.response; + +import com.google.gson.Gson; +import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters; +import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; +import com.linkedin.kafka.cruisecontrol.detector.ProvisionerState; +import java.util.HashMap; +import java.util.Map; + +import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.*; + + +@JsonResponseClass +public class RightsizeResult extends AbstractCruiseControlResponse { + @JsonResponseField + protected static final String NUM_BROKERS_TO_ADD = "numBrokersToAdd"; + @JsonResponseField + protected static final String PARTITION_COUNT = "partitionCount"; + @JsonResponseField + protected static final String TOPIC = "topic"; + @JsonResponseField + protected static final String PROVISIONER_STATE = "provisionerState"; + @JsonResponseField + protected static final String PROVISIONER_SUMMARY = "provisionerSummary"; + + protected final int _numBrokersToAdd; + protected final int _partitionCount; + protected String _topic; + protected String _provisionState; + protected String _provisionSummary; + + public RightsizeResult(int numBrokersToAdd, + int partitionCount, + String topic, + ProvisionerState provisionerState, + KafkaCruiseControlConfig config) { + super(config); + + _numBrokersToAdd = numBrokersToAdd; + _partitionCount = partitionCount; + _topic = topic; + _provisionState = provisionerState.state().toString(); + _provisionSummary = provisionerState.summary(); + } + + protected String getJSONString() { + Map jsonStructure = new HashMap<>(5); + jsonStructure.put(NUM_BROKERS_TO_ADD, _numBrokersToAdd); + jsonStructure.put(PARTITION_COUNT, _partitionCount); + if (_topic != null && !_topic.isEmpty()) { + jsonStructure.put(TOPIC, _topic); + } + if (_provisionState != null && !_provisionState.isEmpty()) { + jsonStructure.put(PROVISIONER_STATE, _provisionState); + } + if (_provisionSummary != null && !_provisionSummary.isEmpty()) { + jsonStructure.put(PROVISIONER_SUMMARY, _provisionSummary); + } + jsonStructure.put(VERSION, JSON_VERSION); + Gson gson = new Gson(); + return gson.toJson(jsonStructure); + } + + @Override + protected void discardIrrelevantAndCacheRelevant(CruiseControlParameters parameters) { + _cachedResponse = getJSONString(); + // Discard irrelevant response. + _topic = null; + _provisionState = null; + _provisionSummary = null; + } +} diff --git a/cruise-control/src/yaml/base.yaml b/cruise-control/src/yaml/base.yaml index f1ee20a14..43fd0cf8b 100644 --- a/cruise-control/src/yaml/base.yaml +++ b/cruise-control/src/yaml/base.yaml @@ -28,6 +28,8 @@ paths: $ref: 'endpoints/rebalance.yaml#/RebalanceEndpoint' /kafkacruisecontrol/remove_broker: $ref: 'endpoints/removeBroker.yaml#/RemoveBrokerEndpoint' + /kafkacruisecontrol/rightsize: + $ref: 'endpoints/rightsize.yaml#/RightsizeEndpoint' /kafkacruisecontrol/resume_sampling: $ref: 'endpoints/resumeSampling.yaml#/ResumeSamplingEndpoint' /kafkacruisecontrol/review: diff --git a/cruise-control/src/yaml/endpoints/rightsize.yaml b/cruise-control/src/yaml/endpoints/rightsize.yaml new file mode 100644 index 000000000..94ddb8391 --- /dev/null +++ b/cruise-control/src/yaml/endpoints/rightsize.yaml @@ -0,0 +1,63 @@ +RightsizeEndpoint: + post: + operationId: rightsize + summary: Manually invoke provisioner rightsizing of the cluster. + parameters: + - name: num_brokers_to_add + in: query + description: The difference in broker count to rightsize towards. + schema: + type: integer + format: int32 + minimum: 1 + - name: partition_count + in: query + description: The target number of partitions to rightsize towards. + schema: + type: integer + format: int32 + minimum: 1 + - name: topic + in: query + description: Regular expression to specify subject topics. + schema: + type: string # topics regex + default: null + example: topic_%5B0-9%5D.%2A + - name: doAs + in: query + description: The user specified by a trusted proxy in that authentication model. + schema: + type: string + - name: get_response_schema + in: query + description: Whether to return JSON schema in response header or not. + schema: + type: boolean + default: false + - name: json + in: query + description: Whether to return in JSON format or not. + schema: + type: boolean + default: false + responses: + '200': + description: Successful rightsize response. + content: + application/json: + schema: + $ref: '../responses/rightsizeResult.yaml#/RightsizeResult' + text/plain: + schema: + type: string + # Response for all errors + default: + description: Error response. + content: + application/json: + schema: + $ref: '../responses/errorResponse.yaml#/ErrorResponse' + text/plain: + schema: + type: string diff --git a/cruise-control/src/yaml/responses/rightsizeResult.yaml b/cruise-control/src/yaml/responses/rightsizeResult.yaml new file mode 100644 index 000000000..b130d8f9b --- /dev/null +++ b/cruise-control/src/yaml/responses/rightsizeResult.yaml @@ -0,0 +1,25 @@ +RightsizeResult: + type: object + required: + - numBrokersToAdd + - partitionCount + - topic + - provisionerState + - provisionerSummary + - version + properties: + numBrokersToAdd: + type: integer + format: int32 + partitionCount: + type: integer + format: int32 + topic: + type: string + provisionerState: + type: string + provisionerSummary: + type: string + version: + type: integer + format: int32 diff --git a/docs/wiki/User Guide/REST-APIs.md b/docs/wiki/User Guide/REST-APIs.md index a2db25953..9d66fe0df 100644 --- a/docs/wiki/User Guide/REST-APIs.md +++ b/docs/wiki/User Guide/REST-APIs.md @@ -22,6 +22,7 @@ * [Change Kafka topic configuration](#change-kafka-topic-configuration) * [Change Cruise Control configuration](#change-cruise-control-configuration) * [2-step Verification](#2-step-verification) + * [Rightsize the cluster with the Provisioner](#rightsize-the-cluster-with-the-provisioner) ## Asynchronous Endpoints @@ -603,4 +604,20 @@ To drop recently removed/demoted brokers, send POST request like: ### 2-step Verification -2-step verification aims to help users verify the command they (or their peers) intend to run by letting them review requests explicitly to approve or discard them, and enable execution of only the approved requests. Read [2 step verification for POST requests](https://github.com/linkedin/cruise-control/wiki/2-step-verification-for-POST-requests) for more detail. \ No newline at end of file +2-step verification aims to help users verify the command they (or their peers) intend to run by letting them review requests explicitly to approve or discard them, and enable execution of only the approved requests. Read [2 step verification for POST requests](https://github.com/linkedin/cruise-control/wiki/2-step-verification-for-POST-requests) for more detail. + +### Rightsize the cluster with the Provisioner +The following POST request can create a request to the provisioner to rightsize the broker or partition of a cluster. + + POST /kafkacruisecontrol/rightsize + +Supported parameters are: + +| PARAMETER | TYPE | DESCRIPTION | DEFAULT | OPTIONAL | +|-------------------------------------|-----------|-----------------------------------------------------------------------------------|-----------|-----------| +| num_brokers_to_add | integer | difference in broker count to rightsize towards | -1 | yes | +| partition_count | integer | target number of partitions to rightsize towards | -1 | yes | +| topic | regex | regular expression to specify subject topics | null | yes | +| doAs | string | propagated user by the trusted proxy service | null | yes | +| get_response_schema | boolean | return JSON schema in response header or not | false | yes | +| json | boolean | return in JSON format or not | false | yes | From e2b89366eff2b45490aab29605af5b6ebf2cbc52 Mon Sep 17 00:00:00 2001 From: Kaitlyn Paglia Date: Thu, 12 Aug 2021 18:19:06 -0400 Subject: [PATCH 3/5] Add a sanity check to the rightsize endpoint resource --- .../handler/sync/RightsizeRequest.java | 68 +++++++++++++++---- .../servlet/response/RightsizeResult.java | 36 +++++----- .../src/yaml/responses/rightsizeResult.yaml | 6 -- 3 files changed, 69 insertions(+), 41 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java index 178c327ba..c5668078b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java @@ -29,6 +29,7 @@ public class RightsizeRequest extends AbstractSyncRequest { private static final String RECOMMENDER_UP = "Recommender-Under-Provisioned"; protected KafkaCruiseControl _kafkaCruiseControl; protected RightsizeParameters _parameters; + protected String _topicName; public RightsizeRequest() { super(); @@ -42,26 +43,63 @@ protected RightsizeResult handle() { Provisioner provisioner = config.getConfiguredInstance(AnomalyDetectorConfig.PROVISIONER_CLASS_CONFIG, Provisioner.class, overrideConfigs); - Supplier> topicNameSupplier = () -> _kafkaCruiseControl.kafkaCluster().topics(); - Set topicNamesMatchedWithPattern = Utils.getTopicNamesMatchedWithPattern(_parameters.topic(), topicNameSupplier); - String topicName; - if (topicNamesMatchedWithPattern.size() > 1) { - throw new IllegalArgumentException(String.format("The RightsizeEndpoint does not support provisioning for multiple topics {%s}.", - String.join(" ,", topicNamesMatchedWithPattern))); - } else { - topicName = topicNamesMatchedWithPattern.iterator().next(); - } - ProvisionRecommendation recommendation = - new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED).numBrokers(_parameters.numBrokersToAdd()) - .numPartitions(_parameters.partitionCount()) - .topic(topicName) - .build(); + ProvisionRecommendation recommendation = sanityCheckResources(); Map provisionRecommendation; provisionRecommendation = Collections.singletonMap(RECOMMENDER_UP, recommendation); ProvisionerState provisionerState = provisioner.rightsize(provisionRecommendation, new RightsizeOptions()); - return new RightsizeResult(_parameters.numBrokersToAdd(), _parameters.partitionCount(), topicName, provisionerState, config); + return new RightsizeResult(_parameters.numBrokersToAdd(), _parameters.partitionCount(), _topicName, provisionerState, config); + } + + /** + * Ensure that + *
    + *
  • exactly one resource type is set
  • + *
  • if the resource type is partition, then the corresponding topic must be specified; otherwise, the topic cannot be specified
  • + *
  • if the resource type is partition, then only one topic must be specified
  • + *
+ * + * @return The {@link ProvisionRecommendation} to recommend for rightsizing + * @throws IllegalArgumentException when the sanity check fails. + */ + private ProvisionRecommendation sanityCheckResources() throws IllegalArgumentException { + ProvisionRecommendation recommendation; + + // Validate multiple resources are not set + if (_parameters.numBrokersToAdd() != ProvisionRecommendation.DEFAULT_OPTIONAL_INT + && _parameters.partitionCount() == ProvisionRecommendation.DEFAULT_OPTIONAL_INT) { + //Validate the topic cannot be specified when the resource type is not partition. + if (_parameters.topic() != null) { + throw new IllegalArgumentException("When the resource type is not partition, topic cannot be specified."); + } + recommendation = new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED).numBrokers(_parameters.numBrokersToAdd()) + .build(); + } else if (_parameters.numBrokersToAdd() == ProvisionRecommendation.DEFAULT_OPTIONAL_INT + && _parameters.partitionCount() != ProvisionRecommendation.DEFAULT_OPTIONAL_INT) { + // Validate the topic parameter is not null + if (_parameters.topic() == null) { + throw new IllegalArgumentException("When the resource type is partition, the corresponding topic must be specified."); + } + + // Validate multiple topics were not provided for provisioning + Supplier> topicNameSupplier = () -> _kafkaCruiseControl.kafkaCluster().topics(); + Set topicNamesMatchedWithPattern = Utils.getTopicNamesMatchedWithPattern(_parameters.topic(), topicNameSupplier); + if (topicNamesMatchedWithPattern.size() != 1) { + throw new IllegalArgumentException(String.format("The RightsizeEndpoint does not support provisioning for multiple topics {%s}.", + String.join(" ,", topicNamesMatchedWithPattern))); + } else { + _topicName = topicNamesMatchedWithPattern.iterator().next(); + } + recommendation = new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED).numPartitions(_parameters.partitionCount()) + .topic(_topicName) + .build(); + } else { + throw new IllegalArgumentException(String.format("Exactly one resource type must be set (Brokers:%d Partitions:%d))", + _parameters.numBrokersToAdd(), + _parameters.partitionCount())); + } + return recommendation; } @Override diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java index 13fe2b683..506a36956 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/response/RightsizeResult.java @@ -6,32 +6,31 @@ import com.google.gson.Gson; import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters; +import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionRecommendation; import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig; import com.linkedin.kafka.cruisecontrol.detector.ProvisionerState; import java.util.HashMap; import java.util.Map; -import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.*; +import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.VERSION; +import static com.linkedin.kafka.cruisecontrol.servlet.response.ResponseUtils.JSON_VERSION; @JsonResponseClass public class RightsizeResult extends AbstractCruiseControlResponse { - @JsonResponseField + @JsonResponseField(required = false) protected static final String NUM_BROKERS_TO_ADD = "numBrokersToAdd"; - @JsonResponseField + @JsonResponseField(required = false) protected static final String PARTITION_COUNT = "partitionCount"; - @JsonResponseField + @JsonResponseField(required = false) protected static final String TOPIC = "topic"; @JsonResponseField protected static final String PROVISIONER_STATE = "provisionerState"; - @JsonResponseField - protected static final String PROVISIONER_SUMMARY = "provisionerSummary"; protected final int _numBrokersToAdd; protected final int _partitionCount; protected String _topic; - protected String _provisionState; - protected String _provisionSummary; + protected String _provisionerState; public RightsizeResult(int numBrokersToAdd, int partitionCount, @@ -43,23 +42,21 @@ public RightsizeResult(int numBrokersToAdd, _numBrokersToAdd = numBrokersToAdd; _partitionCount = partitionCount; _topic = topic; - _provisionState = provisionerState.state().toString(); - _provisionSummary = provisionerState.summary(); + _provisionerState = provisionerState.toString(); } protected String getJSONString() { Map jsonStructure = new HashMap<>(5); - jsonStructure.put(NUM_BROKERS_TO_ADD, _numBrokersToAdd); - jsonStructure.put(PARTITION_COUNT, _partitionCount); - if (_topic != null && !_topic.isEmpty()) { - jsonStructure.put(TOPIC, _topic); + if (_numBrokersToAdd != ProvisionRecommendation.DEFAULT_OPTIONAL_INT) { + jsonStructure.put(NUM_BROKERS_TO_ADD, _numBrokersToAdd); } - if (_provisionState != null && !_provisionState.isEmpty()) { - jsonStructure.put(PROVISIONER_STATE, _provisionState); + if (_partitionCount != ProvisionRecommendation.DEFAULT_OPTIONAL_INT) { + jsonStructure.put(PARTITION_COUNT, _partitionCount); } - if (_provisionSummary != null && !_provisionSummary.isEmpty()) { - jsonStructure.put(PROVISIONER_SUMMARY, _provisionSummary); + if (_topic != null && !_topic.isEmpty()) { + jsonStructure.put(TOPIC, _topic); } + jsonStructure.put(PROVISIONER_STATE, _provisionerState); jsonStructure.put(VERSION, JSON_VERSION); Gson gson = new Gson(); return gson.toJson(jsonStructure); @@ -70,7 +67,6 @@ protected void discardIrrelevantAndCacheRelevant(CruiseControlParameters paramet _cachedResponse = getJSONString(); // Discard irrelevant response. _topic = null; - _provisionState = null; - _provisionSummary = null; + _provisionerState = null; } } diff --git a/cruise-control/src/yaml/responses/rightsizeResult.yaml b/cruise-control/src/yaml/responses/rightsizeResult.yaml index b130d8f9b..0111578af 100644 --- a/cruise-control/src/yaml/responses/rightsizeResult.yaml +++ b/cruise-control/src/yaml/responses/rightsizeResult.yaml @@ -1,11 +1,7 @@ RightsizeResult: type: object required: - - numBrokersToAdd - - partitionCount - - topic - provisionerState - - provisionerSummary - version properties: numBrokersToAdd: @@ -18,8 +14,6 @@ RightsizeResult: type: string provisionerState: type: string - provisionerSummary: - type: string version: type: integer format: int32 From 2598f16fd3c9341011d80d51310600a32293256a Mon Sep 17 00:00:00 2001 From: Kaitlyn Paglia Date: Thu, 12 Aug 2021 20:03:04 -0400 Subject: [PATCH 4/5] Update provision recommendation validation checks --- .../handler/sync/RightsizeRequest.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java index c5668078b..b6dc131b0 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java @@ -13,6 +13,7 @@ import com.linkedin.kafka.cruisecontrol.detector.Provisioner; import com.linkedin.kafka.cruisecontrol.detector.ProvisionerState; import com.linkedin.kafka.cruisecontrol.detector.RightsizeOptions; +import com.linkedin.kafka.cruisecontrol.servlet.UserRequestException; import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters; import com.linkedin.kafka.cruisecontrol.servlet.response.RightsizeResult; import java.util.Collections; @@ -43,7 +44,7 @@ protected RightsizeResult handle() { Provisioner provisioner = config.getConfiguredInstance(AnomalyDetectorConfig.PROVISIONER_CLASS_CONFIG, Provisioner.class, overrideConfigs); - ProvisionRecommendation recommendation = sanityCheckResources(); + ProvisionRecommendation recommendation = createProvisionRecommendation(); Map provisionRecommendation; provisionRecommendation = Collections.singletonMap(RECOMMENDER_UP, recommendation); @@ -53,7 +54,7 @@ protected RightsizeResult handle() { } /** - * Ensure that + * Create a provision recommendation and ensure that *
    *
  • exactly one resource type is set
  • *
  • if the resource type is partition, then the corresponding topic must be specified; otherwise, the topic cannot be specified
  • @@ -61,9 +62,9 @@ protected RightsizeResult handle() { *
* * @return The {@link ProvisionRecommendation} to recommend for rightsizing - * @throws IllegalArgumentException when the sanity check fails. + * @throws UserRequestException when the sanity check fails. */ - private ProvisionRecommendation sanityCheckResources() throws IllegalArgumentException { + private ProvisionRecommendation createProvisionRecommendation() throws UserRequestException { ProvisionRecommendation recommendation; // Validate multiple resources are not set @@ -71,23 +72,25 @@ private ProvisionRecommendation sanityCheckResources() throws IllegalArgumentExc && _parameters.partitionCount() == ProvisionRecommendation.DEFAULT_OPTIONAL_INT) { //Validate the topic cannot be specified when the resource type is not partition. if (_parameters.topic() != null) { - throw new IllegalArgumentException("When the resource type is not partition, topic cannot be specified."); + throw new UserRequestException("When the resource type is not partition, topic cannot be specified."); } recommendation = new ProvisionRecommendation.Builder(ProvisionStatus.UNDER_PROVISIONED).numBrokers(_parameters.numBrokersToAdd()) .build(); } else if (_parameters.numBrokersToAdd() == ProvisionRecommendation.DEFAULT_OPTIONAL_INT && _parameters.partitionCount() != ProvisionRecommendation.DEFAULT_OPTIONAL_INT) { - // Validate the topic parameter is not null + // Validate the topic pattern is not null if (_parameters.topic() == null) { - throw new IllegalArgumentException("When the resource type is partition, the corresponding topic must be specified."); + throw new UserRequestException("When the resource type is partition, the corresponding topic must be specified."); } - // Validate multiple topics were not provided for provisioning Supplier> topicNameSupplier = () -> _kafkaCruiseControl.kafkaCluster().topics(); Set topicNamesMatchedWithPattern = Utils.getTopicNamesMatchedWithPattern(_parameters.topic(), topicNameSupplier); if (topicNamesMatchedWithPattern.size() != 1) { - throw new IllegalArgumentException(String.format("The RightsizeEndpoint does not support provisioning for multiple topics {%s}.", + throw new UserRequestException(String.format("The RightsizeEndpoint does not support provisioning for multiple topics {%s}.", String.join(" ,", topicNamesMatchedWithPattern))); + } else if (topicNamesMatchedWithPattern.iterator().next().isEmpty()) { + // Validate the topic is not empty + throw new UserRequestException("When the resource type is partition, the corresponding topic must be specified."); } else { _topicName = topicNamesMatchedWithPattern.iterator().next(); } @@ -95,7 +98,7 @@ private ProvisionRecommendation sanityCheckResources() throws IllegalArgumentExc .topic(_topicName) .build(); } else { - throw new IllegalArgumentException(String.format("Exactly one resource type must be set (Brokers:%d Partitions:%d))", + throw new UserRequestException(String.format("Exactly one resource type must be set (Brokers:%d Partitions:%d))", _parameters.numBrokersToAdd(), _parameters.partitionCount())); } From b6b711ee3309f7b95423b060f67ba990d328974c Mon Sep 17 00:00:00 2001 From: Adem Efe Gencer Date: Thu, 12 Aug 2021 17:22:57 -0700 Subject: [PATCH 5/5] Minor: If a topic is found, it cannot be empty. --- .../cruisecontrol/servlet/handler/sync/RightsizeRequest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java index b6dc131b0..54f2ac83f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/sync/RightsizeRequest.java @@ -88,9 +88,6 @@ private ProvisionRecommendation createProvisionRecommendation() throws UserReque if (topicNamesMatchedWithPattern.size() != 1) { throw new UserRequestException(String.format("The RightsizeEndpoint does not support provisioning for multiple topics {%s}.", String.join(" ,", topicNamesMatchedWithPattern))); - } else if (topicNamesMatchedWithPattern.iterator().next().isEmpty()) { - // Validate the topic is not empty - throw new UserRequestException("When the resource type is partition, the corresponding topic must be specified."); } else { _topicName = topicNamesMatchedWithPattern.iterator().next(); }