Skip to content

Commit

Permalink
Provide recommendations on the estimated resource requirements (#1470)
Browse files Browse the repository at this point in the history
  • Loading branch information
efeg committed Feb 22, 2021
1 parent fe4999d commit fce5139
Show file tree
Hide file tree
Showing 38 changed files with 561 additions and 232 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public static Set<ExecutionProposal> getDiff(Map<TopicPartition, List<ReplicaPla
finalReplicas.set(leaderPos, finalReplicas.get(0));
finalReplicas.set(0, finalLeaderPlacementInfo);
}
Double partitionSize = optimizedClusterModel.partition(tp).leader().load().expectedUtilizationFor(Resource.DISK);
diff.add(new ExecutionProposal(tp, partitionSize.intValue(), initialLeaderDistribution.get(tp), initialReplicas, finalReplicas));
double partitionSize = optimizedClusterModel.partition(tp).leader().load().expectedUtilizationFor(Resource.DISK);
diff.add(new ExecutionProposal(tp, (int) partitionSize, initialLeaderDistribution.get(tp), initialReplicas, finalReplicas));
}
return diff;
}
Expand All @@ -128,41 +128,6 @@ public static ActionAcceptance isProposalAcceptableForOptimizedGoals(Set<Goal> o
return ACCEPT;
}

/**
* Retrieve the aggregate of the given provision statuses using the following rules: Aggregating ...
* <ul>
* <li>any provision status with {@link ProvisionStatus#UNDER_PROVISIONED} is {@link ProvisionStatus#UNDER_PROVISIONED}.</li>
* <li>a provision status {@code P} with {@link ProvisionStatus#UNDECIDED} is {@code P}.</li>
* <li>{@link ProvisionStatus#RIGHT_SIZED} with {@link ProvisionStatus#RIGHT_SIZED} or {@link ProvisionStatus#OVER_PROVISIONED}
* is {@link ProvisionStatus#RIGHT_SIZED}.</li>
* <li>{@link ProvisionStatus#OVER_PROVISIONED} with {@link ProvisionStatus#OVER_PROVISIONED} yields itself.</li>
* </ul>
*
* @param status1 The first provision status to aggregate.
* @param status2 The second provision status to aggregate.
* @return The aggregate of the given provision statuses.
*/
public static ProvisionStatus aggregateProvisionStatus(ProvisionStatus status1, ProvisionStatus status2) {
if (status1 == ProvisionStatus.UNDER_PROVISIONED) {
return ProvisionStatus.UNDER_PROVISIONED;
}
switch (status2) {
case UNDER_PROVISIONED:
return ProvisionStatus.UNDER_PROVISIONED;
case RIGHT_SIZED:
return ProvisionStatus.RIGHT_SIZED;
case OVER_PROVISIONED:
if (status1 == ProvisionStatus.RIGHT_SIZED) {
return ProvisionStatus.RIGHT_SIZED;
}
return ProvisionStatus.OVER_PROVISIONED;
case UNDECIDED:
return status1;
default:
throw new IllegalArgumentException("Unsupported provision status " + status2 + " is provided.");
}
}

/**
* Compare the given values. Return 1 if first &gt; second, -1 if first &lt; second, 0 otherwise.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ public OptimizerResult optimizations(ClusterModel clusterModel,
Map<TopicPartition, List<ReplicaPlacementInfo>> preOptimizedReplicaDistribution = null;
Map<TopicPartition, ReplicaPlacementInfo> preOptimizedLeaderDistribution = null;

ProvisionStatus provisionStatus = ProvisionStatus.UNDECIDED;
ProvisionResponse provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED);
for (Goal goal : goalsByPriority) {
preOptimizedReplicaDistribution = preOptimizedReplicaDistribution == null ? initReplicaDistribution : clusterModel.getReplicaDistribution();
preOptimizedLeaderDistribution = preOptimizedLeaderDistribution == null ? initLeaderDistribution : clusterModel.getLeaderDistribution();
Expand All @@ -458,7 +458,7 @@ public OptimizerResult optimizations(ClusterModel clusterModel,
if (LOG.isDebugEnabled()) {
LOG.debug("Broker level stats after optimization: {}", clusterModel.brokerStats(null));
}
provisionStatus = AnalyzerUtils.aggregateProvisionStatus(provisionStatus, goal.provisionStatus());
provisionResponse.aggregate(goal.provisionResponse());
}

// Broker level stats in the final cluster state.
Expand All @@ -485,7 +485,7 @@ public OptimizerResult optimizations(ClusterModel clusterModel,
clusterModel.capacityEstimationInfoByBrokerId(),
optimizationOptions,
balancednessCostByGoal(goalsByPriority, _priorityWeight, _strictnessWeight),
provisionStatus);
provisionResponse);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package com.linkedin.kafka.cruisecontrol.analyzer;

import com.linkedin.cruisecontrol.common.CruiseControlConfigurable;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import java.util.Set;
import org.apache.kafka.common.annotation.InterfaceStability;
Expand All @@ -27,23 +26,19 @@ public interface OptimizationOptionsGenerator extends CruiseControlConfigurable
* @param excludedBrokersForReplicaMove The brokers to be specified to not considered for replica movement in
* generated optimization options.
* @return An object of {@link OptimizationOptions}.
* @throws KafkaCruiseControlException If anything goes wrong.
*/
OptimizationOptions optimizationOptionsForGoalViolationDetection(ClusterModel clusterModel,
Set<String> excludedTopics,
Set<Integer> excludedBrokersForLeadership,
Set<Integer> excludedBrokersForReplicaMove)
throws KafkaCruiseControlException;
Set<Integer> excludedBrokersForReplicaMove);

/**
* Generate optimization options used to calculate cached optimization proposal.
*
* @param clusterModel The cluster model used to generate optimization options.
* @param excludedTopics The topics to be specified to exclude in generated optimization options.
* @return An object of {@link OptimizationOptions}.
* @throws KafkaCruiseControlException If anything goes wrong.
*/
OptimizationOptions optimizationOptionsForCachedProposalCalculation(ClusterModel clusterModel,
Set<String> excludedTopics)
throws KafkaCruiseControlException;
Set<String> excludedTopics);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class OptimizerResult {
private static final String ON_DEMAND_BALANCEDNESS_SCORE_BEFORE = "onDemandBalancednessScoreBefore";
@JsonResponseField
private static final String PROVISION_STATUS = "provisionStatus";
@JsonResponseField
private static final String PROVISION_RECOMMENDATION = "provisionRecommendation";
private static final String VIOLATED = "VIOLATED";
private static final String FIXED = "FIXED";
private static final String NO_ACTION = "NO-ACTION";
Expand All @@ -71,7 +73,7 @@ public class OptimizerResult {
private final OptimizationOptions _optimizationOptions;
private final double _onDemandBalancednessScoreBefore;
private final double _onDemandBalancednessScoreAfter;
private final ProvisionStatus _provisionStatus;
private final ProvisionResponse _provisionResponse;

OptimizerResult(LinkedHashMap<Goal, ClusterModelStats> statsByGoalPriority,
Set<String> violatedGoalNamesBeforeOptimization,
Expand All @@ -84,7 +86,7 @@ public class OptimizerResult {
Map<Integer, String> capacityEstimationInfoByBrokerId,
OptimizationOptions optimizationOptions,
Map<String, Double> balancednessCostByGoal,
ProvisionStatus provisionStatus) {
ProvisionResponse provisionResponse) {
_clusterModelStatsComparatorByGoalName = new LinkedHashMap<>(statsByGoalPriority.size());
_statsByGoalName = new LinkedHashMap<>(statsByGoalPriority.size());
for (Map.Entry<Goal, ClusterModelStats> entry : statsByGoalPriority.entrySet()) {
Expand All @@ -106,7 +108,7 @@ public class OptimizerResult {
// Populate on-demand balancedness score before and after.
_onDemandBalancednessScoreBefore = onDemandBalancednessScore(balancednessCostByGoal, _violatedGoalNamesBeforeOptimization);
_onDemandBalancednessScoreAfter = onDemandBalancednessScore(balancednessCostByGoal, _violatedGoalNamesAfterOptimization);
_provisionStatus = provisionStatus;
_provisionResponse = provisionResponse;
}

private double onDemandBalancednessScore(Map<String, Double> balancednessCostByGoal, Set<String> violatedGoals) {
Expand Down Expand Up @@ -261,15 +263,17 @@ private List<Number> getMovementStats() {
*/
public String getProposalSummary() {
List<Number> moveStats = getMovementStats();
String recommendation = _provisionResponse.recommendation();
return String.format("%n%nOptimization has %d inter-broker replica(%d MB) moves, %d intra-broker replica(%d MB) moves"
+ " and %d leadership moves with a cluster model of %d recent windows and %.3f%% of the partitions"
+ " covered.%nExcluded Topics: %s.%nExcluded Brokers For Leadership: %s.%nExcluded Brokers For "
+ "Replica Move: %s.%nCounts: %s%nOn-demand Balancedness Score Before (%.3f) After(%.3f).%nProvision Status: %s.",
+ "Replica Move: %s.%nCounts: %s%nOn-demand Balancedness Score Before (%.3f) After(%.3f).%nProvision Status: %s.%s",
moveStats.get(0).intValue(), moveStats.get(1).longValue(), moveStats.get(2).intValue(),
moveStats.get(3).longValue(), moveStats.get(4).intValue(), _clusterModelStats.numWindows(),
_clusterModelStats.monitoredPartitionsPercentage(), excludedTopics(),
excludedBrokersForLeadership(), excludedBrokersForReplicaMove(), _clusterModelStats.toStringCounts(),
_onDemandBalancednessScoreBefore, _onDemandBalancednessScoreAfter, _provisionStatus);
_onDemandBalancednessScoreBefore, _onDemandBalancednessScoreAfter, _provisionResponse.status(),
recommendation.isEmpty() ? "" : String.format("%nProvision Recommendation: %s.", recommendation));
}

/**
Expand All @@ -290,7 +294,8 @@ public Map<String, Object> getProposalSummaryForJson() {
ret.put(EXCLUDED_BROKERS_FOR_REPLICA_MOVE, excludedBrokersForReplicaMove());
ret.put(ON_DEMAND_BALANCEDNESS_SCORE_BEFORE, _onDemandBalancednessScoreBefore);
ret.put(ON_DEMAND_BALANCEDNESS_SCORE_AFTER, _onDemandBalancednessScoreAfter);
ret.put(PROVISION_STATUS, _provisionStatus);
ret.put(PROVISION_STATUS, _provisionResponse.status());
ret.put(PROVISION_RECOMMENDATION, _provisionResponse.recommendation());
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.analyzer;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;


/**
* Indicates the {@link ProvisionStatus} along with the recommended actions regarding the relevant status.
* Recommendations are only relevant to {@link ProvisionStatus#UNDER_PROVISIONED} and {@link ProvisionStatus#OVER_PROVISIONED}.
*/
public class ProvisionResponse {
private ProvisionStatus _status;
private final StringBuilder _recommendation;

/**
* Constructor to be used for provision statuses, for which the recommendations are relevant.
* Recommendation and recommender are expected to be human-readable strings.
*
* @param status The current provision status.
* @param recommendation Recommended action regarding the given provision status.
* @param recommender The source of the recommended action to be used in aggregate recommendation.
*/
public ProvisionResponse(ProvisionStatus status, String recommendation, String recommender) {
this(status);
if (!(status == ProvisionStatus.UNDER_PROVISIONED || status == ProvisionStatus.OVER_PROVISIONED)) {
throw new IllegalArgumentException(String.format("Recommendation is irrelevant for provision status %s.", status));
}
validateNotNull(recommendation, "The recommendation cannot be null.");
validateNotNull(recommender, "The recommender cannot be null.");
_recommendation.append(String.format("[%s] %s", recommender, recommendation));
}

/**
* Constructor to be used for provision statuses, for which the recommendations are irrelevant.
*/
public ProvisionResponse(ProvisionStatus status) {
validateNotNull(status, "The provision status cannot be null.");
_status = status;
_recommendation = new StringBuilder();
}

/**
* @return The current provision status.
*/
public ProvisionStatus status() {
return _status;
}

/**
* @return Recommended actions regarding the current provision status along with the recommender of each action.
*/
public String recommendation() {
return _recommendation.toString();
}

/**
* Aggregate the given provision response to this provision response using the following rules: Aggregating ...
* <ul>
* <li>any provision status with {@link ProvisionStatus#UNDER_PROVISIONED} is {@link ProvisionStatus#UNDER_PROVISIONED}.</li>
* <li>a provision status {@code P} with {@link ProvisionStatus#UNDECIDED} is {@code P}.</li>
* <li>{@link ProvisionStatus#RIGHT_SIZED} with {@link ProvisionStatus#RIGHT_SIZED} or {@link ProvisionStatus#OVER_PROVISIONED}
* is {@link ProvisionStatus#RIGHT_SIZED}.</li>
* <li>{@link ProvisionStatus#OVER_PROVISIONED} with {@link ProvisionStatus#OVER_PROVISIONED} yields itself.</li>
* </ul>
*
* Note that these rules enforce that once a state changes from {@link ProvisionStatus#OVER_PROVISIONED} to another state, it cannot go
* back to this state. Similarly, once a state goes into {@link ProvisionStatus#UNDER_PROVISIONED}, no other followup state is possible.
* Hence, {@link #_recommendation} for over- or under-provisioned status can be updated without losing relevant information.
*
* @param other Provision response to aggregate into this response.
* @return This provision response after the aggregation.
*/
public ProvisionResponse aggregate(ProvisionResponse other) {
if (_status == ProvisionStatus.UNDER_PROVISIONED) {
if (other.status() == ProvisionStatus.UNDER_PROVISIONED) {
aggregateRecommendations(other);
}
} else {
switch (other.status()) {
case UNDER_PROVISIONED:
_status = ProvisionStatus.UNDER_PROVISIONED;
clearRecommendation();
aggregateRecommendations(other);
break;
case RIGHT_SIZED:
_status = ProvisionStatus.RIGHT_SIZED;
clearRecommendation();
break;
case OVER_PROVISIONED:
if (_status == ProvisionStatus.OVER_PROVISIONED || _status == ProvisionStatus.UNDECIDED) {
_status = ProvisionStatus.OVER_PROVISIONED;
aggregateRecommendations(other);
break;
}
// Keep the status as right-sized if it was right-sized before.
break;
case UNDECIDED:
// Nothing to do.
break;
default:
throw new IllegalArgumentException("Unsupported provision status " + other + " is provided.");
}
}
return this;
}

private void clearRecommendation() {
if (_recommendation.length() > 0) {
_recommendation.setLength(0);
}
}

private void aggregateRecommendations(ProvisionResponse other) {
String otherRecommendation = other.recommendation();
if (!otherRecommendation.isEmpty()) {
_recommendation.append(_recommendation.length() == 0 ? "" : " ").append(otherRecommendation);
}
}

@Override
public String toString() {
return String.format("%s%s", _status, _recommendation.length() == 0 ? "" : String.format(" (%s)", recommendation()));
}
}
Loading

0 comments on commit fce5139

Please sign in to comment.