Skip to content

Commit

Permalink
KAFKA-10199: Change to RUNNING if no pending task to recycle exist (a…
Browse files Browse the repository at this point in the history
…pache#14145)

A stream thread should only change to RUNNING if there are no
active tasks in restoration in the state updater and if there
are no pending tasks to recycle.

There are situations in which a stream thread might only have
standby tasks that are recycled to active task after a rebalance.
In such situations, the stream thread might be faster in checking
active tasks in restoration then the state updater removing the
standby task to recycle from the state updater. If that happens
the stream thread changes to RUNNING although it should wait until
the standby tasks are recycled to active tasks and restored.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
cadonna committed Aug 4, 2023
1 parent e0b7499 commit 7782741
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ public boolean checkStateUpdater(final long now,
if (stateUpdater.restoresActiveTasks()) {
handleRestoredTasksFromStateUpdater(now, offsetResetter);
}
return !stateUpdater.restoresActiveTasks();
return !stateUpdater.restoresActiveTasks() && !tasks.hasPendingTasksToRecycle();
}

private void recycleTaskFromStateUpdater(final Task task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartitio
pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
}

@Override
public boolean hasPendingTasksToRecycle() {
return pendingUpdateActions.values().stream().anyMatch(action -> action.getAction() == Action.RECYCLE);
}

@Override
public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface TasksRegistry {

Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId);

boolean hasPendingTasksToRecycle();

void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions);

Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true);
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
replay(consumer);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
Expand All @@ -894,6 +894,7 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
Mockito.verify(statefulTask).suspend();
Mockito.verify(tasks).addTask(statefulTask);
}

@Test
public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
Expand Down Expand Up @@ -947,6 +948,35 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
Mockito.verify(stateUpdater).add(taskToUpdateInputPartitions);
}

@Test
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);

assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
}

@Test
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringButPendingTasksToRecycle() {
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.hasPendingTasksToRecycle()).thenReturn(true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);

assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
}

@Test
public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTasksToRecycle() {
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.hasPendingTasksToRecycle()).thenReturn(false);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);

assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
}

@Test
public void shouldAddActiveTaskWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public class TasksTest {
private final static TopicPartition TOPIC_PARTITION_B_1 = new TopicPartition("topicB", 1);
private final static TaskId TASK_0_0 = new TaskId(0, 0);
private final static TaskId TASK_0_1 = new TaskId(0, 1);
private final static TaskId TASK_0_2 = new TaskId(0, 2);
private final static TaskId TASK_1_0 = new TaskId(1, 0);
private final static TaskId TASK_1_1 = new TaskId(1, 1);
private final static TaskId TASK_1_2 = new TaskId(1, 2);

private final Tasks tasks = new Tasks(new LogContext());

Expand Down Expand Up @@ -122,6 +125,28 @@ public void shouldAddAndRemovePendingTaskToRecycle() {
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
}

@Test
public void shouldVerifyIfPendingTaskToRecycleExist() {
assertFalse(tasks.hasPendingTasksToRecycle());
tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertTrue(tasks.hasPendingTasksToRecycle());

tasks.addPendingTaskToRecycle(TASK_1_0, mkSet(TOPIC_PARTITION_A_1));
assertTrue(tasks.hasPendingTasksToRecycle());

tasks.addPendingTaskToCloseClean(TASK_0_1);
tasks.addPendingTaskToCloseDirty(TASK_0_2);
tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0));
tasks.addPendingActiveTaskToSuspend(TASK_1_2);
assertTrue(tasks.hasPendingTasksToRecycle());

tasks.removePendingTaskToRecycle(TASK_0_0);
assertTrue(tasks.hasPendingTasksToRecycle());

tasks.removePendingTaskToRecycle(TASK_1_0);
assertFalse(tasks.hasPendingTasksToRecycle());
}

@Test
public void shouldAddAndRemovePendingTaskToUpdateInputPartitions() {
final Set<TopicPartition> expectedInputPartitions = mkSet(TOPIC_PARTITION_A_0);
Expand Down

0 comments on commit 7782741

Please sign in to comment.