From fe138de5f2099c81ac1a2252cfb18640e1340b3a Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Fri, 23 Feb 2024 14:12:28 -0800 Subject: [PATCH] reduce interrupt and shutdown delays to 1 minutes and 2 minutes when stopping a connector (initially set at 60minutes and 70minutes) (#35527) Fixes #32348 discussed here : https://airbytehq-team.slack.com/archives/C02U2SSHP9S/p1708552465201999 --- airbyte-cdk/java/airbyte-cdk/README.md | 4 +- .../integrations/base/IntegrationRunner.java | 40 +++++++++++-------- .../connectors/source-mysql/build.gradle | 2 +- .../connectors/source-mysql/metadata.yaml | 2 +- docs/integrations/sources/mysql.md | 3 +- 5 files changed, 29 insertions(+), 22 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index fee7d0f963742..adf174f5a2567 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -167,7 +167,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| | 0.23.2 | 2024-02-22 | [\#35385](https://github.com/airbytehq/airbyte/pull/35342) | Bugfix: inverted logic of disableTypeDedupe flag | -| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdown timeouts | +| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdow timeouts | | 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state | | 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB | | 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs | @@ -261,4 +261,4 @@ MavenLocal debugging steps: | 0.1.1 | 2023-09-28 | [\#30835](https://github.com/airbytehq/airbyte/pull/30835) | JDBC destinations now avoid staging area name collisions by using the raw table name as the stage name. (previously we used the stream name as the stage name) | | 0.1.0 | 2023-09-27 | [\#30445](https://github.com/airbytehq/airbyte/pull/30445) | First launch, including shared classes for all connectors. | | 0.0.2 | 2023-08-21 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Version bump only (no other changes). | -| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. | \ No newline at end of file +| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java index 8da1ff8588aa8..8fd71f3875b27 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/IntegrationRunner.java @@ -67,8 +67,8 @@ public class IntegrationRunner { static final Predicate ORPHANED_THREAD_FILTER = runningThread -> !runningThread.getName().equals(Thread.currentThread().getName()) && !runningThread.isDaemon() && !TYPE_AND_DEDUPE_THREAD_NAME.equals(runningThread.getName()); - public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60; - public static final int EXIT_THREAD_DELAY_MINUTES = 70; + public static final int INTERRUPT_THREAD_DELAY_MINUTES = 1; + public static final int EXIT_THREAD_DELAY_MINUTES = 2; public static final int FORCED_EXIT_CODE = 2; @@ -189,11 +189,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { consumeWriteStream(consumer); } finally { - stopOrphanedThreads(EXIT_HOOK, - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + stopOrphanedThreads(); } } default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); @@ -263,11 +259,7 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo LOGGER.error("Unable to perform concurrent read.", e); throw e; } finally { - stopOrphanedThreads(EXIT_HOOK, - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + stopOrphanedThreads(); } } @@ -275,11 +267,7 @@ private void readSerial(final JsonNode config, final ConfiguredAirbyteCatalog ca try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { produceMessages(messageIterator, outputRecordCollector); } finally { - stopOrphanedThreads(EXIT_HOOK, - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + stopOrphanedThreads(); } } @@ -335,6 +323,23 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer, } } + /** + * Stops any non-daemon threads that could block the JVM from exiting when the main thread is done. + * + * If any active non-daemon threads would be left as orphans, this method will schedule some + * interrupt/exit hooks after giving it some time delay to close up properly. It is generally + * preferred to have a proper closing sequence from children threads instead of interrupting or + * force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs + * for maintainers to fix the code behavior instead. + */ + static void stopOrphanedThreads() { + stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); + } + /** * Stops any non-daemon threads that could block the JVM from exiting when the main thread is done. *

@@ -343,6 +348,7 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer, * preferred to have a proper closing sequence from children threads instead of interrupting or * force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs * for maintainers to fix the code behavior instead. + *

* * @param exitHook The {@link Runnable} exit hook to execute for any orphaned threads. * @param interruptTimeDelay The time to delay execution of the orphaned thread interrupt attempt. diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index daeccabc9ec13..c0f1e6ef51143 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -6,7 +6,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.21.4' + cdkVersionRequired = '0.23.1' features = ['db-sources'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 761f7233b935c..f80f0c555620c 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.10 + dockerImageTag: 3.3.11 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index d109a13a28474..26aafa4f12df2 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,7 +223,8 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.3.10 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | +| 3.3.11 | 2024-02-23 | [35527](https://github.com/airbytehq/airbyte/pull/35527) | Adopt 0.23.1 and shutdown timeouts. | +| 3.3.10 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | | 3.3.9 | 2024-02-21 | [35525](https://github.com/airbytehq/airbyte/pull/35338) | Adopt 0.21.4 and reduce cdc state compression threshold to 1MB. | | 3.3.8 | 2024-02-20 | [35338](https://github.com/airbytehq/airbyte/pull/35338) | Add config to throw an error on invalid CDC position. | | 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |