From fa4d506f755703a4ef6539ccf6af7eadcd6e9d93 Mon Sep 17 00:00:00 2001 From: Suraj Kasi Date: Wed, 27 Oct 2021 17:05:01 -0700 Subject: [PATCH 1/4] Fix restartDeadTask logic when the task thread has died In restartDeadTask we check if a task is dead and attempt to cleanly stop the task before restarting the task again. To check if the task is dead or not, one of the things we check is if the task thread is alive or not. Even though this is a correct check, we don't need to attempt to stop the task in this case as the task thread is already dead. Another fix is in KafkaMirrorMakerConnectorTask to count down the latch in case we fail to acquire the task lock. --- .../kafka/AbstractKafkaConnector.java | 35 +++++++++++++------ .../KafkaMirrorMakerConnectorTask.java | 2 ++ .../kafka/TestAbstractKafkaConnector.java | 7 +++- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java index 60a32837c..fcc43b430 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java @@ -256,19 +256,25 @@ protected void restartDeadTasks() { Set deadDatastreamTasks = new HashSet<>(); _runningTasks.forEach((datastreamTask, connectorTaskEntry) -> { - if (isTaskDead(connectorTaskEntry)) { + if (isConnectorTaskDead(connectorTaskEntry)) { _logger.warn("Detected that the kafka connector task is not running for datastream task {}. Restarting it", datastreamTask.getDatastreamTaskName()); - // If stoppedTask is null it means that attempting to stop the task failed and that it will be retired again - // (in the next restartDeadTasks iteration). 
- // If we dont successfully stop the task before creating another connector task we can potentially end up with - // two connector tasks instances running in parallel. This is possible because the acquire method acts as a - // re-entrant lock if the same host calls the acquire method for the same task multiple times. - DatastreamTask stoppedTask = stopTask(datastreamTask, connectorTaskEntry); - if (stoppedTask != null) { + if (isTaskThreadDead(connectorTaskEntry)) { + _logger.warn("Task thread for datastream task {} has died. No need to attempt to stop the task", + datastreamTask.getDatastreamTaskName()); deadDatastreamTasks.add(datastreamTask); } else { - _logger.error("Connector task for datastream task {} could not be stopped.", datastreamTask.getDatastreamTaskName()); + // If stoppedTask is null it means that attempting to stop the task failed and that it will be retried again + // (in the next restartDeadTasks iteration). + // If we don't successfully stop the task before creating another connector task we can potentially end up with + // two connector task instances running in parallel. This is possible because the acquire method acts as a + // re-entrant lock if the same host calls the acquire method for the same task multiple times. 
+ DatastreamTask stoppedTask = stopTask(datastreamTask, connectorTaskEntry); + if (stoppedTask != null) { + deadDatastreamTasks.add(datastreamTask); + } else { + _logger.error("Connector task for datastream task {} could not be stopped.", datastreamTask.getDatastreamTaskName()); + } } } else { _logger.info("Connector task for datastream task {} is healthy", datastreamTask.getDatastreamTaskName()); @@ -385,12 +391,21 @@ private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntr return null; } + + protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) { + Thread taskThread = connectorTaskEntry.getThread(); + if (taskThread == null || !taskThread.isAlive()) { + return true; + } + return false; + } + /** * Check if the {@link AbstractKafkaBasedConnectorTask} is dead. * @param connectorTaskEntry connector task and thread that needs to be checked whether it is dead. * @return true if it is dead, false if it is still running. */ - protected boolean isTaskDead(ConnectorTaskEntry connectorTaskEntry) { + protected boolean isConnectorTaskDead(ConnectorTaskEntry connectorTaskEntry) { Thread taskThread = connectorTaskEntry.getThread(); AbstractKafkaBasedConnectorTask connectorTask = connectorTaskEntry.getConnectorTask(); return (connectorTaskEntry.isPendingStop() || taskThread == null || !taskThread.isAlive() diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index e14dee799..b36340df3 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -356,6 +356,8 @@ public void run() { 
_dynamicMetricsManager.createOrUpdateMeter(generateMetricsPrefix(_connectorName, CLASS_NAME), _datastreamName, TASK_LOCK_ACQUIRE_ERROR_RATE, 1); throw ex; + } finally { + _stoppedLatch.countDown(); } } super.run(); diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java index dda9bae13..c7262960b 100644 --- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java +++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java @@ -235,7 +235,12 @@ public void initializeDatastream(Datastream stream, List allDatastre } @Override - protected boolean isTaskDead(ConnectorTaskEntry connectorTaskEntry) { + protected boolean isConnectorTaskDead(ConnectorTaskEntry connectorTaskEntry) { + return true; + } + + @Override + protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) { return true; } From a9ab2cc4cf16db32f6587c35604858229383f72e Mon Sep 17 00:00:00 2001 From: Suraj Kasi Date: Thu, 28 Oct 2021 15:03:44 -0700 Subject: [PATCH 2/4] Adding a method for counting down stop latch and removing finally block around task lock acquire --- .../kafka/AbstractKafkaBasedConnectorTask.java | 6 +++++- .../connectors/kafka/AbstractKafkaConnector.java | 9 ++++++--- .../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 3 +-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index a0e6797ef..7722206a4 100644 --- 
a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -436,11 +436,15 @@ public void run() { } } postShutdownHook(); - _stoppedLatch.countDown(); + countDownLatch(); _logger.info("{} stopped", _taskName); } } + protected void countDownLatch() { + _stoppedLatch.countDown(); + } + /** * Signal task to stop */ diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java index fcc43b430..71daaf564 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java @@ -391,7 +391,11 @@ private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntr return null; } - + /** + * Check if the connector task thread is alive or not + * @param connectorTaskEntry connector task checked for whether task thread is dead. + * @return true if it is dead, false if it is still running. + */ protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) { Thread taskThread = connectorTaskEntry.getThread(); if (taskThread == null || !taskThread.isAlive()) { @@ -406,9 +410,8 @@ protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) { * @return true if it is dead, false if it is still running. 
*/ protected boolean isConnectorTaskDead(ConnectorTaskEntry connectorTaskEntry) { - Thread taskThread = connectorTaskEntry.getThread(); AbstractKafkaBasedConnectorTask connectorTask = connectorTaskEntry.getConnectorTask(); - return (connectorTaskEntry.isPendingStop() || taskThread == null || !taskThread.isAlive() + return (connectorTaskEntry.isPendingStop() || isTaskThreadDead(connectorTaskEntry) || ((System.currentTimeMillis() - connectorTask.getLastPolledTimeMillis()) >= _config.getNonGoodStateThresholdMillis())); } diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index b36340df3..0aa9443c8 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -352,12 +352,11 @@ public void run() { LOG.info("Trying to acquire the lock on datastreamTask: {}", _datastreamTask); _datastreamTask.acquire(LOCK_ACQUIRE_TIMEOUT); } catch (DatastreamRuntimeException ex) { + countDownLatch(); LOG.error(String.format("Failed to acquire lock for datastreamTask %s", _datastreamTask), ex); _dynamicMetricsManager.createOrUpdateMeter(generateMetricsPrefix(_connectorName, CLASS_NAME), _datastreamName, TASK_LOCK_ACQUIRE_ERROR_RATE, 1); throw ex; - } finally { - _stoppedLatch.countDown(); } } super.run(); From 3d463d7e96c9e7e64355ce288965b9b210a1b5ba Mon Sep 17 00:00:00 2001 From: Suraj Kasi Date: Mon, 1 Nov 2021 09:36:15 -0700 Subject: [PATCH 3/4] Renaming the countDownLatch method and remove calling it from KafkaMirrorMakerConnectorTask --- .../connectors/kafka/AbstractKafkaBasedConnectorTask.java | 4 ++-- 
.../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index 7722206a4..8998fd879 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -436,12 +436,12 @@ public void run() { } } postShutdownHook(); - countDownLatch(); + releaseTaskLatch(); _logger.info("{} stopped", _taskName); } } - protected void countDownLatch() { + protected void releaseTaskLatch() { _stoppedLatch.countDown(); } diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 0aa9443c8..e14dee799 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -352,7 +352,6 @@ public void run() { LOG.info("Trying to acquire the lock on datastreamTask: {}", _datastreamTask); _datastreamTask.acquire(LOCK_ACQUIRE_TIMEOUT); } catch (DatastreamRuntimeException ex) { - countDownLatch(); LOG.error(String.format("Failed to acquire lock for datastreamTask %s", _datastreamTask), ex); _dynamicMetricsManager.createOrUpdateMeter(generateMetricsPrefix(_connectorName, CLASS_NAME), _datastreamName, TASK_LOCK_ACQUIRE_ERROR_RATE, 1); From a223fe9b95dfa7bb6ed19bbeb82d3d0b6e863d86 Mon 
Sep 17 00:00:00 2001 From: Suraj Kasi Date: Mon, 1 Nov 2021 10:28:29 -0700 Subject: [PATCH 4/4] Removing function for counting down task latch --- .../connectors/kafka/AbstractKafkaBasedConnectorTask.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index 8998fd879..a0e6797ef 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -436,15 +436,11 @@ public void run() { } } postShutdownHook(); - releaseTaskLatch(); + _stoppedLatch.countDown(); _logger.info("{} stopped", _taskName); } } - protected void releaseTaskLatch() { - _stoppedLatch.countDown(); - } - /** * Signal task to stop */