diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java index d2cf31a6c..fb366db9c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java @@ -18,11 +18,18 @@ class BlobMetadata { private final Constants.BdecVersion bdecVersion; private final List chunks; private final BlobStats blobStats; + private final boolean spansMixedTables; // used for testing only @VisibleForTesting BlobMetadata(String path, String md5, List chunks, BlobStats blobStats) { - this(path, md5, ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT, chunks, blobStats); + this( + path, + md5, + ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT, + chunks, + blobStats, + chunks == null ? false : chunks.size() > 1); } BlobMetadata( @@ -30,12 +37,14 @@ class BlobMetadata { String md5, Constants.BdecVersion bdecVersion, List chunks, - BlobStats blobStats) { + BlobStats blobStats, + boolean spansMixedTables) { this.path = path; this.md5 = md5; this.bdecVersion = bdecVersion; this.chunks = chunks; this.blobStats = blobStats; + this.spansMixedTables = spansMixedTables; } @JsonIgnore @@ -68,13 +77,19 @@ BlobStats getBlobStats() { return this.blobStats; } + @JsonProperty("spans_mixed_tables") + boolean getSpansMixedTables() { + return this.spansMixedTables; + } + /** Create {@link BlobMetadata}. */ static BlobMetadata createBlobMetadata( String path, String md5, Constants.BdecVersion bdecVersion, List chunks, - BlobStats blobStats) { - return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats); + BlobStats blobStats, + boolean spansMixedTables) { + return new BlobMetadata(path, md5, bdecVersion, chunks, blobStats, spansMixedTables); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index cb1ef7810..f47580feb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -568,8 +568,15 @@ BlobMetadata upload( blob.length, System.currentTimeMillis() - startTime); + // at this point we know for sure if the BDEC file has data for more than one chunk, i.e. + // spans mixed tables or not return BlobMetadata.createBlobMetadata( - blobPath, BlobBuilder.computeMD5(blob), bdecVersion, metadata, blobStats); + blobPath, + BlobBuilder.computeMD5(blob), + bdecVersion, + metadata, + blobStats, + metadata == null ? false : metadata.size() > 1); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index d857efdf9..88510a8d2 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -719,7 +719,10 @@ List getRetryBlobs( blobMetadata.getMD5(), blobMetadata.getVersion(), relevantChunks, - blobMetadata.getBlobStats())); + blobMetadata.getBlobStats(), + // Important to not change the spansMixedTables value in case of retries. The + // correct value is the value that the already uploaded blob has. + blobMetadata.getSpansMixedTables())); } }); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 000b948e9..37eb5f96e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -7,6 +7,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import net.snowflake.ingest.utils.Pair; @@ -17,7 +18,7 @@ public class RegisterServiceTest { @Test - public void testRegisterService() { + public void testRegisterService() throws ExecutionException, InterruptedException { RegisterService rs = new RegisterService<>(null, true); Pair, CompletableFuture> blobFuture = @@ -26,6 +27,7 @@ public void testRegisterService() { CompletableFuture.completedFuture(new BlobMetadata("path", "md5", null, null))); rs.addBlobs(Collections.singletonList(blobFuture)); Assert.assertEquals(1, rs.getBlobsList().size()); + Assert.assertEquals(false, blobFuture.getValue().get().getSpansMixedTables()); List> errorBlobs = rs.registerBlobs(null); Assert.assertEquals(0, rs.getBlobsList().size()); Assert.assertEquals(0, errorBlobs.size());