Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup ProvisionerState for rightsizing clusters #1646

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public final class ProvisionRecommendation {
protected final int _numBrokers;
protected final int _numRacks;
protected final int _numDisks;
//TODO: Update the ProvisionRecommendation to provide a set of <topic, numPartition> for batch provisioning of partitions
//See <a href="https://github.com/linkedin/cruise-control/issues/1650">Issue #1650</a>
protected final int _numPartitions;
// If the resource is partition, the name of the topic must be specified.
protected final String _topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
Expand Down Expand Up @@ -171,6 +172,13 @@ public final class CruiseControlParametersConfig {
public static final String DEFAULT_TOPIC_CONFIGURATION_PARAMETERS_CLASS = TopicConfigurationParameters.class.getName();
public static final String TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC = "The class for parameters of a topic configuration request.";

/**
* <code>rightsize.parameters.class</code>
*/
public static final String RIGHTSIZE_PARAMETERS_CLASS_CONFIG = "rightsize.parameters.class";
public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName();
public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request.";

private CruiseControlParametersConfig() {
}

Expand Down Expand Up @@ -280,6 +288,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_TOPIC_CONFIGURATION_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC);
TOPIC_CONFIGURATION_PARAMETERS_CLASS_DOC)
.define(RIGHTSIZE_PARAMETERS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_PARAMETERS_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest;
Expand Down Expand Up @@ -173,6 +174,13 @@ public final class CruiseControlRequestConfig {
public static final String DEFAULT_TOPIC_CONFIGURATION_REQUEST_CLASS = TopicConfigurationRequest.class.getName();
public static final String TOPIC_CONFIGURATION_REQUEST_CLASS_DOC = "The class to handle a topic configuration request.";

/**
* <code>rightsize.request.class</code>
*/
public static final String RIGHTSIZE_REQUEST_CLASS_CONFIG = "rightsize.request.class";
public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName();
public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request.";

private CruiseControlRequestConfig() {
}

Expand Down Expand Up @@ -282,6 +290,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_TOPIC_CONFIGURATION_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
TOPIC_CONFIGURATION_REQUEST_CLASS_DOC);
TOPIC_CONFIGURATION_REQUEST_CLASS_DOC)
.define(RIGHTSIZE_REQUEST_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_REQUEST_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ public void run() {
}
_provisionResponse = provisionResponse;
if (_isProvisionerEnabled) {
// Right-size the cluster (if needed)
boolean isRightsized = _provisioner.rightsize(_provisionResponse.recommendationByRecommender());
if (isRightsized) {
LOG.info("Actions have been taken on the cluster towards rightsizing.");
// Rightsize the cluster (if needed)
ProvisionerState isRightsized = _provisioner.rightsize(_provisionResponse.recommendationByRecommender(), new RightsizeOptions());
if (isRightsized != null) {
LOG.info(isRightsized.toString());
}
}
Map<Boolean, List<String>> violatedGoalsByFixability = goalViolations.violatedGoalsByFixability();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
*/
public class NoopProvisioner implements Provisioner {
@Override
public ProvisionerState rightsize(Map<String, ProvisionRecommendation> recommendationByRecommender, RightsizeOptions rightsizeOptions) {
return null;
}

@Override
@Deprecated
public boolean rightsize(Map<String, ProvisionRecommendation> recommendationByRecommender) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@ public interface Provisioner extends CruiseControlConfigurable {
*
* @param recommendationByRecommender Provision recommendations provided by corresponding recommenders.
* @return {@code true} if actions have been taken on the cluster towards rightsizing, {@code false} otherwise.
* @deprecated Will be removed in a future release -- please use {@link #rightsize(Map, RightsizeOptions)}
*/
@Deprecated
boolean rightsize(Map<String, ProvisionRecommendation> recommendationByRecommender);

/**
* Rightsize the cluster using the given constraints. All given recommendations are expected to share the same {@link ProvisionStatus}.
* Implementations of this function are expected to be non-blocking -- i.e. starts the rightsizing, but does not block until the completion.
*
* <ul>
* <li>For {@link ProvisionStatus#UNDER_PROVISIONED} clusters, each recommender (e.g. goal name) indicates requested resources
* (e.g. number of brokers) along with relevant constraints (e.g. racks for which brokers should not be added). Typically, aggregating
* different recommendations for the same resource type requires considering the maximum value over all recommendations.</li>
* <li>For {@link ProvisionStatus#OVER_PROVISIONED} clusters, each recommender (e.g. goal name) indicates resources that can be
* released (e.g. number of brokers) along with relevant constraints (e.g. expected broker capacity). Typically, aggregating
* different recommendations for the same resource type requires considering the minimum value over all recommendations.</li>
* </ul>
*
* @param recommendationByRecommender Provision recommendations provided by corresponding recommenders.
* @param rightsizeOptions Rightsize options to take into account when rightsizing the cluster.
* @return {@link ProvisionerState} of actions taken on the cluster towards rightsizing or null if no actions were taken.
*/
ProvisionerState rightsize(Map<String, ProvisionRecommendation> recommendationByRecommender, RightsizeOptions rightsizeOptions);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.common.utils.Utils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;


/**
* A class to indicate how a provisioning action is handled
*/
public class ProvisionerState {
kaitlynp1206 marked this conversation as resolved.
Show resolved Hide resolved
private static final Map<State, Set<State>> VALID_TRANSFER = new HashMap<>();
private State _state;
private String _summary;
private final long _createdMs;
private long _updatedMs;

static {
VALID_TRANSFER.put(State.IN_PROGRESS, new HashSet<>(Collections.singleton(State.IN_PROGRESS)));
VALID_TRANSFER.put(State.COMPLETED_WITH_ERROR, new HashSet<>(Arrays.asList(State.IN_PROGRESS, State.COMPLETED_WITH_ERROR)));
VALID_TRANSFER.put(State.COMPLETED, new HashSet<>(Arrays.asList(State.IN_PROGRESS, State.COMPLETED_WITH_ERROR, State.COMPLETED)));
}

public ProvisionerState(State state, String summary) {
_state = state;
_summary = Utils.validateNotNull(summary, "ProvisionerState summary cannot be null.");
_createdMs = System.currentTimeMillis();
_updatedMs = _createdMs;
}

/**
* Check if the state transfer is possible.
* @param targetState the state to transfer to.
* @return True if the transfer is valid, false otherwise.
*/
public boolean canTransferToState(ProvisionerState.State targetState) {
return VALID_TRANSFER.get(_state).contains(targetState);
}

/**
* @return The state of the provisioning action.
*/
public State state() {
return _state;
}

/**
* @return The summary of the provisioning action status.
*/
public String summary() {
return _summary;
}

/**
* @return The time the provisioner state was created in milliseconds.
*/
public long createdMs() {
return _createdMs;
}

/**
* @return The status update time of the provision state in milliseconds.
*/
public long updatedMs() {
return _updatedMs;
}

/**
* Update the state and summary of the provisioning action
*
* @param state The new state of the provisioning action.
* @param summary The new summary of the provisioning action status.
* @throws IllegalArgumentException if the summary is null.
* @throws IllegalStateException if the target state is not a valid target state.
*/
public void update(State state, String summary) {
if (canTransferToState(state)) {
_state = state;
_summary = Utils.validateNotNull(summary, "ProvisionerState summary cannot be null.");
_updatedMs = System.currentTimeMillis();
} else {
throw new IllegalStateException("Cannot set the provisioner state from " + _state.toString() + " to " + state.toString()
+ ". The valid target states are " + Collections.unmodifiableSet(VALID_TRANSFER.get(_state)));
}
}

public enum State {
COMPLETED, COMPLETED_WITH_ERROR, IN_PROGRESS
}

@Override
public String toString() {
return String.format("ProvisionerState:{state: %s, summary: %s, createdMs: %d, updated: %d}",
_state.toString(),
_summary,
_createdMs,
_updatedMs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.detector;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeIncreasePartitionCount;

/**
* A util class for provisions.
*/
public final class ProvisionerUtils {

private ProvisionerUtils() {
}

/**
* Determine the status of increasing the partition count
*
* @param adminClient AdminClient to handle partition count increases.
* @param topicToAddPartitions Existing topic to add more partitions if needed
* @return The state {@link ProvisionerState.State#COMPLETED} when true, {@link ProvisionerState.State#COMPLETED_WITH_ERROR} when false
efeg marked this conversation as resolved.
Show resolved Hide resolved
*/
public static ProvisionerState partitionIncreaseStatus(AdminClient adminClient, NewTopic topicToAddPartitions) {
if (maybeIncreasePartitionCount(adminClient, topicToAddPartitions)) {
String summary = String.format("Provisioning the partition count to increase to %d for the topic %s was successfully completed.",
topicToAddPartitions.numPartitions(),
topicToAddPartitions.name());
return new ProvisionerState(ProvisionerState.State.COMPLETED, summary);
} else {
String summary = String.format("Provisioning the partition count to increase to %d for the topic %s was rejected.",
topicToAddPartitions.numPartitions(),
topicToAddPartitions.name());
return new ProvisionerState(ProvisionerState.State.COMPLETED_WITH_ERROR, summary);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.detector;

import java.util.Map;
import org.apache.kafka.common.annotation.InterfaceStability;


/**
* A class to indicate options intended to be used during application of a {@link Provisioner#rightsize(Map, RightsizeOptions)}.
*/
@InterfaceStability.Evolving
public final class RightsizeOptions {
efeg marked this conversation as resolved.
Show resolved Hide resolved
public RightsizeOptions() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public enum CruiseControlEndPoint implements EndPoint {
REVIEW_BOARD(CRUISE_CONTROL_MONITOR),
ADMIN(CRUISE_CONTROL_ADMIN),
REVIEW(CRUISE_CONTROL_ADMIN),
TOPIC_CONFIGURATION(KAFKA_ADMIN);
TOPIC_CONFIGURATION(KAFKA_ADMIN),
RIGHTSIZE(KAFKA_ADMIN);
lmr3796 marked this conversation as resolved.
Show resolved Hide resolved

private static final List<CruiseControlEndPoint> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));
private static final List<CruiseControlEndPoint> GET_ENDPOINTS = Arrays.asList(BOOTSTRAP,
Expand All @@ -55,7 +56,8 @@ public enum CruiseControlEndPoint implements EndPoint {
DEMOTE_BROKER,
ADMIN,
REVIEW,
TOPIC_CONFIGURATION);
TOPIC_CONFIGURATION,
RIGHTSIZE);

private final EndpointType _endpointType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public final class KafkaCruiseControlServletUtils {
RequestParameterWrapper topicConfiguration = new RequestParameterWrapper(TOPIC_CONFIGURATION_PARAMETERS_CLASS_CONFIG,
TOPIC_CONFIGURATION_PARAMETER_OBJECT_CONFIG,
TOPIC_CONFIGURATION_REQUEST_CLASS_CONFIG);
RequestParameterWrapper rightsize = new RequestParameterWrapper(RIGHTSIZE_PARAMETERS_CLASS_CONFIG,
RIGHTSIZE_PARAMETER_OBJECT_CONFIG,
RIGHTSIZE_REQUEST_CLASS_CONFIG);

requestParameterConfigs.put(BOOTSTRAP, bootstrap);
requestParameterConfigs.put(TRAIN, train);
Expand All @@ -132,6 +135,7 @@ public final class KafkaCruiseControlServletUtils {
requestParameterConfigs.put(REVIEW, review);
requestParameterConfigs.put(REVIEW_BOARD, reviewBoard);
requestParameterConfigs.put(TOPIC_CONFIGURATION, topicConfiguration);
requestParameterConfigs.put(RIGHTSIZE, rightsize);

REQUEST_PARAMETER_CONFIGS = Collections.unmodifiableMap(requestParameterConfigs);
}
Expand Down
Loading