Skip to content

Commit

Permalink
reduce interrupt and shutdown delays to 1 minutes and 2 minutes when …
Browse files Browse the repository at this point in the history
…stopping a connector (initially set at 60minutes and 70minutes) (airbytehq#35527)

Fixes airbytehq#32348 
discussed here : https://airbytehq-team.slack.com/archives/C02U2SSHP9S/p1708552465201999
  • Loading branch information
stephane-airbyte authored and jatinyadav-cc committed Feb 26, 2024
1 parent da903d1 commit fe138de
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
4 changes: 2 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -167,7 +167,7 @@ MavenLocal debugging steps:
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.2 | 2024-02-22 | [\#35385](https://github.com/airbytehq/airbyte/pull/35342) | Bugfix: inverted logic of disableTypeDedupe flag |
| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdown timeouts |
| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdow timeouts |
| 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state |
| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB |
| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs |
Expand Down Expand Up @@ -261,4 +261,4 @@ MavenLocal debugging steps:
| 0.1.1 | 2023-09-28 | [\#30835](https://github.com/airbytehq/airbyte/pull/30835) | JDBC destinations now avoid staging area name collisions by using the raw table name as the stage name. (previously we used the stream name as the stage name) |
| 0.1.0 | 2023-09-27 | [\#30445](https://github.com/airbytehq/airbyte/pull/30445) | First launch, including shared classes for all connectors. |
| 0.0.2 | 2023-08-21 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Version bump only (no other changes). |
| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. |
| 0.0.1 | 2023-08-08 | [\#28687](https://github.com/airbytehq/airbyte/pull/28687) | Initial release for testing. |
Expand Up @@ -67,8 +67,8 @@ public class IntegrationRunner {
static final Predicate<Thread> ORPHANED_THREAD_FILTER = runningThread -> !runningThread.getName().equals(Thread.currentThread().getName())
&& !runningThread.isDaemon() && !TYPE_AND_DEDUPE_THREAD_NAME.equals(runningThread.getName());

public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60;
public static final int EXIT_THREAD_DELAY_MINUTES = 70;
public static final int INTERRUPT_THREAD_DELAY_MINUTES = 1;
public static final int EXIT_THREAD_DELAY_MINUTES = 2;

public static final int FORCED_EXIT_CODE = 2;

Expand Down Expand Up @@ -189,11 +189,7 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
consumeWriteStream(consumer);
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
stopOrphanedThreads();
}
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
Expand Down Expand Up @@ -263,23 +259,15 @@ private void readConcurrent(final JsonNode config, final ConfiguredAirbyteCatalo
LOGGER.error("Unable to perform concurrent read.", e);
throw e;
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
stopOrphanedThreads();
}
}

private void readSerial(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Optional<JsonNode> stateOptional) throws Exception {
try (final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null))) {
produceMessages(messageIterator, outputRecordCollector);
} finally {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
stopOrphanedThreads();
}
}

Expand Down Expand Up @@ -335,6 +323,23 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer,
}
}

/**
* Stops any non-daemon threads that could block the JVM from exiting when the main thread is done.
*
* If any active non-daemon threads would be left as orphans, this method will schedule some
* interrupt/exit hooks after giving it some time delay to close up properly. It is generally
* preferred to have a proper closing sequence from children threads instead of interrupting or
* force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs
* for maintainers to fix the code behavior instead.
*/
static void stopOrphanedThreads() {
stopOrphanedThreads(EXIT_HOOK,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
}

/**
* Stops any non-daemon threads that could block the JVM from exiting when the main thread is done.
* <p>
Expand All @@ -343,6 +348,7 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer,
* preferred to have a proper closing sequence from children threads instead of interrupting or
* force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs
* for maintainers to fix the code behavior instead.
* </p>
*
* @param exitHook The {@link Runnable} exit hook to execute for any orphaned threads.
* @param interruptTimeDelay The time to delay execution of the orphaned thread interrupt attempt.
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.21.4'
cdkVersionRequired = '0.23.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.3.10
dockerImageTag: 3.3.11
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mysql.md
Expand Up @@ -223,7 +223,8 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.10 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
| 3.3.11 | 2024-02-23 | [35527](https://github.com/airbytehq/airbyte/pull/35527) | Adopt 0.23.1 and shutdown timeouts. |
| 3.3.10 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. |
| 3.3.9 | 2024-02-21 | [35525](https://github.com/airbytehq/airbyte/pull/35338) | Adopt 0.21.4 and reduce cdc state compression threshold to 1MB. |
| 3.3.8 | 2024-02-20 | [35338](https://github.com/airbytehq/airbyte/pull/35338) | Add config to throw an error on invalid CDC position. |
| 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
Expand Down

0 comments on commit fe138de

Please sign in to comment.