🐛 Destination snowflake: use 200MB batches (airbytehq#34502)
edgao authored and jatinyadav-cc committed Feb 21, 2024
1 parent 77b32b1 commit 02ba78d
Showing 14 changed files with 166 additions and 61 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.17.0 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Enable configuring async destination batch size. |
| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. |
| 0.16.5 | 2024-02-07 | [\#34948](https://github.com/airbytehq/airbyte/pull/34948) | Fix source state stats counting logic |
| 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest |
@@ -55,7 +55,7 @@ public interface DestinationFlushFunction {
* vague because I don't understand the specifics.
*/
default long getQueueFlushThresholdBytes() {
return 10 * 1024 * 1024; // 10MB
return Math.max(10 * 1024 * 1024, getOptimalBatchSizeBytes());
}

}
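
Previously this default flushed a stream's queue once it held 10MB, regardless of how large a batch the destination wanted; it now scales with the destination's declared optimal batch size, so large-batch destinations can accumulate a full batch before flushing. A minimal standalone sketch of the new logic (the class and the 200MB value are illustrative, not CDK code; 200MB is what destination-snowflake sets later in this commit):

```java
// Standalone sketch of the threshold logic above; the real interface lives in the Airbyte CDK.
public class FlushThresholdSketch {

  // Stands in for DestinationFlushFunction.getOptimalBatchSizeBytes() on a
  // destination that requests 200MB batches (as destination-snowflake now does).
  static long getOptimalBatchSizeBytes() {
    return 200L * 1024 * 1024;
  }

  // Mirrors the new default: never flush below 10MB, but honor larger batch sizes.
  static long getQueueFlushThresholdBytes() {
    return Math.max(10 * 1024 * 1024, getOptimalBatchSizeBytes());
  }

  public static void main(String[] args) {
    // Prints 209715200, i.e. the threshold follows the 200MB batch size.
    System.out.println(getQueueFlushThresholdBytes());
  }
}
```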
@@ -1 +1 @@
version=0.16.6
version=0.17.0
@@ -37,16 +37,6 @@ class AsyncFlush implements DestinationFlushFunction {
private final long optimalBatchSizeBytes;
private final boolean useDestinationsV2Columns;

public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
final StagingOperations stagingOperations,
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final boolean useDestinationsV2Columns) {
this(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, 50 * 1024 * 1024, useDestinationsV2Columns);
}

public AsyncFlush(final Map<StreamDescriptor, WriteConfig> streamDescToWriteConfig,
final StagingOperations stagingOperations,
final JdbcDatabase database,
@@ -50,50 +50,149 @@ public class StagingConsumerFactory extends SerialStagingConsumerFactory {

private static final DateTime SYNC_DATETIME = DateTime.now(DateTimeZone.UTC);

public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final StagingOperations stagingOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean purgeStagingData,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace,
final boolean useDestinationsV2Columns) {
return createAsync(outputRecordCollector,
database,
stagingOperations,
namingResolver,
config,
catalog,
purgeStagingData,
typerDeduperValve,
typerDeduper,
parsedCatalog,
defaultNamespace,
useDestinationsV2Columns,
Optional.empty());
private final Consumer<AirbyteMessage> outputRecordCollector;
private final JdbcDatabase database;
private final StagingOperations stagingOperations;
private final NamingConventionTransformer namingResolver;
private final JsonNode config;
private final ConfiguredAirbyteCatalog catalog;
private final boolean purgeStagingData;
private final TypeAndDedupeOperationValve typerDeduperValve;
private final TyperDeduper typerDeduper;
private final ParsedCatalog parsedCatalog;
private final String defaultNamespace;
private final boolean useDestinationsV2Columns;

// Optional fields
private final Optional<Long> bufferMemoryLimit;
private final long optimalBatchSizeBytes;

private StagingConsumerFactory(
final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final StagingOperations stagingOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean purgeStagingData,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace,
final boolean useDestinationsV2Columns,
final Optional<Long> bufferMemoryLimit,
final long optimalBatchSizeBytes) {
this.outputRecordCollector = outputRecordCollector;
this.database = database;
this.stagingOperations = stagingOperations;
this.namingResolver = namingResolver;
this.config = config;
this.catalog = catalog;
this.purgeStagingData = purgeStagingData;
this.typerDeduperValve = typerDeduperValve;
this.typerDeduper = typerDeduper;
this.parsedCatalog = parsedCatalog;
this.defaultNamespace = defaultNamespace;
this.useDestinationsV2Columns = useDestinationsV2Columns;
this.bufferMemoryLimit = bufferMemoryLimit;
this.optimalBatchSizeBytes = optimalBatchSizeBytes;
}

public SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final StagingOperations stagingOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean purgeStagingData,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace,
final boolean useDestinationsV2Columns,
final Optional<Long> bufferMemoryLimit) {
public static class Builder {

// Required (?) fields
// (TODO which of these are _actually_ required, and which have we just coincidentally always
// provided?)
private Consumer<AirbyteMessage> outputRecordCollector;
private JdbcDatabase database;
private StagingOperations stagingOperations;
private NamingConventionTransformer namingResolver;
private JsonNode config;
private ConfiguredAirbyteCatalog catalog;
private boolean purgeStagingData;
private TypeAndDedupeOperationValve typerDeduperValve;
private TyperDeduper typerDeduper;
private ParsedCatalog parsedCatalog;
private String defaultNamespace;
private boolean useDestinationsV2Columns;

// Optional fields
private Optional<Long> bufferMemoryLimit = Optional.empty();
private long optimalBatchSizeBytes = 50 * 1024 * 1024;

private Builder() {}

public Builder setBufferMemoryLimit(final Optional<Long> bufferMemoryLimit) {
this.bufferMemoryLimit = bufferMemoryLimit;
return this;
}

public Builder setOptimalBatchSizeBytes(final long optimalBatchSizeBytes) {
this.optimalBatchSizeBytes = optimalBatchSizeBytes;
return this;
}

public StagingConsumerFactory build() {
return new StagingConsumerFactory(
outputRecordCollector,
database,
stagingOperations,
namingResolver,
config,
catalog,
purgeStagingData,
typerDeduperValve,
typerDeduper,
parsedCatalog,
defaultNamespace,
useDestinationsV2Columns,
bufferMemoryLimit,
optimalBatchSizeBytes);
}

}

public static Builder builder(
final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final StagingOperations stagingOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final boolean purgeStagingData,
final TypeAndDedupeOperationValve typerDeduperValve,
final TyperDeduper typerDeduper,
final ParsedCatalog parsedCatalog,
final String defaultNamespace,
final boolean useDestinationsV2Columns) {
final Builder builder = new Builder();
builder.outputRecordCollector = outputRecordCollector;
builder.database = database;
builder.stagingOperations = stagingOperations;
builder.namingResolver = namingResolver;
builder.config = config;
builder.catalog = catalog;
builder.purgeStagingData = purgeStagingData;
builder.typerDeduperValve = typerDeduperValve;
builder.typerDeduper = typerDeduper;
builder.parsedCatalog = parsedCatalog;
builder.defaultNamespace = defaultNamespace;
builder.useDestinationsV2Columns = useDestinationsV2Columns;
return builder;
}

public SerializedAirbyteMessageConsumer createAsync() {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, parsedCatalog, useDestinationsV2Columns);
final var streamDescToWriteConfig = streamDescToWriteConfig(writeConfigs);
final var flusher =
new AsyncFlush(streamDescToWriteConfig, stagingOperations, database, catalog, typerDeduperValve, typerDeduper, useDestinationsV2Columns);
final var flusher = new AsyncFlush(
streamDescToWriteConfig,
stagingOperations,
database,
catalog,
typerDeduperValve,
typerDeduper,
optimalBatchSizeBytes,
useDestinationsV2Columns);
return new AsyncStreamConsumer(
outputRecordCollector,
GeneralStagingFunctions.onStartFunction(database, stagingOperations, writeConfigs, typerDeduper),
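For orientation, here is the call pattern this refactor produces at a destination, assembled from the Redshift and Snowflake call sites later in this diff. It is a sketch rather than compilable code: every argument is a placeholder the destination already has in scope.

```java
// Sketch of a destination call site after this change.
final SerializedAirbyteMessageConsumer consumer = StagingConsumerFactory.builder(
        outputRecordCollector,
        database,
        stagingOperations,
        namingResolver,
        config,
        catalog,
        purgeStagingData,
        typerDeduperValve,
        typerDeduper,
        parsedCatalog,
        defaultNamespace,
        useDestinationsV2Columns)
    // Both setters are optional: bufferMemoryLimit defaults to Optional.empty(),
    // optimalBatchSizeBytes defaults to 50MB (the old AsyncFlush constant).
    .setBufferMemoryLimit(Optional.of(bufferMemoryLimitBytes))
    .setOptimalBatchSizeBytes(200 * 1024 * 1024)
    .build()
    .createAsync();
```
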
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.16.3'
cdkVersionRequired = '0.17.0'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.1.5
dockerImageTag: 2.1.6
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
@@ -236,7 +236,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount);
}
return new StagingConsumerFactory().createAsync(
return StagingConsumerFactory.builder(
outputRecordCollector,
database,
new RedshiftS3StagingSqlOperations(getNamingResolver(), s3Config.getS3Client(), s3Config, encryptionConfig),
@@ -248,7 +248,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper,
parsedCatalog,
defaultNamespace,
true);
true).build().createAsync();
}

/**
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.15.1'
cdkVersionRequired = '0.17.0'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.5.4
dockerImageTag: 3.5.5
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
@@ -165,7 +165,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount);
}

return new StagingConsumerFactory().createAsync(
return StagingConsumerFactory.builder(
outputRecordCollector,
database,
new SnowflakeInternalStagingSqlOperations(getNamingResolver()),
@@ -177,8 +177,16 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper,
parsedCatalog,
defaultNamespace,
true,
Optional.of(getSnowflakeBufferMemoryLimit()));
true)
.setBufferMemoryLimit(Optional.of(getSnowflakeBufferMemoryLimit()))
.setOptimalBatchSizeBytes(
// The per stream size limit is following recommendations from:
// https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html#general-file-sizing-recommendations
// "To optimize the number of parallel operations for a load,
// we recommend aiming to produce data files roughly 100-250 MB (or larger) in size compressed."
200 * 1024 * 1024)
.build()
.createAsync();
}
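
With the CDK change in place, Snowflake's batches grow from the old 50MB AsyncFlush default to 200MB, which is what the "Reduce COPY frequency" changelog entry below refers to: roughly a quarter as many staged files, and therefore roughly a quarter as many COPY operations, per sync. A back-of-envelope sketch (the 10GiB sync size is hypothetical):

```java
// Back-of-envelope: how batch size affects the number of staged files / COPY operations,
// assuming roughly one COPY per flushed batch.
public class CopyFrequencySketch {

  static long copyOps(long syncBytes, long batchBytes) {
    return (syncBytes + batchBytes - 1) / batchBytes; // ceiling division
  }

  public static void main(String[] args) {
    final long syncBytes = 10L * 1024 * 1024 * 1024; // hypothetical 10GiB stream
    System.out.println(copyOps(syncBytes, 50L * 1024 * 1024));  // 205 with the old 50MB default
    System.out.println(copyOps(syncBytes, 200L * 1024 * 1024)); // 52 with the new 200MB setting
  }
}
```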

private static long getSnowflakeBufferMemoryLimit() {
@@ -62,6 +62,11 @@ public Optional<SnowflakeTableDefinition> findExistingTable(final StreamId id) t
}
}

@Override
public LinkedHashMap<String, SnowflakeTableDefinition> findExistingFinalTables(final List<StreamId> list) throws Exception {
return null;
}

@Override
public boolean isFinalTableEmpty(final StreamId id) throws SQLException {
final int rowCount = database.queryInt(
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
@@ -237,6 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.6 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Update to CDK version 0.17.0 |
| 2.1.5 | 2024-01-30 | [\#34680](https://github.com/airbytehq/airbyte/pull/34680) | Update to CDK version 0.16.3 |
| 2.1.4 | 2024-01-29 | [\#34634](https://github.com/airbytehq/airbyte/pull/34634) | Use lowercase raw schema and table in T+D [CDK changes](https://github.com/airbytehq/airbyte/pull/34533) |
| 2.1.3 | 2024-01-26 | [\#34544](https://github.com/airbytehq/airbyte/pull/34544) | Proper string-escaping in raw tables |
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
@@ -246,6 +246,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.5 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Reduce COPY frequency |
| 3.5.4 | 2024-01-24 | [\#34451](https://github.com/airbytehq/airbyte/pull/34451) | Improve logging for unparseable input |
| 3.5.3 | 2024-01-25 | [\#34528](https://github.com/airbytehq/airbyte/pull/34528) | Fix spurious `check` failure (`UnsupportedOperationException: Snowflake does not use the native JDBC DV2 interface`) |
| 3.5.2 | 2024-01-24 | [\#34458](https://github.com/airbytehq/airbyte/pull/34458) | Improve error reporting |