-
Notifications
You must be signed in to change notification settings - Fork 575
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Setup ProvisionerState for rightsizing clusters (#1646)
- Loading branch information
Kaitlyn Paglia
committed
Aug 13, 2021
1 parent
5a50333
commit cf416d4
Showing
20 changed files
with
671 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
107 changes: 107 additions & 0 deletions
107
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerState.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. | ||
*/ | ||
|
||
package com.linkedin.kafka.cruisecontrol.detector; | ||
|
||
import com.linkedin.cruisecontrol.common.utils.Utils; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
|
||
/** | ||
* A class to indicate how a provisioning action is handled | ||
*/ | ||
public class ProvisionerState { | ||
private static final Map<State, Set<State>> VALID_TRANSFER = new HashMap<>(); | ||
private State _state; | ||
private String _summary; | ||
private final long _createdMs; | ||
private long _updatedMs; | ||
|
||
static { | ||
VALID_TRANSFER.put(State.IN_PROGRESS, new HashSet<>(Collections.singleton(State.IN_PROGRESS))); | ||
VALID_TRANSFER.put(State.COMPLETED_WITH_ERROR, new HashSet<>(Arrays.asList(State.IN_PROGRESS, State.COMPLETED_WITH_ERROR))); | ||
VALID_TRANSFER.put(State.COMPLETED, new HashSet<>(Arrays.asList(State.IN_PROGRESS, State.COMPLETED_WITH_ERROR, State.COMPLETED))); | ||
} | ||
|
||
public ProvisionerState(State state, String summary) { | ||
_state = state; | ||
_summary = Utils.validateNotNull(summary, "ProvisionerState summary cannot be null."); | ||
_createdMs = System.currentTimeMillis(); | ||
_updatedMs = _createdMs; | ||
} | ||
|
||
/** | ||
* Check if the state transfer is possible. | ||
* @param targetState the state to transfer to. | ||
* @return True if the transfer is valid, false otherwise. | ||
*/ | ||
public boolean canTransferToState(ProvisionerState.State targetState) { | ||
return VALID_TRANSFER.get(_state).contains(targetState); | ||
} | ||
|
||
/** | ||
* @return The state of the provisioning action. | ||
*/ | ||
public State state() { | ||
return _state; | ||
} | ||
|
||
/** | ||
* @return The summary of the provisioning action status. | ||
*/ | ||
public String summary() { | ||
return _summary; | ||
} | ||
|
||
/** | ||
* @return The time the provisioner state was created in milliseconds. | ||
*/ | ||
public long createdMs() { | ||
return _createdMs; | ||
} | ||
|
||
/** | ||
* @return The status update time of the provision state in milliseconds. | ||
*/ | ||
public long updatedMs() { | ||
return _updatedMs; | ||
} | ||
|
||
/** | ||
* Update the state and summary of the provisioning action | ||
* | ||
* @param state The new state of the provisioning action. | ||
* @param summary The new summary of the provisioning action status. | ||
* @throws IllegalArgumentException if the summary is null. | ||
* @throws IllegalStateException if the target state is not a valid target state. | ||
*/ | ||
public void update(State state, String summary) { | ||
if (canTransferToState(state)) { | ||
_state = state; | ||
_summary = Utils.validateNotNull(summary, "ProvisionerState summary cannot be null."); | ||
_updatedMs = System.currentTimeMillis(); | ||
} else { | ||
throw new IllegalStateException("Cannot set the provisioner state from " + _state.toString() + " to " + state.toString() | ||
+ ". The valid target states are " + Collections.unmodifiableSet(VALID_TRANSFER.get(_state))); | ||
} | ||
} | ||
|
||
public enum State { | ||
COMPLETED, COMPLETED_WITH_ERROR, IN_PROGRESS | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return String.format("ProvisionerState:{state: %s, summary: %s, createdMs: %d, updated: %d}", | ||
_state.toString(), | ||
_summary, | ||
_createdMs, | ||
_updatedMs); | ||
} | ||
} |
40 changes: 40 additions & 0 deletions
40
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/ProvisionerUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. | ||
*/ | ||
|
||
package com.linkedin.kafka.cruisecontrol.detector; | ||
|
||
import org.apache.kafka.clients.admin.AdminClient; | ||
import org.apache.kafka.clients.admin.NewTopic; | ||
|
||
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.maybeIncreasePartitionCount; | ||
|
||
/** | ||
* A util class for provisions. | ||
*/ | ||
public final class ProvisionerUtils { | ||
|
||
private ProvisionerUtils() { | ||
} | ||
|
||
/** | ||
* Determine the status of increasing the partition count | ||
* | ||
* @param adminClient AdminClient to handle partition count increases. | ||
* @param topicToAddPartitions Existing topic to add more partitions if needed | ||
* @return The state {@link ProvisionerState.State#COMPLETED} when true, {@link ProvisionerState.State#COMPLETED_WITH_ERROR} when false | ||
*/ | ||
public static ProvisionerState partitionIncreaseStatus(AdminClient adminClient, NewTopic topicToAddPartitions) { | ||
if (maybeIncreasePartitionCount(adminClient, topicToAddPartitions)) { | ||
String summary = String.format("Provisioning the partition count to increase to %d for the topic %s was successfully completed.", | ||
topicToAddPartitions.numPartitions(), | ||
topicToAddPartitions.name()); | ||
return new ProvisionerState(ProvisionerState.State.COMPLETED, summary); | ||
} else { | ||
String summary = String.format("Provisioning the partition count to increase to %d for the topic %s was rejected.", | ||
topicToAddPartitions.numPartitions(), | ||
topicToAddPartitions.name()); | ||
return new ProvisionerState(ProvisionerState.State.COMPLETED_WITH_ERROR, summary); | ||
} | ||
} | ||
} |
19 changes: 19 additions & 0 deletions
19
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/RightsizeOptions.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. | ||
*/ | ||
|
||
package com.linkedin.kafka.cruisecontrol.detector; | ||
|
||
import java.util.Map; | ||
import org.apache.kafka.common.annotation.InterfaceStability; | ||
|
||
|
||
/** | ||
* A class to indicate options intended to be used during application of a {@link Provisioner#rightsize(Map, RightsizeOptions)}. | ||
*/ | ||
@InterfaceStability.Evolving | ||
public final class RightsizeOptions { | ||
public RightsizeOptions() { | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.