Skip to content

Commit

Permalink
KAFKA-15022: [8/N] more tests for HAAssignor (apache#14164)
Browse files Browse the repository at this point in the history
Part of KIP-925.

- Add more tests for HighAvailabilityTaskAssignor
- Remove null and optional check for RackAwareTaskAssignor
- Pass rack aware assignor configs to getMainConsumerConfigs so that they can be picked up in rebalance protocol
- Change STATELESS_NON_OVERLAP_COST to 0. It was a mistake to be 1. Stateless tasks should be moved without this cost.
- Update of existing tests

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
lihaosky authored and rreddy-22 committed Sep 20, 2023
1 parent 0473851 commit 23e9686
Show file tree
Hide file tree
Showing 16 changed files with 820 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,10 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, getString(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));

// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
Optional.of(rackAwareTaskAssignor),
rackAwareTaskAssignor,
assignmentConfigs);

log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;

Expand All @@ -43,7 +42,7 @@ public FallbackPriorTaskAssignor() {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
delegate.assign(clients, allTaskIds, statefulTaskIds, rackAwareTaskAssignor, configs);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
Expand Down Expand Up @@ -47,13 +46,13 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
private static final int STATELESS_TRAFFIC_COST = 1;
private static final int STATELESS_NON_OVERLAP_COST = 1;
private static final int STATELESS_NON_OVERLAP_COST = 0;

@Override
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);
Expand Down Expand Up @@ -116,7 +115,7 @@ public boolean assign(final Map<UUID, ClientState> clients,

private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState> clientStates,
final SortedSet<TaskId> statefulTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
Iterator<ClientState> clientStateIterator = null;
for (final TaskId task : statefulTasks) {
Expand All @@ -134,19 +133,19 @@ private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState>
(source, destination) -> true
);

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
rackAwareTaskAssignor.get().optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
}
}

private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
if (configs.numStandbyReplicas == 0) {
return;
Expand All @@ -164,12 +163,12 @@ private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientSt
standbyTaskAssignor::isAllowedTaskMovement
);

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
rackAwareTaskAssignor.get().optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement);
rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement);
}
}

Expand Down Expand Up @@ -235,7 +234,7 @@ private static boolean shouldMoveATask(final ClientState sourceClientState,

private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState> clientStates,
final Iterable<TaskId> statelessTasks,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor) {
final RackAwareTaskAssignor rackAwareTaskAssignor) {
final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = new ConstrainedPrioritySet(
(client, task) -> true,
client -> clientStates.get(client).activeTaskLoad()
Expand All @@ -251,8 +250,8 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
statelessActiveTaskClientsByTaskLoad.offer(client);
}

if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {
rackAwareTaskAssignor.get().optimizeActiveTasks(sortedTasks, clientStates,
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
rackAwareTaskAssignor.optimizeActiveTasks(sortedTasks, clientStates,
STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -46,7 +45,7 @@ default boolean isAllowedTaskMovement(final ClientState source,
default boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
return assign(clients, allTaskIds, statefulTaskIds, configs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,7 +57,7 @@ public StickyTaskAssignor() {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
this.clients = clients;
this.allTaskIds = allTaskIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;

import java.util.Map;
Expand All @@ -31,6 +30,6 @@ public interface TaskAssignor {
boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final Optional<RackAwareTaskAssignor> rackAwareTaskAssignor,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.integration;

import java.util.stream.Stream;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -36,6 +39,7 @@
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.state.KeyValueStore;
Expand All @@ -45,7 +49,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

Expand All @@ -61,7 +64,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
Expand All @@ -74,7 +81,24 @@
@Timeout(600)
@Tag("integration")
public class HighAvailabilityTaskAssignorIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3,
new Properties(),
asList(
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_0);
}},
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_1);
}},
new Properties() {{
setProperty(KafkaConfig.RackProp(), AssignmentTestUtils.RACK_2);
}}
)
);

public static Stream<Arguments> data() {
return Stream.of(Arguments.of(true), Arguments.of(false));
}

@BeforeAll
public static void startCluster() throws IOException {
Expand All @@ -86,22 +110,25 @@ public static void closeCluster() {
CLUSTER.stop();
}

@Test
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final TestInfo testInfo) throws InterruptedException {
@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
}

@Test
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final TestInfo testInfo) throws InterruptedException {
@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
}

private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction,
final TestInfo testInfo) throws InterruptedException {
final TestInfo testInfo,
final boolean enableRackAwareAssignor) throws InterruptedException {
final String testId = safeUniqueTestName(getClass(), testInfo);
final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
final String inputTopic = "input" + testId;
Expand All @@ -117,7 +144,7 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O
new TopicPartition(storeChangelog, 1)
);

IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, storeChangelog);
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, 2, inputTopic, storeChangelog);

final ReentrantLock assignmentLock = new ReentrantLock();
final AtomicInteger assignmentsCompleted = new AtomicInteger(0);
Expand All @@ -143,8 +170,8 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O

produceTestData(inputTopic, numberOfRecords);

try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener));
try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor, AssignmentTestUtils.RACK_0));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor, AssignmentTestUtils.RACK_1));
final Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties())) {
kafkaStreams0.start();

Expand Down Expand Up @@ -284,7 +311,10 @@ private static void assertFalseNoRetry(final boolean assertion, final String mes
}

private static Properties streamsProperties(final String appId,
final AssignmentListener configuredAssignmentListener) {
final AssignmentListener configuredAssignmentListener,
final boolean enableRackAwareAssignor,
final String rack) {
final String rackAwareStrategy = enableRackAwareAssignor ? StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC : StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
return mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
Expand All @@ -300,7 +330,9 @@ private static Properties streamsProperties(final String appId,
// Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455)
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName())
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()),
mkEntry(CommonClientConfigs.CLIENT_RACK_CONFIG, rack),
mkEntry(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareStrategy)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);

@BeforeClass
public static void startCluster() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class TaskMetadataIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);

@BeforeClass
public static void startCluster() throws IOException {
Expand Down
Loading

0 comments on commit 23e9686

Please sign in to comment.