Skip to content

Commit

Permalink
@snow SNOW-835618 Snowpipe Streaming: send uncompressed chunk length …
Browse files Browse the repository at this point in the history
…from SDK to GS
  • Loading branch information
sfc-gh-azagrebin committed Jun 9, 2023
1 parent bdb0a4a commit fde5fdb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ static <T> Blob constructBlobAndMetadata(
// The paddedChunkLength is used because it is the actual data size used for
// decompression and md5 calculation on server side.
.setChunkLength(paddedChunkLength)
.setUncompressedChunkLength((int) serializedChunk.chunkUncompressedSize)
.setChannelList(serializedChunk.channelsMetadataList)
.setChunkMD5(md5)
.setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ChunkMetadata {
private final String tableName;
private Long chunkStartOffset;
private final Integer chunkLength;
private final Integer uncompressedChunkLength;
private final List<ChannelMetadata> channels;
private final String chunkMD5;
private final EpInfo epInfo;
Expand All @@ -33,6 +34,9 @@ static class Builder {
private String tableName;
private Long chunkStartOffset;
private Integer chunkLength; // compressedChunkLength

private Integer uncompressedChunkLength;

private List<ChannelMetadata> channels;
private String chunkMD5;
private EpInfo epInfo;
Expand Down Expand Up @@ -62,6 +66,11 @@ Builder setChunkLength(Integer chunkLength) {
return this;
}

public Builder setUncompressedChunkLength(Integer uncompressedChunkLength) {
this.uncompressedChunkLength = uncompressedChunkLength;
return this;
}

Builder setChannelList(List<ChannelMetadata> channels) {
this.channels = channels;
return this;
Expand Down Expand Up @@ -110,6 +119,7 @@ private ChunkMetadata(Builder builder) {
this.tableName = builder.tableName;
this.chunkStartOffset = builder.chunkStartOffset;
this.chunkLength = builder.chunkLength;
this.uncompressedChunkLength = builder.uncompressedChunkLength;
this.channels = builder.channels;
this.chunkMD5 = builder.chunkMD5;
this.epInfo = builder.epInfo;
Expand Down Expand Up @@ -152,6 +162,11 @@ Integer getChunkLength() {
return chunkLength;
}

@JsonProperty("chunk_length_uncompressed")
public Integer getUncompressedChunkLength() {
return uncompressedChunkLength;
}

@JsonProperty("channels")
List<ChannelMetadata> getChannels() {
return this.channels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,8 @@ public void testBlobBuilder() throws Exception {
Assert.assertEquals(chunkMetadata.getTableName(), map.get("table"));
Assert.assertEquals(chunkMetadata.getSchemaName(), map.get("schema"));
Assert.assertEquals(chunkMetadata.getDBName(), map.get("database"));
Assert.assertEquals(chunkMetadata.getChunkLength(), map.get("chunk_length"));
Assert.assertEquals(chunkMetadata.getUncompressedChunkLength(), map.get("chunk_length_uncompressed"));
Assert.assertEquals(
Long.toString(chunkMetadata.getChunkStartOffset() - offset),
map.get("chunk_start_offset").toString());
Expand Down

0 comments on commit fde5fdb

Please sign in to comment.