v2.0.2 release #558

Merged · 15 commits · Jul 24, 2023

Changes from all commits
24 changes: 0 additions & 24 deletions .github/workflows/cla_bot.yml

This file was deleted.

22 changes: 17 additions & 5 deletions README.md
@@ -11,6 +11,16 @@ authentication. Currently, we support ingestion through the following APIs:
1. [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-gs.html#client-requirement-java-or-python-sdk)
2. [Snowpipe Streaming](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview) - Under Public Preview

# Dependencies

The Snowflake Ingest Service SDK depends on the following libraries:

* snowflake-jdbc (3.13.30+)
* slf4j-api

These dependencies are fetched automatically by build systems like Maven or Gradle. If you don't use a build system,
please make sure these dependencies are on the classpath.
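For example, a minimal Maven declaration might look like the sketch below; the coordinates and version are taken from this release's pom.xml, so check Maven Central for the latest version:

```xml
<!-- Sketch only: coordinates match this release's pom.xml; pick the latest available version -->
<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-ingest-sdk</artifactId>
  <version>2.0.2</version>
</dependency>
```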

# Prerequisites

**If your project also depends on the Snowflake JDBC driver, please make sure the JDBC driver version is 3.13.30 or newer.**
@@ -115,9 +125,11 @@ console log output.
- Here is the link for documentation [Key Pair
Generator](https://docs.snowflake.com/en/user-guide/key-pair-auth.html)

# Code style
# Contributing to this repo

We use [Google Java format](https://github.com/google/google-java-format) to format the code. To format all files, run:
```bash
./format.sh
```
Each PR must pass all required GitHub Actions merge gates before approval and merge. In addition to those tests, you will need:

- Formatter: run the script [`./format.sh`](https://github.com/snowflakedb/snowflake-ingest-java/blob/master/format.sh) from the repository root
- CLA: all contributors must sign the Snowflake CLA. This is a one-time signature; please provide your email so we can work with you to get it signed after you open a PR.

Thank you for contributing! We will review and approve PRs as soon as we can.
22 changes: 15 additions & 7 deletions pom.xml
@@ -6,7 +6,7 @@
<!-- Artifact name and version information -->
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
<packaging>jar</packaging>
<name>Snowflake Ingest SDK</name>
<description>Snowflake Ingest SDK</description>
@@ -44,8 +44,8 @@
<commonslogging.version>1.2</commonslogging.version>
<commonstext.version>1.10.0</commonstext.version>
<fasterxml.version>2.14.0</fasterxml.version>
<guava.version>31.1-jre</guava.version>
<hadoop.version>3.3.5</hadoop.version>
<guava.version>32.0.1-jre</guava.version>
<hadoop.version>3.3.6</hadoop.version>
<jacoco.skip.instrument>true</jacoco.skip.instrument>
<jacoco.version>0.8.5</jacoco.version>
<license.processing.dependencyJarsDir>${project.build.directory}/dependency-jars</license.processing.dependencyJarsDir>
@@ -55,15 +55,15 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<net.minidev.version>2.4.9</net.minidev.version>
<netty.version>4.1.82.Final</netty.version>
<nimbusds.version>9.9.3</nimbusds.version>
<netty.version>4.1.94.Final</netty.version>
<nimbusds.version>9.31</nimbusds.version>
<objenesis.version>3.1</objenesis.version>
<parquet.version>1.12.3</parquet.version>
<parquet.version>1.13.1</parquet.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<protobuf.version>3.19.6</protobuf.version>
<shadeBase>net.snowflake.ingest.internal</shadeBase>
<slf4j.version>1.7.36</slf4j.version>
<snappy.version>1.1.8.3</snappy.version>
<snappy.version>1.1.10.1</snappy.version>
<snowjdbc.version>3.13.30</snowjdbc.version>
<yetus.version>0.13.0</yetus.version>
</properties>
@@ -184,6 +184,10 @@
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
@@ -1015,6 +1019,10 @@
<pattern>javax.activation</pattern>
<shadedPattern>${shadeBase}.javax.activation</shadedPattern>
</relocation>
<relocation>
<pattern>io.airlift.compress</pattern>
<shadedPattern>${shadeBase}.io.airlift.compress</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
1 change: 1 addition & 0 deletions scripts/process_licenses.py
@@ -56,6 +56,7 @@
"org.apache.parquet:parquet-common": APACHE_LICENSE,
"org.apache.parquet:parquet-format-structures": APACHE_LICENSE,
"com.github.luben:zstd-jni": BSD_2_CLAUSE_LICENSE,
"io.airlift:aircompressor": APACHE_LICENSE,
}


@@ -119,7 +119,7 @@ public class RequestBuilder {
// Don't change!
public static final String CLIENT_NAME = "SnowpipeJavaSDK";

public static final String DEFAULT_VERSION = "2.0.1";
public static final String DEFAULT_VERSION = "2.0.2";

public static final String JAVA_USER_AGENT = "JAVA";

@@ -26,7 +26,7 @@
public class SnowflakeStreamingIngestExample {
// Please follow the example in profile_streaming.json.example to see the required properties, or
// if you have already set up profile.json with Snowpipe before, all you need is to add the "role"
// property.
// property. If the "role" is not specified, the default user role will be applied.
private static String PROFILE_PATH = "profile.json";
private static final ObjectMapper mapper = new ObjectMapper();

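As a rough illustration of the comment above, here is a hedged sketch of copying profile.json entries (including the optional "role") into a `Properties` object. Apart from "role", the key names and the exact loading logic used by the example class are not shown in this diff, so treat them as assumptions:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: copies every top-level field of profile.json into Properties.
// Only the "role" key is confirmed by the comment above; other keys are assumptions.
public class ProfileLoaderSketch {
  private static final ObjectMapper mapper = new ObjectMapper();

  static Properties loadProfile(String path) throws IOException {
    Properties props = new Properties();
    JsonNode profile = mapper.readTree(new File(path));
    Iterator<Map.Entry<String, JsonNode>> fields = profile.fields();
    while (fields.hasNext()) {
      Map.Entry<String, JsonNode> field = fields.next();
      props.put(field.getKey(), field.getValue().asText());
    }
    // If "role" is not present, the server applies the user's default role.
    return props;
  }
}
```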
@@ -114,6 +114,7 @@ static <T> Blob constructBlobAndMetadata(
// The paddedChunkLength is used because it is the actual data size used for
// decompression and md5 calculation on server side.
.setChunkLength(paddedChunkLength)
.setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize)
.setChannelList(serializedChunk.channelsMetadataList)
.setChunkMD5(md5)
.setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
@@ -132,13 +133,13 @@ static <T> Blob constructBlobAndMetadata(

logger.logInfo(
"Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
+ " uncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={},"
+ " estimatedUncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={},"
+ " bdecVersion={}",
filePath,
firstChannelFlushContext.getFullyQualifiedTableName(),
serializedChunk.rowCount,
startOffset,
serializedChunk.chunkUncompressedSize,
serializedChunk.chunkEstimatedUncompressedSize,
paddedChunkLength,
encryptedCompressedChunkDataSize,
bdecVersion);
@@ -15,6 +15,7 @@ class ChunkMetadata {
private final String tableName;
private Long chunkStartOffset;
private final Integer chunkLength;
private final Integer uncompressedChunkLength;
private final List<ChannelMetadata> channels;
private final String chunkMD5;
private final EpInfo epInfo;
@@ -33,6 +34,9 @@ static class Builder {
private String tableName;
private Long chunkStartOffset;
private Integer chunkLength; // compressedChunkLength

private Integer uncompressedChunkLength;

private List<ChannelMetadata> channels;
private String chunkMD5;
private EpInfo epInfo;
@@ -62,6 +66,15 @@ Builder setChunkLength(Integer chunkLength) {
return this;
}

/**
* Currently we send an estimated uncompressed size that is close to the actual Parquet data
* size; it mostly reflects user data, but Parquet encoding overhead may make it slightly
* different.
*/
public Builder setUncompressedChunkLength(Integer uncompressedChunkLength) {
this.uncompressedChunkLength = uncompressedChunkLength;
return this;
}

Builder setChannelList(List<ChannelMetadata> channels) {
this.channels = channels;
return this;
@@ -110,6 +123,7 @@ private ChunkMetadata(Builder builder) {
this.tableName = builder.tableName;
this.chunkStartOffset = builder.chunkStartOffset;
this.chunkLength = builder.chunkLength;
this.uncompressedChunkLength = builder.uncompressedChunkLength;
this.channels = builder.channels;
this.chunkMD5 = builder.chunkMD5;
this.epInfo = builder.epInfo;
@@ -152,6 +166,11 @@ Integer getChunkLength() {
return chunkLength;
}

@JsonProperty("chunk_length_uncompressed")
public Integer getUncompressedChunkLength() {
return uncompressedChunkLength;
}

@JsonProperty("channels")
List<ChannelMetadata> getChannels() {
return this.channels;
@@ -33,21 +33,21 @@ class SerializationResult {
final List<ChannelMetadata> channelsMetadataList;
final Map<String, RowBufferStats> columnEpStatsMapCombined;
final long rowCount;
final float chunkUncompressedSize;
final float chunkEstimatedUncompressedSize;
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;

public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
Map<String, RowBufferStats> columnEpStatsMapCombined,
long rowCount,
float chunkUncompressedSize,
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkUncompressedSize = chunkUncompressedSize;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
}
@@ -53,7 +53,7 @@
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
float chunkUncompressedSize = 0f;
float chunkEstimatedUncompressedSize = 0f;

[Codecov warning: added line ParquetFlusher.java#L56 was not covered by tests]
String firstChannelFullyQualifiedTableName = null;
Map<String, RowBufferStats> columnEpStatsMapCombined = null;
BdecParquetWriter mergedChannelWriter = null;
@@ -104,7 +104,7 @@
}

rowCount += data.getRowCount();
chunkUncompressedSize += data.getBufferSize();
chunkEstimatedUncompressedSize += data.getBufferSize();

[Codecov warning: added line ParquetFlusher.java#L107 was not covered by tests]

logger.logDebug(
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
@@ -121,7 +121,7 @@
channelsMetadataList,
columnEpStatsMapCombined,
rowCount,
chunkUncompressedSize,
chunkEstimatedUncompressedSize,
mergedChunkData,
chunkMinMaxInsertTimeInMs);
}
@@ -131,7 +131,7 @@
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
float chunkUncompressedSize = 0f;
float chunkEstimatedUncompressedSize = 0f;
String firstChannelFullyQualifiedTableName = null;
Map<String, RowBufferStats> columnEpStatsMapCombined = null;
List<List<Object>> rows = null;
@@ -183,7 +183,7 @@
rows.addAll(data.getVectors().rows);

rowCount += data.getRowCount();
chunkUncompressedSize += data.getBufferSize();
chunkEstimatedUncompressedSize += data.getBufferSize();

logger.logDebug(
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={},"
@@ -206,7 +206,7 @@
channelsMetadataList,
columnEpStatsMapCombined,
rowCount,
chunkUncompressedSize,
chunkEstimatedUncompressedSize,
mergedData,
chunkMinMaxInsertTimeInMs);
}
@@ -34,9 +34,40 @@ public Long apply(T input) {

private static final ObjectMapper objectMapper = new ObjectMapper();

static void sleepForRetry(int executionCount) {
/**
* How many milliseconds of exponential backoff to sleep before retrying the request again:
*
* <ul>
* <li>0 or 1 failure => no sleep
* <li>2 failures => 1s
* <li>3 failures => 2s
* <li>4 or more failures => 4s
* </ul>
*
* @param executionCount How many unsuccessful attempts have been attempted
* @return Sleep time in ms
*/
static long getSleepForRetryMs(int executionCount) {
if (executionCount < 0) {
throw new IllegalArgumentException(
String.format(
"executionCount must be a non-negative integer, passed: %d", executionCount));
} else if (executionCount < 2) {
return 0;
} else {
final int effectiveExecutionCount = Math.min(executionCount, 4);
return (1 << (effectiveExecutionCount - 2)) * 1000L;
}
}

public static void sleepForRetry(int executionCount) {
long sleepForRetryMs = getSleepForRetryMs(executionCount);
if (sleepForRetryMs == 0) {
return;
}

try {
Thread.sleep((1 << (executionCount + 1)) * 1000);
Thread.sleep(sleepForRetryMs);
} catch (InterruptedException e) {
throw new SFException(ErrorCode.INTERNAL_ERROR, e.getMessage());
}
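To make the documented backoff schedule concrete, here is a minimal, self-contained sketch; the containing utility class is not named in this diff, so the computation is reproduced here purely for illustration:

```java
// Standalone illustration of the retry backoff schedule described in the diff above:
// 0 or 1 failure -> no sleep, 2 -> 1s, 3 -> 2s, 4 or more -> 4s.
public class BackoffScheduleSketch {
  static long getSleepForRetryMs(int executionCount) {
    if (executionCount < 0) {
      throw new IllegalArgumentException(
          "executionCount must be a non-negative integer, passed: " + executionCount);
    } else if (executionCount < 2) {
      return 0;
    }
    int effectiveExecutionCount = Math.min(executionCount, 4);
    return (1 << (effectiveExecutionCount - 2)) * 1000L; // 2 -> 1000, 3 -> 2000, 4+ -> 4000
  }

  public static void main(String[] args) {
    for (int failures = 0; failures <= 5; failures++) {
      System.out.println("failures=" + failures + " sleepMs=" + getSleepForRetryMs(failures));
    }
  }
}
```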