diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
index 60a32837c..71daaf564 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
@@ -256,19 +256,25 @@
   protected void restartDeadTasks() {
     Set<DatastreamTask> deadDatastreamTasks = new HashSet<>();
     _runningTasks.forEach((datastreamTask, connectorTaskEntry) -> {
-      if (isTaskDead(connectorTaskEntry)) {
+      if (isConnectorTaskDead(connectorTaskEntry)) {
         _logger.warn("Detected that the kafka connector task is not running for datastream task {}. Restarting it",
             datastreamTask.getDatastreamTaskName());
-        // If stoppedTask is null it means that attempting to stop the task failed and that it will be retired again
-        // (in the next restartDeadTasks iteration).
-        // If we dont successfully stop the task before creating another connector task we can potentially end up with
-        // two connector tasks instances running in parallel. This is possible because the acquire method acts as a
-        // re-entrant lock if the same host calls the acquire method for the same task multiple times.
-        DatastreamTask stoppedTask = stopTask(datastreamTask, connectorTaskEntry);
-        if (stoppedTask != null) {
+        if (isTaskThreadDead(connectorTaskEntry)) {
+          _logger.warn("Task thread for datastream task {} has died. No need to attempt to stop the task",
+              datastreamTask.getDatastreamTaskName());
           deadDatastreamTasks.add(datastreamTask);
         } else {
-          _logger.error("Connector task for datastream task {} could not be stopped.", datastreamTask.getDatastreamTaskName());
+          // If stoppedTask is null it means that attempting to stop the task failed and that it will be retried again
+          // (in the next restartDeadTasks iteration).
+          // If we don't successfully stop the task before creating another connector task we can potentially end up with
+          // two connector task instances running in parallel. This is possible because the acquire method acts as a
+          // re-entrant lock if the same host calls the acquire method for the same task multiple times.
+          DatastreamTask stoppedTask = stopTask(datastreamTask, connectorTaskEntry);
+          if (stoppedTask != null) {
+            deadDatastreamTasks.add(datastreamTask);
+          } else {
+            _logger.error("Connector task for datastream task {} could not be stopped.", datastreamTask.getDatastreamTaskName());
+          }
         }
       } else {
         _logger.info("Connector task for datastream task {} is healthy", datastreamTask.getDatastreamTaskName());
@@ -385,15 +391,27 @@ private DatastreamTask stopTask(DatastreamTask datastreamTask, ConnectorTaskEntr
     return null;
   }
 
+  /**
+   * Check if the connector task thread is dead.
+   * @param connectorTaskEntry the connector task entry whose thread is checked.
+   * @return true if it is dead, false if it is still running.
+   */
+  protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) {
+    Thread taskThread = connectorTaskEntry.getThread();
+    if (taskThread == null || !taskThread.isAlive()) {
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Check if the {@link AbstractKafkaBasedConnectorTask} is dead.
    * @param connectorTaskEntry connector task and thread that needs to be checked whether it is dead.
    * @return true if it is dead, false if it is still running.
    */
-  protected boolean isTaskDead(ConnectorTaskEntry connectorTaskEntry) {
-    Thread taskThread = connectorTaskEntry.getThread();
+  protected boolean isConnectorTaskDead(ConnectorTaskEntry connectorTaskEntry) {
     AbstractKafkaBasedConnectorTask connectorTask = connectorTaskEntry.getConnectorTask();
-    return (connectorTaskEntry.isPendingStop() || taskThread == null || !taskThread.isAlive()
+    return (connectorTaskEntry.isPendingStop() || isTaskThreadDead(connectorTaskEntry)
         || ((System.currentTimeMillis() - connectorTask.getLastPolledTimeMillis()) >= _config.getNonGoodStateThresholdMillis()));
   }
 
diff --git a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java
index dda9bae13..c7262960b 100644
--- a/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java
+++ b/datastream-kafka-connector/src/test/java/com/linkedin/datastream/connectors/kafka/TestAbstractKafkaConnector.java
@@ -235,7 +235,12 @@ public void initializeDatastream(Datastream stream, List<Datastream> allDatastre
     }
 
     @Override
-    protected boolean isTaskDead(ConnectorTaskEntry connectorTaskEntry) {
+    protected boolean isConnectorTaskDead(ConnectorTaskEntry connectorTaskEntry) {
+      return true;
+    }
+
+    @Override
+    protected boolean isTaskThreadDead(ConnectorTaskEntry connectorTaskEntry) {
       return true;
     }
 
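
The sketch below is a simplified, self-contained model of the restart flow this diff introduces, not code from the repository: the ConnectorTaskEntry stub, the threshold constant, and the shouldScheduleRestart helper are illustrative assumptions. It shows the intended decision: a dead task whose thread has already exited is scheduled for restart directly, while a task whose thread is still alive must first be stopped successfully, so that two connector task instances never run in parallel.

// A simplified, self-contained model of the restart flow introduced above.
// ConnectorTaskEntry here is an illustrative stub, and stopTask() and the
// threshold constant are assumptions for the sketch, not the Brooklin classes.
public class RestartDecisionSketch {

  static final long NON_GOOD_STATE_THRESHOLD_MILLIS = 60_000; // illustrative value

  static class ConnectorTaskEntry {
    final Thread thread;          // worker thread; may be null or exited
    final boolean pendingStop;    // a stop has already been requested
    final long lastPolledMillis;  // last time the connector task polled

    ConnectorTaskEntry(Thread thread, boolean pendingStop, long lastPolledMillis) {
      this.thread = thread;
      this.pendingStop = pendingStop;
      this.lastPolledMillis = lastPolledMillis;
    }
  }

  // Mirrors isTaskThreadDead(): the thread is gone or no longer alive.
  static boolean isTaskThreadDead(ConnectorTaskEntry entry) {
    return entry.thread == null || !entry.thread.isAlive();
  }

  // Mirrors isConnectorTaskDead(): pending stop, dead thread, or stalled polling.
  static boolean isConnectorTaskDead(ConnectorTaskEntry entry) {
    return entry.pendingStop
        || isTaskThreadDead(entry)
        || (System.currentTimeMillis() - entry.lastPolledMillis) >= NON_GOOD_STATE_THRESHOLD_MILLIS;
  }

  // Stand-in for stopTask(): returns true when the stop succeeded.
  static boolean stopTask(ConnectorTaskEntry entry) {
    return true; // assume success for the sketch
  }

  // The decision from restartDeadTasks(): restart a dead-threaded task directly,
  // otherwise only after the running task has been stopped successfully.
  static boolean shouldScheduleRestart(ConnectorTaskEntry entry) {
    if (!isConnectorTaskDead(entry)) {
      return false; // healthy task, leave it running
    }
    if (isTaskThreadDead(entry)) {
      return true;  // nothing left to stop, safe to restart immediately
    }
    return stopTask(entry); // avoid two connector task instances in parallel
  }

  public static void main(String[] args) {
    ConnectorTaskEntry deadThread = new ConnectorTaskEntry(null, false, System.currentTimeMillis());
    System.out.println("dead thread -> restart now: " + shouldScheduleRestart(deadThread));
  }
}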