-
Notifications
You must be signed in to change notification settings - Fork 585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable Partition metric collection during ongoing execution to another k… #1538
Enable Partition metric collection during ongoing execution to another k… #1538
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! Left some comments.
Overall direction seems reasonable (i.e. having an additional sample store to keep partition metric samples during the ongoing execution). But there are issues around clarity and actual implementation logic -- unless I am missing something, the KafkaWriteOnlySampleStore.java
does not seem to be doing what it is supposed to do (i.e. store samples given to it in a new topic).
Given that this is a draft PR. I suspect the implementation of that class is in progress?
...e-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/MonitorConfig.java
Outdated
Show resolved
Hide resolved
...e-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/MonitorConfig.java
Outdated
Show resolved
Hide resolved
...e-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/MonitorConfig.java
Outdated
Show resolved
Hide resolved
...e-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/MetricSampler.java
Show resolved
Hide resolved
...c/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaWriteOnlySampleStore.java
Outdated
Show resolved
Hide resolved
...c/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaWriteOnlySampleStore.java
Outdated
Show resolved
Hide resolved
...c/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaWriteOnlySampleStore.java
Outdated
Show resolved
Hide resolved
...c/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaWriteOnlySampleStore.java
Outdated
Show resolved
Hide resolved
...c/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaWriteOnlySampleStore.java
Outdated
Show resolved
Hide resolved
...c/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaWriteOnlySampleStore.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates!
Left some comments.
...e-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/MonitorConfig.java
Outdated
Show resolved
Hide resolved
...din/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleDuringExecutionStore.java
Outdated
Show resolved
Hide resolved
...din/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleDuringExecutionStore.java
Outdated
Show resolved
Hide resolved
...din/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleDuringExecutionStore.java
Outdated
Show resolved
Hide resolved
...din/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleDuringExecutionStore.java
Outdated
Show resolved
Hide resolved
...din/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleDuringExecutionStore.java
Outdated
Show resolved
Hide resolved
short numberOfBrokersInCluster; | ||
try { | ||
numberOfBrokersInCluster = (short) adminClient.describeCluster().nodes().get(CLIENT_REQUEST_TIMEOUT_MS, | ||
TimeUnit.MILLISECONDS).size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bad indentation -- please import the project style file in IntelliJ that was shared in an earlier feedback. After the import, if you cut and paste the code changes, it will update the indentations to use the expected format.
...din/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleDuringExecutionStore.java
Outdated
Show resolved
Hide resolved
...din/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleDuringExecutionStore.java
Outdated
Show resolved
Hide resolved
...e-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/MetricFetcher.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates -- left more comments.
protected static final Duration CONSUMER_CLOSE_TIMEOUT = Duration.ofSeconds(10); | ||
// Keep additional windows in case some of the windows do not have enough samples. | ||
protected static final int ADDITIONAL_WINDOW_TO_RETAIN_FACTOR = 2; | ||
protected static final ConsumerRecords<byte[], byte[]> SHUTDOWN_RECORDS = new ConsumerRecords<>(Collections.emptyMap()); | ||
protected static final Duration SAMPLE_POLL_TIMEOUT = Duration.ofMillis(1000L); | ||
|
||
protected static final int DEFAULT_NUM_SAMPLE_LOADING_THREADS = 8; | ||
protected static final short DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR = 2; | ||
protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32;
be removed from this class? -- i.e. it is in the abstract class already.
protected static final Duration PRODUCER_CLOSE_TIMEOUT = Duration.ofMinutes(3); | ||
protected static final short DEFAULT_SAMPLE_STORE_TOPIC_REPLICATION_FACTOR = 2; | ||
protected static final int DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_PARTITION_COUNT = 32; | ||
protected static final long DEFAULT_PARTITION_SAMPLE_STORE_TOPIC_RETENTION_TIME_MS = TimeUnit.HOURS.toMillis(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems only relevant to KafkaPartitionMetricSampleOngoingExecutionStore
. Should we move it to that class?
} | ||
|
||
@SuppressWarnings("unchecked") | ||
protected void ensureTopicsCreated(Map<String, ?> config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this function create 1 topic? Should we name it ensureTopicCreated
?
...in/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleOngoingExecutionStore.java
Outdated
Show resolved
Hide resolved
...in/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleOngoingExecutionStore.java
Outdated
Show resolved
Hide resolved
...rc/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/ReadOnlyKafkaSampleStore.java
Show resolved
Hide resolved
...e-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/MonitorConfig.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all the updates!
Left 2 more nits otherwise LGTM.
...rc/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/ReadOnlyKafkaSampleStore.java
Outdated
Show resolved
Hide resolved
...inkedin/kafka/cruisecontrol/monitor/sampling/KafkaPartitionMetricSampleOnExecutionStore.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM -- thanks for the PR!
This PR resolves #1497.