Skip to content

KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style #19955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.CloseOptions;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testCloseOptions() throws Exception {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP).withTimeout(Duration.ofSeconds(30)));
waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
}

Expand Down
98 changes: 98 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/CloseOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

public class CloseOptions {
/**
* Enum to specify the group membership operation upon leaving group.
*
* <ul>
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the group.</li>
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
* </ul>
*/
public enum GroupMembershipOperation {
LEAVE_GROUP,
REMAIN_IN_GROUP
}

/**
* Specifies the group membership operation upon shutdown.
* By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
*/
protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;

/**
* Specifies the maximum amount of time to wait for the close process to complete.
* This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
*/
protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));

private CloseOptions() {
}

protected CloseOptions(final CloseOptions closeOptions) {
this.operation = closeOptions.operation;
this.timeout = closeOptions.timeout;
}

/**
* Static method to create a {@code CloseOptions} with a custom timeout.
*
* @param timeout the maximum time to wait for the KafkaStreams to close.
* @return a new {@code CloseOptions} instance with the specified timeout.
*/
public static CloseOptions timeout(final Duration timeout) {
return new CloseOptions().withTimeout(timeout);
}

/**
* Static method to create a {@code CloseOptions} with a specified group membership operation.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return a new {@code CloseOptions} instance with the specified group membership operation.
*/
public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
return new CloseOptions().withGroupMembershipOperation(operation);
}

/**
* Fluent method to set the timeout for the close process.
*
* @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
return this;
}

/**
* Fluent method to set the group membership operation upon shutdown.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
this.operation = Objects.requireNonNull(operation, "operation should not be null");
return this;
}
}
59 changes: 45 additions & 14 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
import org.apache.kafka.streams.internals.CloseOptionsInternal;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
Expand Down Expand Up @@ -236,7 +237,7 @@ public class KafkaStreams implements AutoCloseable {
* </li>
* <li>
* REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
* (Note: a thread transits to RUNNING state, if all active tasks got restored are are ready for processing.
* (Note: a thread transits to RUNNING state, if all active tasks got restored are ready for processing.
* Standby tasks are not considered.)
* </li>
* <li>
Expand Down Expand Up @@ -768,7 +769,7 @@ private void throwOnFatalException(final Exception fatalUserException,

@Override
public void onUpdateStart(final TopicPartition topicPartition,
final String storeName,
final String storeName,
final long startingOffset) {
if (userStandbyListener != null) {
try {
Expand Down Expand Up @@ -981,7 +982,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,

// use client id instead of thread client id since this admin client may be shared among threads
adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));

metrics = createMetrics(applicationConfigs, time, clientId);
final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId);
metrics.addReporter(reporter);
Expand Down Expand Up @@ -1227,7 +1228,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
adminClient.removeMembersFromConsumerGroup(
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
Expand Down Expand Up @@ -1461,15 +1462,18 @@ public synchronized void start() throws IllegalStateException, StreamsException
/**
* Class that handles options passed in case of {@code KafkaStreams} instance scale down
*/
@Deprecated(since = "4.2")
public static class CloseOptions {
private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
private boolean leaveGroup = false;

@Deprecated(since = "4.2")
public CloseOptions timeout(final Duration timeout) {
this.timeout = timeout;
return this;
}

@Deprecated(since = "4.2")
public CloseOptions leaveGroup(final boolean leaveGroup) {
this.leaveGroup = leaveGroup;
return this;
Expand All @@ -1481,10 +1485,14 @@ public CloseOptions leaveGroup(final boolean leaveGroup) {
* This will block until all threads have stopped.
*/
public void close() {
close(Optional.empty(), false);
close(Optional.empty(), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}

private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
private Thread shutdownHelper(
final boolean error,
final long timeoutMs,
final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation
) {
stateDirCleaner.shutdownNow();
if (rocksDBMetricsRecordingService != null) {
rocksDBMetricsRecordingService.shutdownNow();
Expand Down Expand Up @@ -1516,7 +1524,7 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
}
});

if (leaveGroup) {
if (operation == org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP) {
processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));
}

Expand Down Expand Up @@ -1560,7 +1568,7 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
}, clientId + "-CloseThread");
}

private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
final long timeoutMs;
if (timeout.isPresent()) {
timeoutMs = timeout.get();
Expand Down Expand Up @@ -1591,7 +1599,7 @@ private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
+ "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
}

final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
final Thread shutdownThread = shutdownHelper(false, timeoutMs, operation);

shutdownThread.setDaemon(true);
shutdownThread.start();
Expand All @@ -1609,7 +1617,8 @@ private void closeToError() {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in {}", state());
} else {
final Thread shutdownThread = shutdownHelper(true, -1, false);
final Thread shutdownThread = shutdownHelper(true, -1,
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);

shutdownThread.setDaemon(true);
shutdownThread.start();
Expand All @@ -1635,28 +1644,50 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
throw new IllegalArgumentException("Timeout can't be negative.");
}

return close(Optional.of(timeoutMs), false);
return close(Optional.of(timeoutMs), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}

/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* This method is deprecated and replaced by {@link #close(org.apache.kafka.streams.CloseOptions)}.
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
* trigger consumer leave call
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
@Deprecated(since = "4.2")
public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
final org.apache.kafka.streams.CloseOptions closeOptions = org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
.withGroupMembershipOperation(options.leaveGroup ?
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
return close(closeOptions);
}

/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag
* {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation#LEAVE_GROUP} to
* trigger consumer leave call
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
Objects.requireNonNull(options, "options cannot be null");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}

return close(Optional.of(timeoutMs), options.leaveGroup);
return close(Optional.of(timeoutMs), optionsInternal.operation());
}

private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;

import org.apache.kafka.streams.CloseOptions;

import java.time.Duration;
import java.util.Optional;

public class CloseOptionsInternal extends CloseOptions {

public CloseOptionsInternal(final CloseOptions options) {
super(options);
}

public GroupMembershipOperation operation() {
return operation;
}

public Optional<Duration> timeout() {
return timeout;
}
}
Loading