Skip to content

Commit

Permalink
Refactor Stopping Tasks On Assignment Change of Tasks (#868)
Browse files Browse the repository at this point in the history
Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn1.linkedin.biz>
  • Loading branch information
shrinandthakkar and Shrinand Thakkar committed Apr 12, 2022
1 parent b821911 commit 4f806ac
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ public abstract class AbstractKafkaConnector implements Connector, DiagnosticsAw
// multiple concurrent threads. If access is required to both maps then the order of synchronization must be
// _runningTasks followed by _tasksToStop to prevent deadlocks.
private final Map<DatastreamTask, ConnectorTaskEntry> _runningTasks = new HashMap<>();
private final Map<DatastreamTask, ConnectorTaskEntry> _tasksToStop = new HashMap<>();

// _tasksPendingStop contains the tasks that are pending stop across assignment changes. The periodic health
// check call will keep attempting to stop these tasks, in case they are stuck somewhere in the stop path.
private final Map<DatastreamTask, ConnectorTaskEntry> _tasksPendingStop = new HashMap<>();

// A daemon executor to constantly check whether all tasks are running and restart them if not.
private final ScheduledExecutorService _daemonThreadExecutorService =
Expand Down Expand Up @@ -154,23 +157,25 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
_logger.info("onAssignmentChange called with tasks {}", tasks);

synchronized (_runningTasks) {
Map<DatastreamTask, ConnectorTaskEntry> runningTasksToStop = new HashMap<>();
Set<DatastreamTask> toCancel = new HashSet<>(_runningTasks.keySet());
tasks.forEach(toCancel::remove);

if (toCancel.size() > 0) {
// Mark the connector task as stopped so that, in case stopping the task here fails for any reason in
// restartDeadTasks the task is not restarted
synchronized (_tasksToStop) {
synchronized (_tasksPendingStop) {
toCancel.forEach(task -> {
_tasksToStop.put(task, _runningTasks.get(task));
runningTasksToStop.put(task, _runningTasks.get(task));
_tasksPendingStop.put(task, _runningTasks.get(task));
_runningTasks.remove(task);
});
}
stopUnassignedTasks();
scheduleTasksToStop(runningTasksToStop);
}

boolean toCallRestartDeadTasks = false;
synchronized (_tasksToStop) {
synchronized (_tasksPendingStop) {
for (DatastreamTask task : tasks) {
ConnectorTaskEntry connectorTaskEntry = _runningTasks.get(task);
if (connectorTaskEntry != null) {
Expand All @@ -180,16 +185,17 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
// This is necessary because DatastreamTaskImpl.hashCode() does not take into account all the
// fields/properties of the DatastreamTask (e.g. dependencies).
_runningTasks.remove(task);
_runningTasks.put(task, connectorTaskEntry);
} else {
if (_tasksToStop.containsKey(task)) {
// If a pending stop task is reassigned to this host, we'd have to ensure to restart the
// task or replace the connectorTaskEntry for that task in the restartDeadTasks function.
if (_tasksPendingStop.containsKey(task)) {
toCallRestartDeadTasks = true;
connectorTaskEntry = _tasksToStop.remove(task);
connectorTaskEntry = _tasksPendingStop.remove(task);
} else {
connectorTaskEntry = createKafkaConnectorTask(task);
}
_runningTasks.put(task, connectorTaskEntry);
}
_runningTasks.put(task, connectorTaskEntry);
}
}
// If any tasks pending stop were re-assigned to this host we explicitly call restartDeadTasks to ensure
Expand Down Expand Up @@ -293,8 +299,8 @@ protected void restartDeadTasks() {
* Returns the number of tasks yet to be stopped.
*/
int getTasksToStopCount() {
synchronized (_tasksToStop) {
return _tasksToStop.size();
synchronized (_tasksPendingStop) {
return _tasksPendingStop.size();
}
}

Expand All @@ -308,44 +314,50 @@ int getRunningTasksCount() {
}

/**
* Attempt to stop the unassigned tasks.
* Attempt to stop the unassigned tasks from the _tasksToStop map.
*/
private void stopUnassignedTasks() {
synchronized (_tasksToStop) {
if (_tasksToStop.size() == 0) {
_logger.info("No tasks to stop");
return;
}
scheduleTasksToStop(_tasksPendingStop);
}

// Spawn a separate thread to attempt stopping the connectorTask. The connectorTask will be canceled if it
// does not stop within a certain amount of time. This will force cleanup of connectorTasks which take too long
// to stop, or are stuck indefinitely. A separate thread is spawned to handle this because the Coordinator
// requires that this step completely because we call this from onAssignmentChange() (assignment thread gets
// killed if it takes too long) and restartDeadTasks which must complete quickly.
List<Future<DatastreamTask>> stopTaskFutures = _tasksToStop.keySet().stream()
.map(task -> asyncStopTask(task, _tasksToStop.get(task)))
.collect(Collectors.toList());
/**
* Attempt to stop the unassigned tasks from the argument map.
*/
private void scheduleTasksToStop(Map<DatastreamTask, ConnectorTaskEntry> tasks) {
if (tasks.size() == 0) {
_logger.info("No tasks to stop");
return;
}

_shutdownExecutorService.submit(() -> {
List<DatastreamTask> toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> {
try {
return stopTaskFuture.get(CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
_logger.warn("Stop task future failed with exception", e);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

if (toRemoveTasks.size() > 0) {
synchronized (_tasksToStop) {
// Its possible that while stopping the task was pending there was another onAssignmentChange event
// which reassigned the task back to this host and the task was moved back to _runningTasks. In this
// case the remove operation here will be a no-op.
toRemoveTasks.forEach(_tasksToStop::remove);
}
// Spawn a separate thread to attempt stopping the connectorTask. The connectorTask will be canceled if it
// does not stop within a certain amount of time. This will force cleanup of connectorTasks which take too long
// to stop, or are stuck indefinitely. A separate thread is spawned to handle this because the Coordinator
// requires that this step completely because we call this from onAssignmentChange() (assignment thread gets
// killed if it takes too long) and restartDeadTasks which must complete quickly.
List<Future<DatastreamTask>> stopTaskFutures = tasks.keySet().stream()
.map(task -> asyncStopTask(task, tasks.get(task)))
.collect(Collectors.toList());

_shutdownExecutorService.submit(() -> {
List<DatastreamTask> toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> {
try {
return stopTaskFuture.get(CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT).toMillis(),
TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
_logger.warn("Stop task future failed with exception", e);
}
});
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

if (toRemoveTasks.size() > 0) {
synchronized (_tasksPendingStop) {
// Its possible that while stopping the task was pending there was another onAssignmentChange event
// which reassigned the task back to this host and the task was moved back to _runningTasks. In this
// case the remove operation here will be a no-op.
toRemoveTasks.forEach(_tasksPendingStop::remove);
}
}
});
}

@NotNull
Expand Down Expand Up @@ -420,10 +432,10 @@ public void stop() {
_runningTasks.forEach(this::asyncStopTask);
_runningTasks.clear();
}
synchronized (_tasksToStop) {
synchronized (_tasksPendingStop) {
// Try to stop the tasks
_tasksToStop.forEach(this::asyncStopTask);
_tasksToStop.clear();
_tasksPendingStop.forEach(this::asyncStopTask);
_tasksPendingStop.clear();
}
_logger.info("Start to shut down the shutdown executor and wait up to {} ms.",
SHUTDOWN_EXECUTOR_SHUTDOWN_TIMEOUT.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.slf4j.Logger;
Expand Down Expand Up @@ -138,6 +142,51 @@ public void testOnAssignmentChangeStopTaskFailure() {
connector.stop();
}

@Test
public void testOnAssignmentChangeMultipleReassignments() throws InterruptedException {
  Properties props = new Properties();
  // Reduce time interval between calls to restartDeadTasks to force invocation of stopTasks
  props.setProperty("daemonThreadIntervalInSeconds", "2");
  // With failStopTaskOnce set to true the AbstractKafkaBasedConnectorTask.stop is configured
  // to fail the first time with InterruptedException and pass the second time.
  TestKafkaConnector connector = new TestKafkaConnector(false, props, true);

  // First task assignment assigns task 1.
  List<DatastreamTask> firstTaskAssignment = getTaskListInRange(1, 2);
  connector.onAssignmentChange(firstTaskAssignment);
  connector.start(null);
  Assert.assertEquals(connector.getRunningTasksCount(), 1);

  // Second task assignment assigns tasks 2,3,4,5 and takes out task 1.
  List<DatastreamTask> secondTaskAssignment = getTaskListInRange(2, 6);

  // During the assignment, the pending-stop count needs to be at most 1, as only task 1 is taken out.
  // NOTE(review): the assertion below runs on an executor thread; an AssertionError thrown there
  // is swallowed by the pool and will not fail this test — confirm this is intended.
  ExecutorService executor = Executors.newFixedThreadPool(2);
  executor.execute(() -> connector.onAssignmentChange(secondTaskAssignment));
  executor.execute(() -> Assert.assertTrue(connector.getTasksToStopCount() <= 1));

  awaitForExecution(executor, 50L);
  Assert.assertTrue(connector.getTasksToStopCount() >= 1); // the count of the pending-stop tracker
  Assert.assertEquals(connector.getRunningTasksCount(), 4);

  // Third task assignment keeps task 5, assigns tasks 6,7,8 and takes out tasks 2,3,4.
  List<DatastreamTask> thirdTaskAssignment = getTaskListInRange(5, 9);

  // During the assignment, the pending-stop count needs to be at most 4: tasks 2,3,4 are taken
  // out, plus task 1 if it has not already been stopped.
  executor = Executors.newFixedThreadPool(2);
  executor.execute(() -> connector.onAssignmentChange(thirdTaskAssignment));
  executor.execute(() -> Assert.assertTrue(connector.getTasksToStopCount() <= 4));

  awaitForExecution(executor, 50L);
  Assert.assertTrue(connector.getTasksToStopCount() >= 3); // the count of the pending-stop tracker

  // Wait for restartDeadTasks to be called to attempt another stopTasks call.
  PollUtils.poll(() -> connector.getCreateTaskCalled() >= 3, Duration.ofSeconds(1).toMillis(),
      Duration.ofSeconds(10).toMillis());
  Assert.assertEquals(connector.getRunningTasksCount(), 4);
  connector.stop();
}

@Test
public void testCalculateThreadStartDelay() {
Properties props = new Properties();
Expand Down Expand Up @@ -191,6 +240,26 @@ public void testRestartThrowsException() {
connector.stop();
}

// Helper method that builds one DatastreamTaskImpl per index in [start, end), each with the
// task prefix "testtask<index>", for use as an assignment list.
private List<DatastreamTask> getTaskListInRange(int start, int end) {
  List<DatastreamTask> taskAssignmentList = new ArrayList<>();
  for (int index = start; index < end; index++) {
    DatastreamTaskImpl task = new DatastreamTaskImpl();
    task.setTaskPrefix("testtask" + index);
    taskAssignmentList.add(task);
  }
  return taskAssignmentList;
}

/**
 * Helper method that waits up to the given timeout for the executor's already-submitted work to
 * finish, then forcibly shuts the executor down.
 *
 * @param executor the executor whose submitted tasks should be awaited
 * @param timeUnitMs maximum time to wait, in milliseconds
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void awaitForExecution(ExecutorService executor, Long timeUnitMs) throws InterruptedException {
  try {
    // Initiate an orderly shutdown first: awaitTermination only observes termination after
    // shutdown has been requested, so without this call it always blocks for the full timeout
    // regardless of whether the submitted tasks have already completed.
    executor.shutdown();
    executor.awaitTermination(timeUnitMs, TimeUnit.MILLISECONDS);
  } finally {
    executor.shutdownNow();
  }
}

/**
* Dummy implementation of {@link AbstractKafkaConnector} for testing purposes
*/
Expand Down

0 comments on commit 4f806ac

Please sign in to comment.