Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Kafka 2.6 brokers #1311

Merged
merged 3 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

tomncooper marked this conversation as resolved.
Show resolved Hide resolved
import com.yammer.metrics.core.MetricsRegistry;
import kafka.log.LogConfig;
import kafka.metrics.KafkaYammerMetrics;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -332,10 +335,30 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
});
}

/**
* Starting with Kafka 2.6.0, a new class KafkaYammerMetrics provides the default Metrics Registry. The old default
tomncooper marked this conversation as resolved.
Show resolved Hide resolved
* registry does not work with 2.6+. Therfore if the new class exists, we use it and if it doesn't exist we use the
tomncooper marked this conversation as resolved.
Show resolved Hide resolved
* old one. More details can be found here: https://github.com/apache/kafka/blob/2.6.0/core/src/main/java/kafka/metrics/KafkaYammerMetrics.java
*
* Once CC supports only 2.6.0 and newer, we can clean this up and use only KafkaYammerMetrics all the time.
tomncooper marked this conversation as resolved.
Show resolved Hide resolved
*
* @return MetricsRegistry with Kafka metrics
*/
private MetricsRegistry metricsRegistry() {
try {
Class.forName("kafka.metrics.KafkaYammerMetrics");
tomncooper marked this conversation as resolved.
Show resolved Hide resolved
LOG.info("KafkaYammerMetrics found and will be used.");
return KafkaYammerMetrics.defaultRegistry();
} catch (ClassNotFoundException e) {
LOG.info("KafkaYammerMetrics not found. Metrics will be used.");
return Metrics.defaultRegistry();
}
}

private void reportYammerMetrics(long now) throws Exception {
LOG.debug("Reporting yammer metrics.");
YammerMetricProcessor.Context context = new YammerMetricProcessor.Context(this, now, _brokerId, _reportingIntervalMs);
for (Map.Entry<com.yammer.metrics.core.MetricName, Metric> entry : Metrics.defaultRegistry().allMetrics().entrySet()) {
for (Map.Entry<com.yammer.metrics.core.MetricName, Metric> entry : metricsRegistry().allMetrics().entrySet()) {
LOG.trace("Processing yammer metric {}, scope = {}", entry.getKey(), entry.getKey().getScope());
entry.getValue().processWith(_yammerMetricProcessor, entry.getKey(), context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

tomncooper marked this conversation as resolved.
Show resolved Hide resolved
import com.linkedin.kafka.cruisecontrol.servlet.response.JsonResponseField;
import com.linkedin.kafka.cruisecontrol.servlet.response.JsonResponseClass;

import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.State.*;


/**
* A class that wraps the execution information of a balancing proposal
*
Expand Down Expand Up @@ -52,25 +50,26 @@ public class ExecutionTask implements Comparable<ExecutionTask> {
private static final String PROPOSAL = "proposal";
@JsonResponseField
private static final String BROKER_ID = "brokerId";
private static final Map<State, Set<State>> VALID_TRANSFER = new HashMap<>();
private static final Map<ExecutionTaskState, Set<ExecutionTaskState>> VALID_TRANSFER = new HashMap<>();
private final TaskType _type;
private final long _executionId;
private final ExecutionProposal _proposal;
// _brokerId is only relevant for intra-broker replica action, otherwise it will be -1.
private final int _brokerId;
private State _state;
private ExecutionTaskState _state;
private long _startTimeMs;
private long _endTimeMs;
private long _alertTimeMs;
private boolean _slowExecutionReported;

static {
VALID_TRANSFER.put(PENDING, new HashSet<>(Collections.singleton(IN_PROGRESS)));
VALID_TRANSFER.put(IN_PROGRESS, new HashSet<>(Arrays.asList(ABORTING, DEAD, COMPLETED)));
VALID_TRANSFER.put(ABORTING, new HashSet<>(Arrays.asList(ABORTED, DEAD)));
VALID_TRANSFER.put(COMPLETED, Collections.emptySet());
VALID_TRANSFER.put(DEAD, Collections.emptySet());
VALID_TRANSFER.put(ABORTED, Collections.emptySet());
VALID_TRANSFER.put(ExecutionTaskState.PENDING, new HashSet<>(Collections.singleton(ExecutionTaskState.IN_PROGRESS)));
VALID_TRANSFER.put(ExecutionTaskState.IN_PROGRESS,
new HashSet<>(Arrays.asList(ExecutionTaskState.ABORTING, ExecutionTaskState.DEAD, ExecutionTaskState.COMPLETED)));
VALID_TRANSFER.put(ExecutionTaskState.ABORTING, new HashSet<>(Arrays.asList(ExecutionTaskState.ABORTED, ExecutionTaskState.DEAD)));
VALID_TRANSFER.put(ExecutionTaskState.COMPLETED, Collections.emptySet());
VALID_TRANSFER.put(ExecutionTaskState.DEAD, Collections.emptySet());
VALID_TRANSFER.put(ExecutionTaskState.ABORTED, Collections.emptySet());
}

/**
Expand All @@ -94,7 +93,7 @@ public ExecutionTask(long executionId, ExecutionProposal proposal, Integer broke
_executionId = executionId;
_proposal = proposal;
_brokerId = brokerId == null ? -1 : brokerId;
_state = State.PENDING;
_state = ExecutionTaskState.PENDING;
_type = type;
_startTimeMs = -1L;
_endTimeMs = -1L;
Expand All @@ -111,14 +110,14 @@ public ExecutionTask(long executionId, ExecutionProposal proposal, TaskType type
* @param targetState the state to transfer to.
* @return True if the transfer is valid, false otherwise.
*/
public boolean canTransferToState(State targetState) {
public boolean canTransferToState(ExecutionTaskState targetState) {
return VALID_TRANSFER.get(_state).contains(targetState);
}

/**
* @return The valid target state to transfer to.
*/
public Set<State> validTargetState() {
public Set<ExecutionTaskState> validTargetState() {
return Collections.unmodifiableSet(VALID_TRANSFER.get(_state));
}

Expand Down Expand Up @@ -146,7 +145,7 @@ public TaskType type() {
/**
* @return The state of the task.
*/
public State state() {
public ExecutionTaskState state() {
return this._state;
}

Expand Down Expand Up @@ -177,8 +176,8 @@ public int brokerId() {
* @param now Current system time.
*/
public void inProgress(long now) {
ensureValidTransfer(IN_PROGRESS);
this._state = IN_PROGRESS;
ensureValidTransfer(ExecutionTaskState.IN_PROGRESS);
this._state = ExecutionTaskState.IN_PROGRESS;
_startTimeMs = now;
_alertTimeMs += now;
}
Expand All @@ -189,17 +188,17 @@ public void inProgress(long now) {
* @param now Current system time.
*/
public void kill(long now) {
ensureValidTransfer(DEAD);
this._state = DEAD;
ensureValidTransfer(ExecutionTaskState.DEAD);
this._state = ExecutionTaskState.DEAD;
_endTimeMs = now;
}

/**
* Abort the task.
*/
public void abort() {
ensureValidTransfer(ABORTING);
this._state = ABORTING;
ensureValidTransfer(ExecutionTaskState.ABORTING);
this._state = ExecutionTaskState.ABORTING;
}

/**
Expand All @@ -208,8 +207,8 @@ public void abort() {
* @param now Current system time.
*/
public void aborted(long now) {
ensureValidTransfer(ABORTED);
this._state = ABORTED;
ensureValidTransfer(ExecutionTaskState.ABORTED);
this._state = ExecutionTaskState.ABORTED;
_endTimeMs = now;
}

Expand All @@ -219,8 +218,8 @@ public void aborted(long now) {
* @param now Current system time.
*/
public void completed(long now) {
ensureValidTransfer(COMPLETED);
this._state = COMPLETED;
ensureValidTransfer(ExecutionTaskState.COMPLETED);
this._state = ExecutionTaskState.COMPLETED;
_endTimeMs = now;
}

Expand All @@ -234,7 +233,7 @@ public void completed(long now) {
* @param tasksToReport A list of tasks for which a slow execution alert will be sent out.
*/
public void maybeReportExecutionTooSlow(long now, List<ExecutionTask> tasksToReport) {
if (!_slowExecutionReported && (_state == IN_PROGRESS || _state == ABORTING) && now > _alertTimeMs) {
if (!_slowExecutionReported && (_state == ExecutionTaskState.IN_PROGRESS || _state == ExecutionTaskState.ABORTING) && now > _alertTimeMs) {
tasksToReport.add(this);
// Mute the task to prevent sending the same alert repeatedly.
_slowExecutionReported = true;
Expand Down Expand Up @@ -266,7 +265,7 @@ public Map<String, Object> getJsonStructure() {
return executionStatsMap;
}

private void ensureValidTransfer(State targetState) {
private void ensureValidTransfer(ExecutionTaskState targetState) {
if (!canTransferToState(targetState)) {
throw new IllegalStateException("Cannot mark a task in " + _state + " to" + targetState + "state. The "
+ "valid target state are " + validTargetState());
Expand All @@ -287,20 +286,6 @@ public static List<TaskType> cachedValues() {
}
}

public enum State {
PENDING, IN_PROGRESS, ABORTING, ABORTED, DEAD, COMPLETED;

private static final List<State> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));

/**
* Use this instead of values() because values() creates a new array each time.
* @return enumerated values in the same order as values()
*/
public static List<State> cachedValues() {
return CACHED_VALUES;
}
}

@Override
public String toString() {
switch (_type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.TaskType;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTask.State;
import static com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskTracker.ExecutionTasksSummary;
/**
* The class that helps track the execution status for the balancing.
Expand Down Expand Up @@ -261,7 +260,7 @@ public synchronized void setExecutionModeForTaskTracker(boolean isKafkaAssignerM
*/
public synchronized void markTasksInProgress(List<ExecutionTask> tasks) {
for (ExecutionTask task : tasks) {
_executionTaskTracker.markTaskState(task, State.IN_PROGRESS);
_executionTaskTracker.markTaskState(task, ExecutionTaskState.IN_PROGRESS);
switch (task.type()) {
case INTER_BROKER_REPLICA_ACTION:
_inProgressPartitionsForInterBrokerMovement.add(task.proposal().topicPartition());
Expand All @@ -287,11 +286,11 @@ public synchronized void markTasksInProgress(List<ExecutionTask> tasks) {
* Aborting execution will yield Aborted completion.
*/
public synchronized void markTaskDone(ExecutionTask task) {
if (task.state() == State.IN_PROGRESS) {
_executionTaskTracker.markTaskState(task, State.COMPLETED);
if (task.state() == ExecutionTaskState.IN_PROGRESS) {
_executionTaskTracker.markTaskState(task, ExecutionTaskState.COMPLETED);
completeTask(task);
} else if (task.state() == State.ABORTING) {
_executionTaskTracker.markTaskState(task, State.ABORTED);
} else if (task.state() == ExecutionTaskState.ABORTING) {
_executionTaskTracker.markTaskState(task, ExecutionTaskState.ABORTED);
completeTask(task);
}
}
Expand All @@ -300,17 +299,17 @@ public synchronized void markTaskDone(ExecutionTask task) {
* Mark an in-progress task as aborting (1) if an error is encountered and (2) the rollback is possible.
*/
public synchronized void markTaskAborting(ExecutionTask task) {
if (task.state() == State.IN_PROGRESS) {
_executionTaskTracker.markTaskState(task, State.ABORTING);
if (task.state() == ExecutionTaskState.IN_PROGRESS) {
_executionTaskTracker.markTaskState(task, ExecutionTaskState.ABORTING);
}
}

/**
* Mark an in-progress task as aborting (1) if an error is encountered and (2) the rollback is not possible.
*/
public synchronized void markTaskDead(ExecutionTask task) {
if (task.state() != State.DEAD) {
_executionTaskTracker.markTaskState(task, State.DEAD);
if (task.state() != ExecutionTaskState.DEAD) {
_executionTaskTracker.markTaskState(task, ExecutionTaskState.DEAD);
completeTask(task);
}
}
Expand Down Expand Up @@ -369,15 +368,15 @@ public synchronized long finishedInterBrokerDataMovementInMB() {
}

/**
* @return The tasks that are {@link State#IN_PROGRESS} or {@link State#ABORTING} for all task types.
* @return The tasks that are {@link ExecutionTaskState#IN_PROGRESS} or {@link ExecutionTaskState#ABORTING} for all task types.
*/
public synchronized Set<ExecutionTask> inExecutionTasks() {
return inExecutionTasks(TaskType.cachedValues());
}

/**
* @param types Task type.
* @return The tasks that are {@link State#IN_PROGRESS} or {@link State#ABORTING} for the given task type.
* @return The tasks that are {@link ExecutionTaskState#IN_PROGRESS} or {@link ExecutionTaskState#ABORTING} for the given task type.
*/
public synchronized Set<ExecutionTask> inExecutionTasks(Collection<TaskType> types) {
return _executionTaskTracker.inExecutionTasks(types);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
tomncooper marked this conversation as resolved.
Show resolved Hide resolved
*/
package com.linkedin.kafka.cruisecontrol.executor;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public enum ExecutionTaskState {
PENDING, IN_PROGRESS, ABORTING, ABORTED, DEAD, COMPLETED;

private static final List<ExecutionTaskState> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));

/**
* Use this instead of values() because values() creates a new array each time.
* @return enumerated values in the same order as values()
*/
public static List<ExecutionTaskState> cachedValues() {
return CACHED_VALUES;
}
}
Loading