From 0534b0006302f32157d3958f66d81661b2e278fc Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Tue, 16 May 2023 13:13:05 +0000 Subject: [PATCH 1/3] SNOW-818891 Remove Arrow and other unneeded dependenies --- pom.xml | 39 +- .../streaming/internal/AbstractRowBuffer.java | 46 +- .../streaming/internal/ArrowFlusher.java | 121 --- .../streaming/internal/ArrowRowBuffer.java | 813 ------------------ .../streaming/internal/BlobBuilder.java | 98 +-- .../streaming/internal/BlobMetadata.java | 2 +- .../streaming/internal/ChannelCache.java | 3 +- .../streaming/internal/ChannelData.java | 3 +- .../streaming/internal/FlushService.java | 9 +- .../ingest/streaming/internal/Flusher.java | 3 +- .../streaming/internal/ParquetRowBuffer.java | 3 - .../streaming/internal/RegisterService.java | 3 +- .../ingest/streaming/internal/RowBuffer.java | 3 +- ...nowflakeStreamingIngestChannelFactory.java | 17 +- ...owflakeStreamingIngestChannelInternal.java | 12 +- ...nowflakeStreamingIngestClientInternal.java | 21 +- .../internal/StreamingIngestStage.java | 2 +- .../streaming/internal/TimestampWrapper.java | 2 +- .../net/snowflake/ingest/utils/Constants.java | 6 +- .../net/snowflake/ingest/utils/Cryptor.java | 2 +- .../ingest/utils/ParameterProvider.java | 2 +- .../net/snowflake/ingest/utils/Utils.java | 9 - .../third-party-licenses/REVISED_BSD.txt | 30 - .../java/net/snowflake/ingest/TestUtils.java | 5 +- .../streaming/internal/ArrowBufferTest.java | 444 ---------- .../streaming/internal/FlushServiceTest.java | 61 +- .../internal/OpenManyChannelsIT.java | 2 +- .../streaming/internal/RowBufferTest.java | 27 +- .../SnowflakeStreamingIngestChannelTest.java | 16 +- .../SnowflakeStreamingIngestClientTest.java | 24 +- .../datatypes/AbstractDataTypeTest.java | 2 +- .../internal/datatypes/DateTimeIT.java | 8 +- .../internal/datatypes/StringsIT.java | 10 +- 33 files changed, 84 insertions(+), 1764 deletions(-) delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ArrowFlusher.java delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ArrowRowBuffer.java delete mode 100644 src/main/resources/META-INF/third-party-licenses/REVISED_BSD.txt delete mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/ArrowBufferTest.java diff --git a/pom.xml b/pom.xml index 1be577462..dd972a28c 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,6 @@ - 10.0.0 1.9.13 1.15 3.2.2 @@ -157,6 +156,10 @@ com.github.pjfanning jersey-json + + com.jcraft + jsch + com.sun.jersey jersey-core @@ -169,6 +172,10 @@ com.sun.jersey jersey-servlet + + commons-logging + commons-logging + dnsjava dnsjava @@ -189,10 +196,22 @@ org.apache.avro avro + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + org.apache.hadoop hadoop-auth + + org.apache.httpcomponents + httpclient + org.apache.kerby kerb-core @@ -406,17 +425,6 @@ net.snowflake snowflake-jdbc - - - org.apache.arrow - arrow-memory-core - ${arrow.version} - - - org.apache.arrow - arrow-vector - ${arrow.version} - org.apache.hadoop hadoop-common @@ -439,12 +447,6 @@ slf4j-api provided - - org.apache.arrow - arrow-memory-netty - ${arrow.version} - runtime - junit @@ -752,7 +754,6 @@ Apache License 2.0 BSD 2-Clause License 3-Clause BSD License - Revised BSD The MIT License EDL 1.0 The Go license diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 0580006d0..c5d0bddac 100644 --- 
a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -4,6 +4,7 @@ package net.snowflake.ingest.streaming.internal; +import com.google.common.annotations.VisibleForTesting; import java.time.ZoneId; import java.util.ArrayList; import java.util.HashMap; @@ -23,17 +24,13 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; -import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.util.VisibleForTesting; /** * The abstract implementation of the buffer in the Streaming Ingest channel that holds the * un-flushed rows, these rows will be converted to the underlying format implementation for faster * processing * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or Parquet - * {@link ParquetChunkData}) + * @param type of column data ({@link ParquetChunkData} for Parquet) */ abstract class AbstractRowBuffer implements RowBuffer { private static final Logging logger = new Logging(AbstractRowBuffer.class); @@ -162,9 +159,6 @@ public int getOrdinal() { // Metric callback to report size of inserted rows private final Consumer rowSizeMetric; - // Allocator used to allocate the buffers - final BufferAllocator allocator; - // State of the owning channel final ChannelRuntimeState channelState; @@ -176,7 +170,6 @@ public int getOrdinal() { AbstractRowBuffer( OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - BufferAllocator allocator, String fullyQualifiedChannelName, Consumer rowSizeMetric, ChannelRuntimeState channelRuntimeState) { @@ -185,7 +178,6 @@ public int getOrdinal() { this.rowSizeMetric = rowSizeMetric; this.channelState = channelRuntimeState; this.channelFullyQualifiedName = fullyQualifiedChannelName; - this.allocator = allocator; this.nonNullableFieldNames = new HashSet<>(); this.flushLock = new ReentrantLock(); this.bufferedRowCount = 0; @@ -505,30 +497,8 @@ void reset() { /** Close the row buffer and release allocated memory for the channel. 
*/ @Override public synchronized void close(String name) { - long allocatedBeforeRelease = this.allocator.getAllocatedMemory(); closeInternal(); - - long allocatedAfterRelease = this.allocator.getAllocatedMemory(); - logger.logInfo( - "Trying to close {} for channel={} from function={}, allocatedBeforeRelease={}," - + " allocatedAfterRelease={}", - this.getClass().getSimpleName(), - channelFullyQualifiedName, - name, - allocatedBeforeRelease, - allocatedAfterRelease); - Utils.closeAllocator(this.allocator); - - // If the channel is valid but still has leftover data, throw an exception because it should be - // cleaned up already before calling close - if (allocatedBeforeRelease > 0 && this.channelState.isValid()) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, - String.format( - "Memory leaked=%d by allocator=%s, channel=%s", - allocatedBeforeRelease, this.allocator, channelFullyQualifiedName)); - } } /** @@ -554,7 +524,6 @@ static EpInfo buildEpInfoFromStats(long rowCount, Map co static AbstractRowBuffer createRowBuffer( OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - BufferAllocator allocator, Constants.BdecVersion bdecVersion, String fullyQualifiedChannelName, Consumer rowSizeMetric, @@ -562,23 +531,12 @@ static AbstractRowBuffer createRowBuffer( boolean enableParquetMemoryOptimization, long maxChunkSizeInBytes) { switch (bdecVersion) { - case ONE: - //noinspection unchecked - return (AbstractRowBuffer) - new ArrowRowBuffer( - onErrorOption, - defaultTimezone, - allocator, - fullyQualifiedChannelName, - rowSizeMetric, - channelRuntimeState); case THREE: //noinspection unchecked return (AbstractRowBuffer) new ParquetRowBuffer( onErrorOption, defaultTimezone, - allocator, fullyQualifiedChannelName, rowSizeMetric, channelRuntimeState, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ArrowFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ArrowFlusher.java deleted file mode 100644 index 061405a53..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ArrowFlusher.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import net.snowflake.ingest.utils.ErrorCode; -import net.snowflake.ingest.utils.Logging; -import net.snowflake.ingest.utils.Pair; -import net.snowflake.ingest.utils.SFException; -import org.apache.arrow.vector.VectorLoader; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.VectorUnloader; -import org.apache.arrow.vector.ipc.ArrowStreamWriter; -import org.apache.arrow.vector.ipc.ArrowWriter; -import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; - -/** - * Converts {@link ChannelData} buffered in {@link RowBuffer} to the Arrow format for faster - * processing. 
- */ -public class ArrowFlusher implements Flusher { - private static final Logging logger = new Logging(ArrowFlusher.class); - - @Override - public Flusher.SerializationResult serialize( - List> channelsDataPerTable, String filePath) - throws IOException { - ByteArrayOutputStream chunkData = new ByteArrayOutputStream(); - List channelsMetadataList = new ArrayList<>(); - long rowCount = 0L; - float chunkUncompressedSize = 0f; - VectorSchemaRoot root = null; - ArrowWriter arrowWriter = null; - VectorLoader loader = null; - String firstChannelFullyQualifiedTableName = null; - Map columnEpStatsMapCombined = null; - Pair chunkMinMaxInsertTimeInMs = null; - - try { - for (ChannelData data : channelsDataPerTable) { - // Create channel metadata - ChannelMetadata channelMetadata = - ChannelMetadata.builder() - .setOwningChannelFromContext(data.getChannelContext()) - .setRowSequencer(data.getRowSequencer()) - .setOffsetToken(data.getOffsetToken()) - .build(); - // Add channel metadata to the metadata list - channelsMetadataList.add(channelMetadata); - - logger.logDebug( - "Start building channel={}, rowCount={}, bufferSize={} in blob={}", - data.getChannelContext().getFullyQualifiedName(), - data.getRowCount(), - data.getBufferSize(), - filePath); - - if (root == null) { - columnEpStatsMapCombined = data.getColumnEps(); - root = data.getVectors(); - arrowWriter = new ArrowStreamWriter(root, null, chunkData); - loader = new VectorLoader(root); - firstChannelFullyQualifiedTableName = - data.getChannelContext().getFullyQualifiedTableName(); - arrowWriter.start(); - chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs(); - } else { - // This method assumes that channelsDataPerTable is grouped by table. We double check - // here and throw an error if the assumption is violated - if (!data.getChannelContext() - .getFullyQualifiedTableName() - .equals(firstChannelFullyQualifiedTableName)) { - throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK); - } - - columnEpStatsMapCombined = - ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps()); - chunkMinMaxInsertTimeInMs = - ChannelData.getCombinedMinMaxInsertTimeInMs( - chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs()); - - VectorUnloader unloader = new VectorUnloader(data.getVectors()); - ArrowRecordBatch recordBatch = unloader.getRecordBatch(); - loader.load(recordBatch); - recordBatch.close(); - data.getVectors().close(); - } - - // Write channel data using the stream writer - arrowWriter.writeBatch(); - rowCount += data.getRowCount(); - chunkUncompressedSize += data.getBufferSize(); - - logger.logDebug( - "Finish building channel={}, rowCount={}, bufferSize={} in blob={}", - data.getChannelContext().getFullyQualifiedName(), - data.getRowCount(), - data.getBufferSize(), - filePath); - } - } finally { - if (arrowWriter != null) { - arrowWriter.close(); - root.close(); - } - } - return new Flusher.SerializationResult( - channelsMetadataList, - columnEpStatsMapCombined, - rowCount, - chunkUncompressedSize, - chunkData, - chunkMinMaxInsertTimeInMs); - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ArrowRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ArrowRowBuffer.java deleted file mode 100644 index 3e936233d..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ArrowRowBuffer.java +++ /dev/null @@ -1,813 +0,0 @@ -/* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. 
- */ - -package net.snowflake.ingest.streaming.internal; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.math.RoundingMode; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Consumer; -import net.snowflake.client.jdbc.internal.google.common.collect.Sets; -import net.snowflake.ingest.streaming.OpenChannelRequest; -import net.snowflake.ingest.utils.ErrorCode; -import net.snowflake.ingest.utils.Logging; -import net.snowflake.ingest.utils.SFException; -import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.util.VisibleForTesting; -import org.apache.arrow.vector.BaseFixedWidthVector; -import org.apache.arrow.vector.BaseVariableWidthVector; -import org.apache.arrow.vector.BigIntVector; -import org.apache.arrow.vector.BitVector; -import org.apache.arrow.vector.DateDayVector; -import org.apache.arrow.vector.DecimalVector; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.Float8Vector; -import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TinyIntVector; -import org.apache.arrow.vector.VarBinaryVector; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.complex.StructVector; -import org.apache.arrow.vector.types.Types; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.FieldType; -import org.apache.arrow.vector.util.Text; -import org.apache.arrow.vector.util.TransferPair; - -/** - * The buffer in the Streaming Ingest channel that holds the un-flushed rows, these rows will be - * converted to Arrow format for faster processing - */ -class ArrowRowBuffer extends AbstractRowBuffer { - private static final Logging logger = new Logging(ArrowRowBuffer.class); - - // Constants for column fields - private static final String FIELD_EPOCH_IN_SECONDS = "epoch"; // seconds since epoch - private static final String FIELD_TIME_ZONE = "timezone"; // time zone index - private static final String FIELD_FRACTION_IN_NANOSECONDS = "fraction"; // fraction in nanoseconds - - // Column metadata that will send back to server as part of the blob, and will be used by the - // Arrow reader - private static final String COLUMN_PHYSICAL_TYPE = "physicalType"; - private static final String COLUMN_LOGICAL_TYPE = "logicalType"; - private static final String COLUMN_NULLABLE = "nullable"; - static final String COLUMN_SCALE = "scale"; - private static final String COLUMN_PRECISION = "precision"; - private static final String COLUMN_CHAR_LENGTH = "charLength"; - private static final String COLUMN_BYTE_LENGTH = "byteLength"; - @VisibleForTesting static final int DECIMAL_BIT_WIDTH = 128; - - // Holder for a set of the Arrow vectors (buffers) - @VisibleForTesting VectorSchemaRoot vectorsRoot; - - // For ABORT on_error option, temp vectors are needed to temporarily holding the rows until - // they're all validated, then the rows will be transferred to the final VectorSchemaRoot - @VisibleForTesting VectorSchemaRoot tempVectorsRoot; - - // Map the column name to Arrow column field - private final Map fields; - - /** Construct a ArrowRowBuffer object. 
*/ - ArrowRowBuffer( - OpenChannelRequest.OnErrorOption onErrorOption, - ZoneId defaultTimezone, - BufferAllocator allocator, - String fullyQualifiedChannelName, - Consumer rowSizeMetric, - ChannelRuntimeState channelState) { - super( - onErrorOption, - defaultTimezone, - allocator, - fullyQualifiedChannelName, - rowSizeMetric, - channelState); - this.fields = new HashMap<>(); - } - - /** - * Setup the column fields and vectors using the column metadata from the server - * - * @param columns list of column metadata - */ - @Override - public void setupSchema(List columns) { - List vectors = new ArrayList<>(); - List tempVectors = new ArrayList<>(); - - for (ColumnMetadata column : columns) { - validateColumnCollation(column); - Field field = buildField(column); - FieldVector vector = field.createVector(this.allocator); - if (!field.isNullable()) { - addNonNullableFieldName(field.getName()); - } - this.fields.put(column.getInternalName(), field); - vectors.add(vector); - this.statsMap.put( - column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation())); - - if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT) { - FieldVector tempVector = field.createVector(this.allocator); - tempVectors.add(tempVector); - this.tempStatsMap.put( - column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation())); - } - } - - this.vectorsRoot = new VectorSchemaRoot(vectors); - this.tempVectorsRoot = new VectorSchemaRoot(tempVectors); - } - - /** Close the row buffer by releasing its internal resources. */ - @Override - public void closeInternal() { - if (this.vectorsRoot != null) { - this.vectorsRoot.close(); - this.tempVectorsRoot.close(); - } - this.fields.clear(); - } - - /** Reset the variables after each flush. 
Note that the caller needs to handle synchronization */ - @Override - void reset() { - super.reset(); - this.vectorsRoot.clear(); - } - - /** - * Build the column field from the column metadata - * - * @param column column metadata - * @return Column field object - */ - Field buildField(ColumnMetadata column) { - ArrowType arrowType; - FieldType fieldType; - List children = null; - - // Put info into the metadata, which will be used by the Arrow reader later - Map metadata = new HashMap<>(); - metadata.put(COLUMN_LOGICAL_TYPE, column.getLogicalType()); - metadata.put(COLUMN_PHYSICAL_TYPE, column.getPhysicalType()); - metadata.put(COLUMN_NULLABLE, String.valueOf(column.getNullable())); - - ColumnPhysicalType physicalType; - ColumnLogicalType logicalType; - try { - physicalType = ColumnPhysicalType.valueOf(column.getPhysicalType()); - logicalType = ColumnLogicalType.valueOf(column.getLogicalType()); - } catch (IllegalArgumentException e) { - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); - } - - if (column.getPrecision() != null) { - metadata.put(COLUMN_PRECISION, column.getPrecision().toString()); - } - if (column.getScale() != null) { - metadata.put(COLUMN_SCALE, column.getScale().toString()); - } - if (column.getByteLength() != null) { - metadata.put(COLUMN_BYTE_LENGTH, column.getByteLength().toString()); - } - if (column.getLength() != null) { - metadata.put(COLUMN_CHAR_LENGTH, column.getLength().toString()); - } - - // Handle differently depends on the column logical and physical types - switch (logicalType) { - case FIXED: - if ((column.getScale() != null && column.getScale() != 0) - || physicalType == ColumnPhysicalType.SB16) { - arrowType = - new ArrowType.Decimal(column.getPrecision(), column.getScale(), DECIMAL_BIT_WIDTH); - } else { - switch (physicalType) { - case SB1: - arrowType = Types.MinorType.TINYINT.getType(); - break; - case SB2: - arrowType = Types.MinorType.SMALLINT.getType(); - break; - case SB4: - arrowType = Types.MinorType.INT.getType(); - break; - case SB8: - arrowType = Types.MinorType.BIGINT.getType(); - break; - default: - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); - } - } - break; - case ANY: - case ARRAY: - case CHAR: - case TEXT: - case OBJECT: - case VARIANT: - arrowType = Types.MinorType.VARCHAR.getType(); - break; - case TIMESTAMP_LTZ: - case TIMESTAMP_NTZ: - switch (physicalType) { - case SB8: - arrowType = Types.MinorType.BIGINT.getType(); - break; - case SB16: - { - arrowType = Types.MinorType.STRUCT.getType(); - FieldType fieldTypeEpoch = - new FieldType(true, Types.MinorType.BIGINT.getType(), null, metadata); - FieldType fieldTypeFraction = - new FieldType(true, Types.MinorType.INT.getType(), null, metadata); - Field fieldEpoch = new Field(FIELD_EPOCH_IN_SECONDS, fieldTypeEpoch, null); - Field fieldFraction = - new Field(FIELD_FRACTION_IN_NANOSECONDS, fieldTypeFraction, null); - children = new ArrayList<>(); - children.add(fieldEpoch); - children.add(fieldFraction); - break; - } - default: - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); - } - break; - case TIMESTAMP_TZ: - switch (physicalType) { - case SB8: - { - arrowType = Types.MinorType.STRUCT.getType(); - FieldType fieldTypeEpoch = - new FieldType(true, Types.MinorType.BIGINT.getType(), null, metadata); - FieldType fieldTypeTimezone = - new FieldType(true, Types.MinorType.INT.getType(), null, metadata); - Field 
fieldEpoch = new Field(FIELD_EPOCH_IN_SECONDS, fieldTypeEpoch, null); - Field fieldTimezone = new Field(FIELD_TIME_ZONE, fieldTypeTimezone, null); - - children = new ArrayList<>(); - children.add(fieldEpoch); - children.add(fieldTimezone); - break; - } - case SB16: - { - arrowType = Types.MinorType.STRUCT.getType(); - FieldType fieldTypeEpoch = - new FieldType(true, Types.MinorType.BIGINT.getType(), null, metadata); - FieldType fieldTypeFraction = - new FieldType(true, Types.MinorType.INT.getType(), null, metadata); - FieldType fieldTypeTimezone = - new FieldType(true, Types.MinorType.INT.getType(), null, metadata); - Field fieldEpoch = new Field(FIELD_EPOCH_IN_SECONDS, fieldTypeEpoch, null); - Field fieldFraction = - new Field(FIELD_FRACTION_IN_NANOSECONDS, fieldTypeFraction, null); - Field fieldTimezone = new Field(FIELD_TIME_ZONE, fieldTypeTimezone, null); - - children = new ArrayList<>(); - children.add(fieldEpoch); - children.add(fieldFraction); - children.add(fieldTimezone); - break; - } - default: - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, - "Unknown physical type for TIMESTAMP_TZ: " + physicalType); - } - break; - case DATE: - arrowType = Types.MinorType.DATEDAY.getType(); - break; - case TIME: - switch (physicalType) { - case SB4: - arrowType = Types.MinorType.INT.getType(); - break; - case SB8: - arrowType = Types.MinorType.BIGINT.getType(); - break; - default: - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); - } - break; - case BOOLEAN: - arrowType = Types.MinorType.BIT.getType(); - break; - case BINARY: - arrowType = Types.MinorType.VARBINARY.getType(); - break; - case REAL: - arrowType = Types.MinorType.FLOAT8.getType(); - break; - default: - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); - } - - // Create the corresponding column field base on the column data type - fieldType = new FieldType(column.getNullable(), arrowType, null, metadata); - return new Field(column.getInternalName(), fieldType, children); - } - - @Override - void moveTempRowsToActualBuffer(int tempRowCount) { - // If all the rows are inserted successfully, transfer the rows from temp vectors to - // the final vectors and update the row size and row count - // TODO: switch to VectorSchemaRootAppender once it works for all vector types - for (Field field : fields.values()) { - FieldVector from = this.tempVectorsRoot.getVector(field); - FieldVector to = this.vectorsRoot.getVector(field); - for (int rowIdx = 0; rowIdx < tempRowCount; rowIdx++) { - to.copyFromSafe(rowIdx, this.bufferedRowCount + rowIdx, from); - } - } - } - - @Override - void clearTempRows() { - tempVectorsRoot.clear(); - } - - @Override - boolean hasColumns() { - return !fields.isEmpty(); - } - - @Override - Optional getSnapshot(final String filePath) { - List oldVectors = new ArrayList<>(); - for (FieldVector vector : this.vectorsRoot.getFieldVectors()) { - vector.setValueCount(this.bufferedRowCount); - if (vector instanceof DecimalVector) { - // DecimalVectors do not transfer FieldType metadata when using - // vector.getTransferPair. We need to explicitly create the new vector to transfer to - // in order to keep the metadata. 
- ArrowType arrowType = - new ArrowType.Decimal( - ((DecimalVector) vector).getPrecision(), - ((DecimalVector) vector).getScale(), - DECIMAL_BIT_WIDTH); - FieldType fieldType = - new FieldType( - vector.getField().isNullable(), arrowType, null, vector.getField().getMetadata()); - Field f = new Field(vector.getName(), fieldType, null); - DecimalVector newVector = new DecimalVector(f, this.allocator); - TransferPair t = vector.makeTransferPair(newVector); - t.transfer(); - oldVectors.add((FieldVector) t.getTo()); - } else { - TransferPair t = vector.getTransferPair(this.allocator); - t.transfer(); - oldVectors.add((FieldVector) t.getTo()); - } - } - VectorSchemaRoot root = new VectorSchemaRoot(oldVectors); - root.setRowCount(this.bufferedRowCount); - return oldVectors.isEmpty() ? Optional.empty() : Optional.of(root); - } - - @Override - boolean hasColumn(String name) { - return this.fields.get(name) != null; - } - - @Override - float addRow( - Map row, - int bufferedRowIndex, - Map statsMap, - Set formattedInputColumnNames, - final long insertRowIndex) { - return convertRowToArrow( - row, vectorsRoot, bufferedRowIndex, statsMap, formattedInputColumnNames, insertRowIndex); - } - - @Override - float addTempRow( - Map row, - int curRowIndex, - Map statsMap, - Set formattedInputColumnNames, - long insertRowIndex) { - return convertRowToArrow( - row, tempVectorsRoot, curRowIndex, statsMap, formattedInputColumnNames, insertRowIndex); - } - - /** - * Convert the input row to the correct Arrow format - * - * @param row input row - * @param sourceVectors vectors (buffers) that hold the row - * @param bufferedRowIndex Buffered row index. This is not the same as the input row index - * @param statsMap column stats map - * @param inputColumnNames list of input column names after formatting - * @param insertRowsCurrIndex Row index of the input Rows passed in {@link - * net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel#insertRows(Iterable, - * String)} - * @return row size - */ - private float convertRowToArrow( - Map row, - VectorSchemaRoot sourceVectors, - int bufferedRowIndex, - Map statsMap, - Set inputColumnNames, - long insertRowsCurrIndex) { - // Insert values to the corresponding arrow buffers - float rowBufferSize = 0F; - // Create new empty stats just for the current row. 
- Map forkedStatsMap = new HashMap<>(); - - for (Map.Entry entry : row.entrySet()) { - rowBufferSize += 0.125; // 1/8 for null value bitmap - String columnName = LiteralQuoteUtils.unquoteColumnName(entry.getKey()); - Object value = entry.getValue(); - Field field = this.fields.get(columnName); - Utils.assertNotNull("Arrow column field", field); - FieldVector vector = sourceVectors.getVector(field); - Utils.assertNotNull("Arrow column vector", vector); - RowBufferStats forkedStats = statsMap.get(columnName).forkEmpty(); - forkedStatsMap.put(columnName, forkedStats); - Utils.assertNotNull("Arrow column stats", forkedStats); - ColumnLogicalType logicalType = - ColumnLogicalType.valueOf(field.getMetadata().get(COLUMN_LOGICAL_TYPE)); - ColumnPhysicalType physicalType = - ColumnPhysicalType.valueOf(field.getMetadata().get(COLUMN_PHYSICAL_TYPE)); - - boolean isParsedValueNull = false; - if (value != null) { - switch (logicalType) { - case FIXED: - int columnPrecision = Integer.parseInt(field.getMetadata().get(COLUMN_PRECISION)); - int columnScale = getColumnScale(field.getMetadata()); - BigDecimal inputAsBigDecimal = - DataValidationUtil.validateAndParseBigDecimal( - forkedStats.getColumnDisplayName(), value, insertRowsCurrIndex); - // vector.setSafe requires the BigDecimal input scale explicitly match its scale - inputAsBigDecimal = inputAsBigDecimal.setScale(columnScale, RoundingMode.HALF_UP); - - DataValidationUtil.checkValueInRange( - inputAsBigDecimal, columnScale, columnPrecision, insertRowsCurrIndex); - - if (columnScale != 0 || physicalType == ColumnPhysicalType.SB16) { - ((DecimalVector) vector).setSafe(bufferedRowIndex, inputAsBigDecimal); - forkedStats.addIntValue(inputAsBigDecimal.unscaledValue()); - rowBufferSize += 16; - } else { - switch (physicalType) { - case SB1: - ((TinyIntVector) vector) - .setSafe(bufferedRowIndex, inputAsBigDecimal.byteValueExact()); - forkedStats.addIntValue(inputAsBigDecimal.toBigInteger()); - rowBufferSize += 1; - break; - case SB2: - ((SmallIntVector) vector) - .setSafe(bufferedRowIndex, inputAsBigDecimal.shortValueExact()); - forkedStats.addIntValue(inputAsBigDecimal.toBigInteger()); - rowBufferSize += 2; - break; - case SB4: - ((IntVector) vector).setSafe(bufferedRowIndex, inputAsBigDecimal.intValueExact()); - forkedStats.addIntValue(inputAsBigDecimal.toBigInteger()); - rowBufferSize += 4; - break; - case SB8: - ((BigIntVector) vector) - .setSafe(bufferedRowIndex, inputAsBigDecimal.longValueExact()); - forkedStats.addIntValue(inputAsBigDecimal.toBigInteger()); - rowBufferSize += 8; - break; - default: - throw new SFException(ErrorCode.UNKNOWN_DATA_TYPE, logicalType, physicalType); - } - } - break; - case ANY: - case CHAR: - case TEXT: - { - String maxLengthString = field.getMetadata().get(COLUMN_CHAR_LENGTH); - String str = - DataValidationUtil.validateAndParseString( - forkedStats.getColumnDisplayName(), - value, - Optional.ofNullable(maxLengthString).map(Integer::parseInt), - insertRowsCurrIndex); - Text text = new Text(str); - ((VarCharVector) vector).setSafe(bufferedRowIndex, text); - forkedStats.addStrValue(str); - rowBufferSize += text.getBytes().length; - break; - } - case OBJECT: - { - String str = - DataValidationUtil.validateAndParseObject( - forkedStats.getColumnDisplayName(), value, insertRowsCurrIndex); - Text text = new Text(str); - ((VarCharVector) vector).setSafe(bufferedRowIndex, text); - rowBufferSize += text.getBytes().length; - break; - } - case ARRAY: - { - String str = - DataValidationUtil.validateAndParseArray( - 
forkedStats.getColumnDisplayName(), value, insertRowsCurrIndex); - Text text = new Text(str); - ((VarCharVector) vector).setSafe(bufferedRowIndex, text); - rowBufferSize += text.getBytes().length; - break; - } - case VARIANT: - { - String str = - DataValidationUtil.validateAndParseVariant( - forkedStats.getColumnDisplayName(), value, insertRowsCurrIndex); - if (str != null) { - Text text = new Text(str); - ((VarCharVector) vector).setSafe(bufferedRowIndex, text); - rowBufferSize += text.getBytes().length; - } else { - isParsedValueNull = true; - } - break; - } - case TIMESTAMP_LTZ: - case TIMESTAMP_NTZ: - boolean trimTimezone = logicalType == ColumnLogicalType.TIMESTAMP_NTZ; - - switch (physicalType) { - case SB8: - { - BigIntVector bigIntVector = (BigIntVector) vector; - TimestampWrapper timestampWrapper = - DataValidationUtil.validateAndParseTimestamp( - forkedStats.getColumnDisplayName(), - value, - getColumnScale(field.getMetadata()), - defaultTimezone, - trimTimezone, - insertRowsCurrIndex); - BigInteger timestampBinary = timestampWrapper.toBinary(false); - bigIntVector.setSafe(bufferedRowIndex, timestampBinary.longValue()); - forkedStats.addIntValue(timestampBinary); - rowBufferSize += 8; - break; - } - case SB16: - { - StructVector structVector = (StructVector) vector; - BigIntVector epochVector = - (BigIntVector) structVector.getChild(FIELD_EPOCH_IN_SECONDS); - IntVector fractionVector = - (IntVector) structVector.getChild(FIELD_FRACTION_IN_NANOSECONDS); - rowBufferSize += 0.25; // for children vector's null value - structVector.setIndexDefined(bufferedRowIndex); - - TimestampWrapper timestampWrapper = - DataValidationUtil.validateAndParseTimestamp( - forkedStats.getColumnDisplayName(), - value, - getColumnScale(field.getMetadata()), - defaultTimezone, - trimTimezone, - insertRowsCurrIndex); - epochVector.setSafe(bufferedRowIndex, timestampWrapper.getEpoch()); - fractionVector.setSafe(bufferedRowIndex, timestampWrapper.getFraction()); - rowBufferSize += 12; - forkedStats.addIntValue(timestampWrapper.toBinary(false)); - break; - } - default: - throw new SFException(ErrorCode.UNKNOWN_DATA_TYPE, logicalType, physicalType); - } - break; - case TIMESTAMP_TZ: - switch (physicalType) { - case SB8: - { - StructVector structVector = (StructVector) vector; - BigIntVector epochVector = - (BigIntVector) structVector.getChild(FIELD_EPOCH_IN_SECONDS); - IntVector timezoneVector = (IntVector) structVector.getChild(FIELD_TIME_ZONE); - - rowBufferSize += 0.25; // for children vector's null value - structVector.setIndexDefined(bufferedRowIndex); - - TimestampWrapper timestampWrapper = - DataValidationUtil.validateAndParseTimestamp( - forkedStats.getColumnDisplayName(), - value, - getColumnScale(field.getMetadata()), - defaultTimezone, - false, - insertRowsCurrIndex); - epochVector.setSafe( - bufferedRowIndex, timestampWrapper.toBinary(false).longValueExact()); - timezoneVector.setSafe(bufferedRowIndex, timestampWrapper.getTimeZoneIndex()); - rowBufferSize += 12; - forkedStats.addIntValue(timestampWrapper.toBinary(true)); - break; - } - case SB16: - { - StructVector structVector = (StructVector) vector; - BigIntVector epochVector = - (BigIntVector) structVector.getChild(FIELD_EPOCH_IN_SECONDS); - IntVector fractionVector = - (IntVector) structVector.getChild(FIELD_FRACTION_IN_NANOSECONDS); - IntVector timezoneVector = (IntVector) structVector.getChild(FIELD_TIME_ZONE); - - rowBufferSize += 0.375; // for children vector's null value - structVector.setIndexDefined(bufferedRowIndex); - - 
TimestampWrapper timestampWrapper = - DataValidationUtil.validateAndParseTimestamp( - forkedStats.getColumnDisplayName(), - value, - getColumnScale(field.getMetadata()), - defaultTimezone, - false, - insertRowsCurrIndex); - epochVector.setSafe(bufferedRowIndex, timestampWrapper.getEpoch()); - fractionVector.setSafe(bufferedRowIndex, timestampWrapper.getFraction()); - timezoneVector.setSafe(bufferedRowIndex, timestampWrapper.getTimeZoneIndex()); - rowBufferSize += 16; - BigInteger timeInBinary = timestampWrapper.toBinary(true); - forkedStats.addIntValue(timeInBinary); - break; - } - default: - throw new SFException(ErrorCode.UNKNOWN_DATA_TYPE, logicalType, physicalType); - } - break; - case DATE: - { - DateDayVector dateDayVector = (DateDayVector) vector; - // Expect days past the epoch - int intValue = - DataValidationUtil.validateAndParseDate( - forkedStats.getColumnDisplayName(), value, insertRowsCurrIndex); - dateDayVector.setSafe(bufferedRowIndex, intValue); - forkedStats.addIntValue(BigInteger.valueOf(intValue)); - rowBufferSize += 4; - break; - } - case TIME: - switch (physicalType) { - case SB4: - { - BigInteger timeInScale = - DataValidationUtil.validateAndParseTime( - forkedStats.getColumnDisplayName(), - value, - getColumnScale(field.getMetadata()), - insertRowsCurrIndex); - ((IntVector) vector).setSafe(bufferedRowIndex, timeInScale.intValue()); - forkedStats.addIntValue(timeInScale); - rowBufferSize += 4; - break; - } - case SB8: - { - BigInteger timeInScale = - DataValidationUtil.validateAndParseTime( - forkedStats.getColumnDisplayName(), - value, - getColumnScale(field.getMetadata()), - insertRowsCurrIndex); - ((BigIntVector) vector).setSafe(bufferedRowIndex, timeInScale.longValue()); - forkedStats.addIntValue(timeInScale); - rowBufferSize += 8; - break; - } - default: - throw new SFException(ErrorCode.UNKNOWN_DATA_TYPE, logicalType, physicalType); - } - break; - case BOOLEAN: - { - int intValue = - DataValidationUtil.validateAndParseBoolean( - forkedStats.getColumnDisplayName(), value, insertRowsCurrIndex); - ((BitVector) vector).setSafe(bufferedRowIndex, intValue); - rowBufferSize += 0.125; - forkedStats.addIntValue(BigInteger.valueOf(intValue)); - break; - } - case BINARY: - String maxLengthString = field.getMetadata().get(COLUMN_BYTE_LENGTH); - byte[] bytes = - DataValidationUtil.validateAndParseBinary( - forkedStats.getColumnDisplayName(), - value, - Optional.ofNullable(maxLengthString).map(Integer::parseInt), - insertRowsCurrIndex); - ((VarBinaryVector) vector).setSafe(bufferedRowIndex, bytes); - forkedStats.addBinaryValue(bytes); - rowBufferSize += bytes.length; - break; - case REAL: - double doubleValue = - DataValidationUtil.validateAndParseReal( - forkedStats.getColumnDisplayName(), value, insertRowsCurrIndex); - ((Float8Vector) vector).setSafe(bufferedRowIndex, doubleValue); - forkedStats.addRealValue(doubleValue); - rowBufferSize += 8; - break; - default: - throw new SFException(ErrorCode.UNKNOWN_DATA_TYPE, logicalType, physicalType); - } - } - - if (value == null || isParsedValueNull) { - if (!field.getFieldType().isNullable()) { - throw new SFException( - ErrorCode.INVALID_FORMAT_ROW, columnName, "Passed null to non nullable field"); - } else { - insertNull(vector, forkedStats, bufferedRowIndex); - } - } - } - - // All input values passed validation, iterate over the columns again and combine their existing - // statistics with the forked statistics for the current row. 
- for (Map.Entry forkedColStats : forkedStatsMap.entrySet()) { - String columnName = forkedColStats.getKey(); - statsMap.put( - columnName, - RowBufferStats.getCombinedStats(statsMap.get(columnName), forkedColStats.getValue())); - } - - // Insert nulls to the columns that doesn't show up in the input - for (String columnName : Sets.difference(this.fields.keySet(), inputColumnNames)) { - rowBufferSize += 0.125; // 1/8 for null value bitmap - insertNull( - sourceVectors.getVector(this.fields.get(columnName)), - statsMap.get(columnName), - bufferedRowIndex); - } - - return rowBufferSize; - } - - /** Helper function to insert null value to a field vector */ - private void insertNull(FieldVector vector, RowBufferStats stats, int curRowIndex) { - if (BaseFixedWidthVector.class.isAssignableFrom(vector.getClass())) { - ((BaseFixedWidthVector) vector).setNull(curRowIndex); - } else if (BaseVariableWidthVector.class.isAssignableFrom(vector.getClass())) { - ((BaseVariableWidthVector) vector).setNull(curRowIndex); - } else if (vector instanceof StructVector) { - ((StructVector) vector).setNull(curRowIndex); - ((StructVector) vector) - .getChildrenFromFields() - .forEach( - child -> { - ((BaseFixedWidthVector) child).setNull(curRowIndex); - }); - } else { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Unexpected FieldType"); - } - stats.incCurrentNullCount(); - } - - private int getColumnScale(Map metadata) { - return Integer.parseInt(metadata.get(ArrowRowBuffer.COLUMN_SCALE)); - } - - @Override - public Flusher createFlusher() { - return new ArrowFlusher(); - } - - @VisibleForTesting - @Override - Object getVectorValueAt(String column, int index) { - Object value = vectorsRoot.getVector(column).getObject(index); - return (value instanceof Text) ? new String(((Text) value).getBytes()) : value; - } - - @VisibleForTesting - int getTempRowCount() { - return tempVectorsRoot.getRowCount(); - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 7d7c139c1..6ab0ce18b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -11,7 +11,6 @@ import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER; import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES; -import static net.snowflake.ingest.utils.Constants.COMPRESS_BLOB_TWICE; import static net.snowflake.ingest.utils.Utils.toByteArray; import com.fasterxml.jackson.core.JsonProcessingException; @@ -25,7 +24,6 @@ import java.util.ArrayList; import java.util.List; import java.util.zip.CRC32; -import java.util.zip.GZIPOutputStream; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; import javax.crypto.NoSuchPaddingException; @@ -48,8 +46,8 @@ *
  • variable size chunks metadata in json format * * - *

    After the metadata, it will be one or more chunks of variable size Arrow/Parquet data, and - * each chunk will be encrypted and compressed separately. + *

    After the metadata, it will be one or more chunks of variable size Parquet data, and each + * chunk will be encrypted and compressed separately. */ class BlobBuilder { @@ -86,14 +84,10 @@ static Blob constructBlobAndMetadata( if (!serializedChunk.channelsMetadataList.isEmpty()) { ByteArrayOutputStream chunkData = serializedChunk.chunkData; - Pair compressionResult = - compressIfNeededAndPadChunk( - filePath, - chunkData, - Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES, - bdecVersion == Constants.BdecVersion.ONE); - byte[] compressedAndPaddedChunkData = compressionResult.getFirst(); - int compressedChunkLength = compressionResult.getSecond(); + Pair paddedChunk = + padChunk(chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES); + byte[] compressedAndPaddedChunkData = paddedChunk.getFirst(); + int compressedChunkLength = paddedChunk.getSecond(); // Encrypt the compressed chunk data, the encryption key is derived using the key from // server with the full blob path. @@ -161,84 +155,22 @@ static Blob constructBlobAndMetadata( } /** - * Gzip compress the given chunk data - * - * @param filePath blob file full path - * @param chunkData uncompressed chunk data - * @param blockSizeToAlignTo block size to align to for encryption - * @return padded compressed chunk data, aligned to blockSizeToAlignTo, and actual length of - * compressed data before padding at the end - * @throws IOException - */ - static Pair compress( - String filePath, ByteArrayOutputStream chunkData, int blockSizeToAlignTo) throws IOException { - int uncompressedSize = chunkData.size(); - ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream(uncompressedSize); - try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedOutputStream, true)) { - gzipOutputStream.write(chunkData.toByteArray()); - } - int firstCompressedSize = compressedOutputStream.size(); - - // Based on current experiment, compressing twice will give us the best compression - // ratio and compression time combination - int doubleCompressedSize = 0; - if (COMPRESS_BLOB_TWICE) { - ByteArrayOutputStream doubleCompressedOutputStream = - new ByteArrayOutputStream(firstCompressedSize); - try (GZIPOutputStream doubleGzipOutputStream = - new GZIPOutputStream(doubleCompressedOutputStream, true)) { - doubleGzipOutputStream.write(compressedOutputStream.toByteArray()); - } - doubleCompressedSize = doubleCompressedOutputStream.size(); - compressedOutputStream = doubleCompressedOutputStream; - } - - logger.logDebug( - "Finish compressing chunk in blob={}, uncompressedSize={}, firstCompressedSize={}," - + " doubleCompressedSize={}", - filePath, - uncompressedSize, - firstCompressedSize, - doubleCompressedSize); - - int compressedSize = compressedOutputStream.size(); - int paddingSize = blockSizeToAlignTo - compressedSize % blockSizeToAlignTo; - compressedOutputStream.write(new byte[paddingSize]); - return new Pair<>(compressedOutputStream.toByteArray(), compressedSize); - } - - /** - * Gzip compress the given chunk data if required by the given write mode and pads the compressed - * data for encryption. Only for Arrow. For Parquet the compression is done in the Parquet - * library. + * Pad the compressed data for encryption. Encryption needs padding to the + * ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES to align with decryption on the Snowflake query path + * starting from this chunk offset. 
* - * @param filePath blob file full path * @param chunkData uncompressed chunk data * @param blockSizeToAlignTo block size to align to for encryption - * @param compress whether to compress the chunk * @return padded compressed chunk data, aligned to blockSizeToAlignTo, and actual length of * compressed data before padding at the end * @throws IOException */ - static Pair compressIfNeededAndPadChunk( - String filePath, ByteArrayOutputStream chunkData, int blockSizeToAlignTo, boolean compress) + static Pair padChunk(ByteArrayOutputStream chunkData, int blockSizeToAlignTo) throws IOException { - // Encryption needs padding to the ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES - // to align with decryption on the Snowflake query path starting from this chunk offset. - // The padding does not have arrow data and not compressed. - // Hence, the actual chunk size is smaller by the padding size. - // The compression on the Snowflake query path needs the correct size of the compressed - // data. - if (compress) { - // Stream write mode does not support column level compression. - // Compress the chunk data and pad it for encryption. - return BlobBuilder.compress(filePath, chunkData, blockSizeToAlignTo); - } else { - int actualSize = chunkData.size(); - int paddingSize = blockSizeToAlignTo - actualSize % blockSizeToAlignTo; - chunkData.write(new byte[paddingSize]); - return new Pair<>(chunkData.toByteArray(), actualSize); - } + int actualSize = chunkData.size(); + int paddingSize = blockSizeToAlignTo - actualSize % blockSizeToAlignTo; + chunkData.write(new byte[paddingSize]); + return new Pair<>(chunkData.toByteArray(), actualSize); } /** @@ -285,7 +217,7 @@ static byte[] buildBlob( blob.write(chunkData); } - // We need to update the start offset for the EP and Arrow/Parquet data in the request since + // We need to update the start offset for the EP and Parquet data in the request since // some metadata was added at the beginning for (ChunkMetadata chunkMetadata : chunksMetadataList) { chunkMetadata.advanceStartOffset(metadataSize); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java index 392bd53fd..d2cf31a6c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobMetadata.java @@ -6,10 +6,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import java.util.List; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; -import org.apache.arrow.util.VisibleForTesting; /** Metadata for a blob that sends to Snowflake as part of the register blob request */ class BlobMetadata { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 2cfeefc00..3926832fb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -14,8 +14,7 @@ * during flush. 
The key is a fully qualified table name and the value is a set of channels that * belongs to this table * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or {@link - * ParquetChunkData}) + * @param type of column data ({@link ParquetChunkData}) */ class ChannelCache { // Cache to hold all the valid channels, the key for the outer map is FullyQualifiedTableName and diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java index 07ce4b989..cd4dabaa6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java @@ -15,8 +15,7 @@ * Contains the data and metadata returned for each channel flush, which will be used to build the * blob and register blob request * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or Parquet - * {@link ParquetChunkData} + * @param type of column data (Parquet {@link ParquetChunkData} */ class ChannelData { private Long rowSequencer; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 7f38c1324..ae4d04a23 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -12,6 +12,7 @@ import static net.snowflake.ingest.utils.Utils.getStackTrace; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.lang.management.ManagementFactory; import java.security.InvalidAlgorithmParameterException; @@ -46,8 +47,6 @@ import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.util.VisibleForTesting; -import org.apache.arrow.vector.VectorSchemaRoot; /** * Responsible for flushing data from client to Snowflake tables. When a flush is triggered, it will @@ -57,8 +56,7 @@ *

  • upload the blob to stage *
  • register the blob to the targeted Snowflake table * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or {@link - * ParquetChunkData}) + * @param type of column data ({@link ParquetChunkData}) */ class FlushService { @@ -626,9 +624,6 @@ void invalidateAllChannelsInBlob(List>> blobData) { chunkData -> chunkData.forEach( channelData -> { - if (channelData.getVectors() instanceof VectorSchemaRoot) { - ((VectorSchemaRoot) channelData.getVectors()).close(); - } this.owningClient .getChannelCache() .invalidateChannelIfSequencersMatch( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java index fc9d1c858..e5a562f01 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java @@ -14,8 +14,7 @@ * Interface to convert {@link ChannelData} buffered in {@link RowBuffer} to the underlying format * implementation for faster processing. * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or {@link - * ParquetChunkData}) + * @param type of column data ({@link ParquetChunkData}) */ public interface Flusher { /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 9c35a13df..03b1c1762 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -23,7 +23,6 @@ import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; -import org.apache.arrow.memory.BufferAllocator; import org.apache.parquet.hadoop.BdecParquetWriter; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; @@ -59,7 +58,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer { ParquetRowBuffer( OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - BufferAllocator allocator, String fullyQualifiedChannelName, Consumer rowSizeMetric, ChannelRuntimeState channelRuntimeState, @@ -68,7 +66,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer { super( onErrorOption, defaultTimezone, - allocator, fullyQualifiedChannelName, rowSizeMetric, channelRuntimeState); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java index 44b90987b..1e3e63d98 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterService.java @@ -25,8 +25,7 @@ * Register one or more blobs to the targeted Snowflake table, it will be done using the dedicated * thread in order to maintain ordering per channel * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or {@link - * ParquetChunkData}) + * @param type of column data ({@link ParquetChunkData}) */ class RegisterService { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java index 5b7e937cd..d7ba6dbd9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBuffer.java @@ 
-12,8 +12,7 @@ * Interface for the buffer in the Streaming Ingest channel that holds the un-flushed rows, these * rows will be converted to the underlying format implementation for faster processing * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or {@link - * ParquetChunkData}) + * @param type of column data ({@link ParquetChunkData}) */ interface RowBuffer { /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index ed75796a4..3e442d43b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -7,8 +7,6 @@ import java.time.ZoneId; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; /** Builds a Streaming Ingest channel for a specific Streaming Ingest client */ class SnowflakeStreamingIngestChannelFactory { @@ -105,7 +103,6 @@ SnowflakeStreamingIngestChannelInternal build() { Utils.assertNotNull("encryption key_id", this.encryptionKeyId); Utils.assertNotNull("on_error option", this.onErrorOption); Utils.assertNotNull("default timezone", this.defaultTimezone); - BufferAllocator allocator = createBufferAllocator(); return new SnowflakeStreamingIngestChannelInternal<>( this.name, this.dbName, @@ -119,19 +116,7 @@ SnowflakeStreamingIngestChannelInternal build() { this.encryptionKeyId, this.onErrorOption, this.defaultTimezone, - this.owningClient.getParameterProvider().getBlobFormatVersion(), - allocator); - } - - private BufferAllocator createBufferAllocator() { - return owningClient.isTestMode() - ? 
new RootAllocator() - : owningClient - .getAllocator() - .newChildAllocator( - String.format("%s_%s", name, channelSequencer), - 0, - owningClient.getAllocator().getLimit()); + this.owningClient.getParameterProvider().getBlobFormatVersion()); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index f4c4c6952..136713324 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -27,14 +27,11 @@ import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; /** * The first version of implementation for SnowflakeStreamingIngestChannel * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or {@link - * ParquetChunkData}) + * @param type of column data {@link ParquetChunkData}) */ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIngestChannel { @@ -93,8 +90,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn encryptionKeyId, onErrorOption, defaultTimezone, - client.getParameterProvider().getBlobFormatVersion(), - new RootAllocator()); + client.getParameterProvider().getBlobFormatVersion()); } /** Default constructor */ @@ -111,8 +107,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - Constants.BdecVersion bdecVersion, - BufferAllocator allocator) { + Constants.BdecVersion bdecVersion) { this.isClosed = false; this.owningClient = client; this.channelFlushContext = @@ -123,7 +118,6 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn AbstractRowBuffer.createRowBuffer( onErrorOption, defaultTimezone, - allocator, bdecVersion, getFullyQualifiedName(), this::collectRowSize, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 4a87e2bd2..ec396c6d3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -36,6 +36,7 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet; import com.codahale.metrics.jvm.ThreadStatesGaugeSet; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.security.KeyPair; import java.security.NoSuchAlgorithmException; @@ -73,9 +74,6 @@ import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.util.VisibleForTesting; /** * The first version of implementation for SnowflakeStreamingIngestClient. The client internally @@ -83,8 +81,7 @@ *
  • the channel cache, which contains all the channels that belong to this account *
  • the flush service, which schedules and coordinates the flush to Snowflake tables * - * @param type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or {@link - * ParquetChunkData}) + * @param type of column data ({@link ParquetChunkData}) */ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStreamingIngestClient { @@ -116,9 +113,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Reference to the flush service private final FlushService flushService; - // Memory allocator - private final BufferAllocator allocator; - // Indicates whether the client has closed private volatile boolean isClosed; @@ -172,7 +166,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.isTestMode = isTestMode; this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); - this.allocator = new RootAllocator(); this.isClosed = false; this.requestBuilder = requestBuilder; @@ -632,7 +625,6 @@ public void close() throws Exception { this.requestBuilder.closeResources(); } HttpUtil.shutdownHttpConnectionManagerDaemonThread(); - Utils.closeAllocator(this.allocator); } } @@ -654,15 +646,6 @@ void setNeedFlush() { this.flushService.setNeedFlush(); } - /** - * Get the buffer allocator - * - * @return the buffer allocator - */ - BufferAllocator getAllocator() { - return this.allocator; - } - /** Remove the channel in the channel cache if the channel sequencer matches */ void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal channel) { this.channelCache.removeChannelIfSequencersMatch(channel); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java index 2feed6b81..0d7e3f211 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStage.java @@ -11,6 +11,7 @@ import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS; import static net.snowflake.ingest.utils.HttpUtil.generateProxyPropertiesForJDBC; +import com.google.common.annotations.VisibleForTesting; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -39,7 +40,6 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.util.VisibleForTesting; /** Handles uploading files to the Snowflake Streaming Ingest Stage */ class StreamingIngestStage { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/TimestampWrapper.java b/src/main/java/net/snowflake/ingest/streaming/internal/TimestampWrapper.java index deb664edd..871cf8c14 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/TimestampWrapper.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/TimestampWrapper.java @@ -12,7 +12,7 @@ /** * This class represents the outcome of timestamp parsing and validation. It contains methods needed - * to serialize timestamps into Arrow and Parquet. + * to serialize timestamps into Parquet. 
*/ public class TimestampWrapper { diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 050f14264..41465304a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -61,10 +61,10 @@ public enum WriteMode { REST_API, } - /** The write mode to generate Arrow BDEC file. */ + /** The write mode to generate BDEC file. */ public enum BdecVersion { - /** Uses Arrow to generate BDEC chunks. */ - ONE(1), + // Unused (previously Arrow) + // ONE(1), // Unused (previously Arrow with per column compression. // TWO(2), diff --git a/src/main/java/net/snowflake/ingest/utils/Cryptor.java b/src/main/java/net/snowflake/ingest/utils/Cryptor.java index 74a033fe1..ae7199417 100644 --- a/src/main/java/net/snowflake/ingest/utils/Cryptor.java +++ b/src/main/java/net/snowflake/ingest/utils/Cryptor.java @@ -2,6 +2,7 @@ import static net.snowflake.ingest.utils.Constants.ENCRYPTION_ALGORITHM; +import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.InvalidAlgorithmParameterException; @@ -16,7 +17,6 @@ import javax.crypto.SecretKey; import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.SecretKeySpec; -import org.apache.arrow.util.VisibleForTesting; public class Cryptor { diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index e2f38f476..7faea2859 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -209,7 +209,7 @@ public boolean hasEnabledSnowpipeStreamingMetrics() { return (boolean) val; } - /** @return Blob format version: 1 (arrow stream write mode), 2 (arrow file write mode) etc */ + /** @return Blob format version */ public Constants.BdecVersion getBlobFormatVersion() { Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT); if (val instanceof Constants.BdecVersion) { diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index 1280dbf46..662063a90 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -33,7 +33,6 @@ import net.snowflake.client.jdbc.internal.org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder; import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.InputDecryptorProvider; import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo; -import org.apache.arrow.memory.BufferAllocator; import org.apache.commons.codec.binary.Base64; /** Contains Ingest related utility functions */ @@ -268,14 +267,6 @@ public static boolean isNullOrEmpty(String string) { return string == null || string.isEmpty(); } - /** Release any outstanding memory and then close the buffer allocator */ - public static void closeAllocator(BufferAllocator alloc) { - for (BufferAllocator childAlloc : alloc.getChildAllocators()) { - childAlloc.close(); - } - alloc.close(); - } - /** Util function to show memory usage info and debug memory issue in the SDK */ public static void showMemory() { List pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); diff --git a/src/main/resources/META-INF/third-party-licenses/REVISED_BSD.txt 
b/src/main/resources/META-INF/third-party-licenses/REVISED_BSD.txt deleted file mode 100644 index 641d4f45e..000000000 --- a/src/main/resources/META-INF/third-party-licenses/REVISED_BSD.txt +++ /dev/null @@ -1,30 +0,0 @@ -JSch 0.0.* was released under the GNU LGPL license. Later, we have switched -over to a BSD-style license. - ------------------------------------------------------------------------------- -Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc. -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - - 1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - - 2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in - the documentation and/or other materials provided with the distribution. - - 3. The names of the authors may not be used to endorse or promote products - derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND -FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT, -INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, -OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, -EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/src/test/java/net/snowflake/ingest/TestUtils.java b/src/test/java/net/snowflake/ingest/TestUtils.java index 293ece5bf..eede847b8 100644 --- a/src/test/java/net/snowflake/ingest/TestUtils.java +++ b/src/test/java/net/snowflake/ingest/TestUtils.java @@ -150,10 +150,7 @@ private static String getTestProfilePath() { /** @return list of Bdec versions for which to execute IT tests. 
*/ public static Collection getBdecVersionItCases() { - return Arrays.asList( - new Object[][] { - {"Arrow", Constants.BdecVersion.ONE}, {"Parquet", Constants.BdecVersion.THREE} - }); + return Arrays.asList(new Object[][] {{"Parquet", Constants.BdecVersion.THREE}}); } public static String getUser() throws Exception { diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ArrowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ArrowBufferTest.java deleted file mode 100644 index f2cc72a16..000000000 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ArrowBufferTest.java +++ /dev/null @@ -1,444 +0,0 @@ -package net.snowflake.ingest.streaming.internal; - -import static net.snowflake.ingest.streaming.internal.ArrowRowBuffer.DECIMAL_BIT_WIDTH; - -import java.time.ZoneOffset; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import net.snowflake.ingest.streaming.InsertValidationResponse; -import net.snowflake.ingest.streaming.OpenChannelRequest; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.Types; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class ArrowBufferTest { - private ArrowRowBuffer rowBufferOnErrorContinue; - - @Before - public void setupRowBuffer() { - this.rowBufferOnErrorContinue = createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE); - this.rowBufferOnErrorContinue.setupSchema(RowBufferTest.createSchema()); - } - - ArrowRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption onErrorOption) { - ChannelRuntimeState initialState = new ChannelRuntimeState("0", 0L, true); - return new ArrowRowBuffer( - onErrorOption, ZoneOffset.UTC, new RootAllocator(), "test.buffer", rs -> {}, initialState); - } - - @Test - public void testFieldNumberAfterFlush() { - String offsetToken = "1"; - Map row = new HashMap<>(); - row.put("colTinyInt", (byte) 1); - row.put("\"colTinyInt\"", (byte) 1); - row.put("colSmallInt", (short) 2); - row.put("colInt", 3); - row.put("colBigInt", 4L); - row.put("colDecimal", 1.23); - row.put("colChar", "2"); - - InsertValidationResponse response = - rowBufferOnErrorContinue.insertRows(Collections.singletonList(row), offsetToken); - Assert.assertFalse(response.hasErrors()); - - ChannelData data = - rowBufferOnErrorContinue.flush("my_snowpipe_streaming.bdec"); - Assert.assertEquals(7, data.getVectors().getFieldVectors().size()); - } - - @Test - public void buildFieldFixedSB1() { - // FIXED, SB1 - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("FIXED") - .physicalType("SB1") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.TINYINT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB1"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "FIXED"); - Assert.assertEquals(result.getFieldType().getMetadata().get("nullable"), "true"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldFixedSB2() { - 
ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("FIXED") - .physicalType("SB2") - .nullable(false) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.SMALLINT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB2"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "FIXED"); - Assert.assertEquals(result.getFieldType().getMetadata().get("nullable"), "false"); - Assert.assertFalse(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldFixedSB4() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("FIXED") - .physicalType("SB4") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.INT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB4"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "FIXED"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldFixedSB8() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("FIXED") - .physicalType("SB8") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.BIGINT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB8"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "FIXED"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldFixedSB16() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("FIXED") - .physicalType("SB16") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - ArrowType expectedType = - new ArrowType.Decimal(testCol.getPrecision(), testCol.getScale(), DECIMAL_BIT_WIDTH); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), expectedType); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB16"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "FIXED"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldLobVariant() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("VARIANT") - .physicalType("LOB") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - 
Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.VARCHAR.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "LOB"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "VARIANT"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldTimestampNtzSB8() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("TIMESTAMP_NTZ") - .physicalType("SB8") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.BIGINT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB8"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "TIMESTAMP_NTZ"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldTimestampNtzSB16() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("TIMESTAMP_NTZ") - .physicalType("SB16") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.STRUCT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB16"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "TIMESTAMP_NTZ"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 2); - Assert.assertEquals( - result.getChildren().get(0).getFieldType().getType(), Types.MinorType.BIGINT.getType()); - Assert.assertEquals( - result.getChildren().get(1).getFieldType().getType(), Types.MinorType.INT.getType()); - } - - @Test - public void buildFieldTimestampTzSB8() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("TIMESTAMP_TZ") - .physicalType("SB8") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.STRUCT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB8"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "TIMESTAMP_TZ"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 2); - Assert.assertEquals( - result.getChildren().get(0).getFieldType().getType(), Types.MinorType.BIGINT.getType()); - Assert.assertEquals( - result.getChildren().get(1).getFieldType().getType(), Types.MinorType.INT.getType()); - } - - @Test - public void buildFieldTimestampTzSB16() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("TIMESTAMP_TZ") - .physicalType("SB16") - .nullable(true) - .build(); - - Field result = 
this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.STRUCT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB16"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "TIMESTAMP_TZ"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 3); - Assert.assertEquals( - result.getChildren().get(0).getFieldType().getType(), Types.MinorType.BIGINT.getType()); - Assert.assertEquals( - result.getChildren().get(1).getFieldType().getType(), Types.MinorType.INT.getType()); - Assert.assertEquals( - result.getChildren().get(2).getFieldType().getType(), Types.MinorType.INT.getType()); - } - - @Test - public void buildFieldTimestampDate() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("DATE") - .physicalType("SB8") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.DATEDAY.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB8"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "DATE"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldTimeSB4() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("TIME") - .physicalType("SB4") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.INT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB4"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "TIME"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldTimeSB8() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("TIME") - .physicalType("SB8") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.BIGINT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB8"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "TIME"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldBoolean() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("BOOLEAN") - .physicalType("BINARY") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - 
Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.BIT.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "BINARY"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "BOOLEAN"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void buildFieldRealSB16() { - ColumnMetadata testCol = - ColumnMetadataBuilder.newBuilder() - .logicalType("REAL") - .physicalType("SB16") - .nullable(true) - .build(); - - Field result = this.rowBufferOnErrorContinue.buildField(testCol); - - Assert.assertEquals("TESTCOL", result.getName()); - Assert.assertEquals(result.getFieldType().getType(), Types.MinorType.FLOAT8.getType()); - Assert.assertEquals(result.getFieldType().getMetadata().get("physicalType"), "SB16"); - Assert.assertEquals(result.getFieldType().getMetadata().get("scale"), "0"); - Assert.assertEquals(result.getFieldType().getMetadata().get("logicalType"), "REAL"); - Assert.assertTrue(result.getFieldType().isNullable()); - Assert.assertEquals(result.getChildren().size(), 0); - } - - @Test - public void testArrowE2ETimestampLTZ() { - testArrowE2ETimestampLTZHelper(OpenChannelRequest.OnErrorOption.ABORT); - testArrowE2ETimestampLTZHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - } - - private void testArrowE2ETimestampLTZHelper(OpenChannelRequest.OnErrorOption onErrorOption) { - ArrowRowBuffer innerBuffer = createTestBuffer(onErrorOption); - - ColumnMetadata colTimestampLtzSB8 = new ColumnMetadata(); - colTimestampLtzSB8.setName("COLTIMESTAMPLTZ_SB8"); - colTimestampLtzSB8.setPhysicalType("SB8"); - colTimestampLtzSB8.setNullable(false); - colTimestampLtzSB8.setLogicalType("TIMESTAMP_LTZ"); - colTimestampLtzSB8.setScale(0); - - ColumnMetadata colTimestampLtzSB16 = new ColumnMetadata(); - colTimestampLtzSB16.setName("COLTIMESTAMPLTZ_SB16"); - colTimestampLtzSB16.setPhysicalType("SB16"); - colTimestampLtzSB16.setNullable(false); - colTimestampLtzSB16.setLogicalType("TIMESTAMP_LTZ"); - colTimestampLtzSB16.setScale(9); - - innerBuffer.setupSchema(Arrays.asList(colTimestampLtzSB8, colTimestampLtzSB16)); - - Map row = new HashMap<>(); - row.put("COLTIMESTAMPLTZ_SB8", "1621899220"); - row.put("COLTIMESTAMPLTZ_SB16", "1621899220123456789"); - - InsertValidationResponse response = - innerBuffer.insertRows(Collections.singletonList(row), null); - Assert.assertFalse(response.hasErrors()); - Assert.assertEquals( - 1621899220L, innerBuffer.vectorsRoot.getVector("COLTIMESTAMPLTZ_SB8").getObject(0)); - Assert.assertEquals( - "epoch", - innerBuffer - .vectorsRoot - .getVector("COLTIMESTAMPLTZ_SB16") - .getChildrenFromFields() - .get(0) - .getName()); - Assert.assertEquals( - 1621899220L, - innerBuffer - .vectorsRoot - .getVector("COLTIMESTAMPLTZ_SB16") - .getChildrenFromFields() - .get(0) - .getObject(0)); - Assert.assertEquals( - "fraction", - innerBuffer - .vectorsRoot - .getVector("COLTIMESTAMPLTZ_SB16") - .getChildrenFromFields() - .get(1) - .getName()); - Assert.assertEquals( - 123456789, - innerBuffer - .vectorsRoot - .getVector("COLTIMESTAMPLTZ_SB16") - .getChildrenFromFields() - .get(1) - .getObject(0)); - } -} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 12e77ab7b..5d3e62ef2 100644 --- 
a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -46,9 +46,6 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorSchemaRoot; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -62,8 +59,7 @@ public class FlushServiceTest { @Parameterized.Parameters(name = "{0}") public static Collection testContextFactory() { - return Arrays.asList( - new Object[][] {{ArrowTestContext.createFactory()}, {ParquetTestContext.createFactory()}}); + return Arrays.asList(new Object[][] {{ParquetTestContext.createFactory()}}); } public FlushServiceTest(TestContextFactory testContextFactory) { @@ -244,58 +240,6 @@ static RowSetBuilder newBuilder() { } } - private static class ArrowTestContext extends TestContext { - private final BufferAllocator allocator = new RootAllocator(); - - SnowflakeStreamingIngestChannelInternal createChannel( - String name, - String dbName, - String schemaName, - String tableName, - String offsetToken, - Long channelSequencer, - Long rowSequencer, - String encryptionKey, - Long encryptionKeyId, - OpenChannelRequest.OnErrorOption onErrorOption, - ZoneId defaultTimezone) { - return new SnowflakeStreamingIngestChannelInternal<>( - name, - dbName, - schemaName, - tableName, - offsetToken, - channelSequencer, - rowSequencer, - client, - encryptionKey, - encryptionKeyId, - onErrorOption, - defaultTimezone, - Constants.BdecVersion.ONE, - allocator); - } - - @Override - public void close() { - try { - // Close allocator to make sure no memory leak - allocator.close(); - } catch (Exception e) { - Assert.fail(String.format("Allocator close failure. 
Caused by %s", e.getMessage())); - } - } - - static TestContextFactory createFactory() { - return new TestContextFactory("Arrow") { - @Override - TestContext create() { - return new ArrowTestContext(); - } - }; - } - } - private static class ParquetTestContext extends TestContext>> { SnowflakeStreamingIngestChannelInternal>> createChannel( @@ -323,8 +267,7 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( encryptionKeyId, onErrorOption, defaultTimezone, - Constants.BdecVersion.THREE, - null); + Constants.BdecVersion.THREE); } @Override diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java index c3752a660..1934b3f77 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OpenManyChannelsIT.java @@ -51,7 +51,7 @@ public void setUp() throws Exception { String.format( "create or replace table %s.%s.%s (col int)", databaseName, SCHEMA_NAME, TABLE_NAME)); - Properties props = TestUtils.getProperties(Constants.BdecVersion.ONE); + Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE); if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { props.setProperty(ROLE, "ACCOUNTADMIN"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index b6143e6ce..81472f46f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -20,8 +20,6 @@ import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.NotImplementedException; import org.junit.Assert; @@ -35,10 +33,7 @@ public class RowBufferTest { @Parameterized.Parameters(name = "{0}") public static Collection bdecVersion() { return Arrays.asList( - new Object[][] { - {"Arrow", Constants.BdecVersion.ONE}, - {"Parquet_w/o_optimization", Constants.BdecVersion.THREE} - }); + new Object[][] {{"Parquet_w/o_optimization", Constants.BdecVersion.THREE}}); } private final Constants.BdecVersion bdecVersion; @@ -127,7 +122,6 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o return AbstractRowBuffer.createRowBuffer( onErrorOption, UTC, - new RootAllocator(), bdecVersion, "test.buffer", rs -> {}, @@ -1564,20 +1558,6 @@ public void testOnErrorAbortRowsWithError() { SFException.class, () -> innerBufferOnErrorAbort.insertRows(mixedRows, "3")); switch (bdecVersion) { - case ONE: - VectorSchemaRoot snapshotContinueArrow = - ((VectorSchemaRoot) innerBufferOnErrorContinue.getSnapshot("fake/filePath").get()); - // validRows and only the good row from mixedRows are in the buffer - Assert.assertEquals(2, snapshotContinueArrow.getRowCount()); - Assert.assertEquals("[a, b]", snapshotContinueArrow.getVector(0).toString()); - - VectorSchemaRoot snapshotAbortArrow = - ((VectorSchemaRoot) innerBufferOnErrorAbort.getSnapshot("fake/filePath").get()); - // only validRows and none of the mixedRows are in the buffer - Assert.assertEquals(1, snapshotAbortArrow.getRowCount()); - Assert.assertEquals("[a]", snapshotAbortArrow.getVector(0).toString()); - break; - case 
THREE: List> snapshotContinueParquet = ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot("fake/filePath").get()).rows; @@ -1595,10 +1575,5 @@ public void testOnErrorAbortRowsWithError() { default: throw new NotImplementedException("Unsupported version!"); } - if (bdecVersion == Constants.BdecVersion.THREE) { - - } else if (bdecVersion == Constants.BdecVersion.ONE) { - - } } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index d34386bfa..23c95d792 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -38,7 +38,6 @@ import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.vector.VectorSchemaRoot; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -498,14 +497,7 @@ public void testOpenChannelSuccessResponse() throws Exception { @Test public void testInsertRow() { SnowflakeStreamingIngestClientInternal client; - boolean isArrowDefault = - ParameterProvider.BLOB_FORMAT_VERSION_DEFAULT == Constants.BdecVersion.ONE; - if (isArrowDefault) { - client = new SnowflakeStreamingIngestClientInternal("client_ARROW"); - } else { - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); - } - + client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -550,11 +542,7 @@ public void testInsertRow() { data = channel.getData("my_snowpipe_streaming.bdec"); Assert.assertEquals(2, data.getRowCount()); Assert.assertEquals((Long) 1L, data.getRowSequencer()); - Assert.assertEquals( - 1, - isArrowDefault - ? 
((ChannelData) data).getVectors().getFieldVectors().size() - : ((ChannelData) data).getVectors().rows.get(0).size()); + Assert.assertEquals(1, ((ChannelData) data).getVectors().rows.get(0).size()); Assert.assertEquals("2", data.getOffsetToken()); Assert.assertTrue(data.getBufferSize() > 0); Assert.assertTrue(insertStartTimeInMs <= data.getMinMaxInsertTimeInMs().getFirst()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 3b50c8354..244457ccf 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -59,7 +59,6 @@ import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; -import org.apache.arrow.memory.RootAllocator; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; @@ -69,6 +68,8 @@ public class SnowflakeStreamingIngestClientTest { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final Constants.BdecVersion BDEC_VERSION = Constants.BdecVersion.THREE; + SnowflakeStreamingIngestChannelInternal channel1; SnowflakeStreamingIngestChannelInternal channel2; SnowflakeStreamingIngestChannelInternal channel3; @@ -92,8 +93,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - Constants.BdecVersion.ONE, - new RootAllocator()); + BDEC_VERSION); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -108,8 +108,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - Constants.BdecVersion.ONE, - new RootAllocator()); + BDEC_VERSION); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -124,8 +123,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - Constants.BdecVersion.ONE, - new RootAllocator()); + BDEC_VERSION); channel4 = new SnowflakeStreamingIngestChannelInternal<>( "channel4", @@ -140,8 +138,7 @@ public void setup() { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - Constants.BdecVersion.ONE, - new RootAllocator()); + BDEC_VERSION); } @Test @@ -358,8 +355,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - Constants.BdecVersion.ONE, - new RootAllocator()); + BDEC_VERSION); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); @@ -418,8 +414,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - Constants.BdecVersion.ONE, - new RootAllocator()); + BDEC_VERSION); try { client.getChannelsStatus(Collections.singletonList(channel)); @@ -460,8 +455,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - Constants.BdecVersion.ONE, - new RootAllocator()); + BDEC_VERSION); ChannelMetadata channelMetadata = ChannelMetadata.builder() diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 5ef3951c1..96792ee0f 100644 --- 
a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -180,7 +180,7 @@ protected void expectNumberOutOfRangeError( maxPowerOf10Exclusive, maxPowerOf10Exclusive)))); } - protected void expectArrowNotSupported(String dataType, T value) throws Exception { + protected void expectNotSupported(String dataType, T value) throws Exception { expectError( dataType, value, diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java index 8a8d9bafc..3b0133ff1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java @@ -101,7 +101,7 @@ public void testTimestampWithTimeZone() throws Exception { "2024-02-29 23:59:59.999999999 Z", new StringProvider(), new StringProvider()); - expectArrowNotSupported("TIMESTAMP_TZ", "2023-02-29T23:59:59.999999999"); + expectNotSupported("TIMESTAMP_TZ", "2023-02-29T23:59:59.999999999"); // Numeric strings testJdbcTypeCompatibility( @@ -345,7 +345,7 @@ public void testTimestampWithLocalTimeZone() throws Exception { "2024-02-29 15:59:59.999999999 -0800", new StringProvider(), new StringProvider()); - expectArrowNotSupported("TIMESTAMP_LTZ", "2023-02-29T23:59:59.999999999"); + expectNotSupported("TIMESTAMP_LTZ", "2023-02-29T23:59:59.999999999"); // Numeric strings testJdbcTypeCompatibility( @@ -587,7 +587,7 @@ public void testTimestampWithoutTimeZone() throws Exception { "2024-02-29 23:59:59.999999999 Z", new StringProvider(), new StringProvider()); - expectArrowNotSupported("TIMESTAMP_NTZ", "2023-02-29T23:59:59.999999999"); + expectNotSupported("TIMESTAMP_NTZ", "2023-02-29T23:59:59.999999999"); // Numeric strings testJdbcTypeCompatibility( @@ -927,7 +927,7 @@ public void testDate() throws Exception { "2024-02-29", new StringProvider(), new StringProvider()); - expectArrowNotSupported("DATE", "2023-02-29T23:59:59.999999999"); + expectNotSupported("DATE", "2023-02-29T23:59:59.999999999"); // Test numeric strings testJdbcTypeCompatibility( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java index cfa59e0b1..c767f6069 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java @@ -36,7 +36,7 @@ public void testStrings() throws Exception { testJdbcTypeCompatibility("CHAR(5)", true, "true", new BooleanProvider(), new StringProvider()); testJdbcTypeCompatibility( "CHAR(5)", false, "false", new BooleanProvider(), new StringProvider()); - expectArrowNotSupported("CHAR(4)", false); + expectNotSupported("CHAR(4)", false); // test numbers testJdbcTypeCompatibility( @@ -95,23 +95,23 @@ public void testMaxAllowedString() throws Exception { // 1-byte chars String maxString = buildString("a", MB_16); testIngestion("VARCHAR", maxString, new StringProvider()); - expectArrowNotSupported("VARCHAR", maxString + "a"); + expectNotSupported("VARCHAR", maxString + "a"); // 2-byte chars maxString = buildString("ลก", MB_16 / 2); testIngestion("VARCHAR", maxString, new StringProvider()); - expectArrowNotSupported("VARCHAR", maxString + "a"); + expectNotSupported("VARCHAR", maxString + "a"); // 3-byte 
chars maxString = buildString("โ„", MB_16 / 3); testIngestion("VARCHAR", maxString, new StringProvider()); - expectArrowNotSupported("VARCHAR", maxString + "aa"); + expectNotSupported("VARCHAR", maxString + "aa"); // 4-byte chars maxString = buildString("๐Ÿž", MB_16 / 4); testIngestion("VARCHAR", maxString, new StringProvider()); - expectArrowNotSupported("VARCHAR", maxString + "a"); + expectNotSupported("VARCHAR", maxString + "a"); } @Test From 063bcfc90a801b5222aae69599104f76a9093c22 Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Tue, 30 May 2023 09:45:40 +0000 Subject: [PATCH 2/3] Code review comments #1 --- .../net/snowflake/ingest/utils/Constants.java | 6 -- .../streaming/internal/RowBufferTest.java | 62 ++++++------------- 2 files changed, 20 insertions(+), 48 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 41465304a..00313a874 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -63,12 +63,6 @@ public enum WriteMode { /** The write mode to generate BDEC file. */ public enum BdecVersion { - // Unused (previously Arrow) - // ONE(1), - - // Unused (previously Arrow with per column compression. - // TWO(2), - /** * Uses Parquet to generate BDEC chunks with {@link * net.snowflake.ingest.streaming.internal.ParquetRowBuffer} (page-level compression). This diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 81472f46f..b6e035c90 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -8,7 +8,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -21,28 +20,17 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; -import org.apache.commons.lang3.NotImplementedException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -@RunWith(Parameterized.class) public class RowBufferTest { - @Parameterized.Parameters(name = "{0}") - public static Collection bdecVersion() { - return Arrays.asList( - new Object[][] {{"Parquet_w/o_optimization", Constants.BdecVersion.THREE}}); - } - private final Constants.BdecVersion bdecVersion; private final boolean enableParquetMemoryOptimization; private AbstractRowBuffer rowBufferOnErrorContinue; private AbstractRowBuffer rowBufferOnErrorAbort; - public RowBufferTest(@SuppressWarnings("unused") String name, Constants.BdecVersion bdecVersion) { - this.bdecVersion = bdecVersion; + public RowBufferTest() { this.enableParquetMemoryOptimization = false; } @@ -122,7 +110,7 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o return AbstractRowBuffer.createRowBuffer( onErrorOption, UTC, - bdecVersion, + Constants.BdecVersion.THREE, "test.buffer", rs -> {}, initialState, @@ -487,12 +475,10 @@ private void testFlushHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals(offsetToken, data.getOffsetToken()); Assert.assertEquals(bufferSize, data.getBufferSize(), 0); - if (bdecVersion == 
Constants.BdecVersion.THREE) { - final ParquetChunkData chunkData = (ParquetChunkData) data.getVectors(); - Assert.assertEquals( - StreamingIngestUtils.getShortname(filename), - chunkData.metadata.get(Constants.PRIMARY_FILE_ID_KEY)); - } + final ParquetChunkData chunkData = (ParquetChunkData) data.getVectors(); + Assert.assertEquals( + StreamingIngestUtils.getShortname(filename), + chunkData.metadata.get(Constants.PRIMARY_FILE_ID_KEY)); } @Test @@ -759,10 +745,8 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals(0, columnEpStats.get("COLCHAR").getCurrentNullCount()); Assert.assertEquals(-1, columnEpStats.get("COLCHAR").getDistinctValues()); - if (bdecVersion == Constants.BdecVersion.THREE) { - final ParquetChunkData chunkData = (ParquetChunkData) result.getVectors(); - Assert.assertEquals(filename, chunkData.metadata.get(Constants.PRIMARY_FILE_ID_KEY)); - } + final ParquetChunkData chunkData = (ParquetChunkData) result.getVectors(); + Assert.assertEquals(filename, chunkData.metadata.get(Constants.PRIMARY_FILE_ID_KEY)); // Confirm we reset ChannelData resetResults = rowBuffer.flush("my_snowpipe_streaming.bdec"); @@ -1557,23 +1541,17 @@ public void testOnErrorAbortRowsWithError() { Assert.assertThrows( SFException.class, () -> innerBufferOnErrorAbort.insertRows(mixedRows, "3")); - switch (bdecVersion) { - case THREE: - List> snapshotContinueParquet = - ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot("fake/filePath").get()).rows; - // validRows and only the good row from mixedRows are in the buffer - Assert.assertEquals(2, snapshotContinueParquet.size()); - Assert.assertEquals(Arrays.asList("a"), snapshotContinueParquet.get(0)); - Assert.assertEquals(Arrays.asList("b"), snapshotContinueParquet.get(1)); - - List> snapshotAbortParquet = - ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot("fake/filePath").get()).rows; - // only validRows and none of the mixedRows are in the buffer - Assert.assertEquals(1, snapshotAbortParquet.size()); - Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); - break; - default: - throw new NotImplementedException("Unsupported version!"); - } + List> snapshotContinueParquet = + ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot("fake/filePath").get()).rows; + // validRows and only the good row from mixedRows are in the buffer + Assert.assertEquals(2, snapshotContinueParquet.size()); + Assert.assertEquals(Arrays.asList("a"), snapshotContinueParquet.get(0)); + Assert.assertEquals(Arrays.asList("b"), snapshotContinueParquet.get(1)); + + List> snapshotAbortParquet = + ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot("fake/filePath").get()).rows; + // only validRows and none of the mixedRows are in the buffer + Assert.assertEquals(1, snapshotAbortParquet.size()); + Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); } } From 446e9aea966fcc5038ceae7841e1d09b762e1635 Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Wed, 31 May 2023 09:30:30 +0000 Subject: [PATCH 3/3] Code review fixes --- .../streaming/internal/BlobBuilder.java | 19 ++++++++----------- .../net/snowflake/ingest/utils/Constants.java | 6 ++++++ .../java/net/snowflake/ingest/TestUtils.java | 7 ------- .../streaming/internal/FlushServiceTest.java | 13 ++----------- .../internal/StreamingIngestBigFilesIT.java | 17 +---------------- .../streaming/internal/StreamingIngestIT.java | 18 +----------------- .../datatypes/AbstractDataTypeTest.java | 18 +----------------- .../internal/datatypes/BinaryIT.java | 5 ----- 
.../internal/datatypes/DateTimeIT.java | 5 ----- .../internal/datatypes/LogicalTypesIT.java | 5 ----- .../streaming/internal/datatypes/NullIT.java | 5 ----- .../internal/datatypes/NumericTypesIT.java | 5 ----- .../internal/datatypes/SemiStructuredIT.java | 5 ----- .../internal/datatypes/StringsIT.java | 5 ----- .../streaming/internal/it/ColumnNamesIT.java | 5 ----- 15 files changed, 19 insertions(+), 119 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 6ab0ce18b..9442b2774 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -86,8 +86,8 @@ static Blob constructBlobAndMetadata( ByteArrayOutputStream chunkData = serializedChunk.chunkData; Pair paddedChunk = padChunk(chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES); - byte[] compressedAndPaddedChunkData = paddedChunk.getFirst(); - int compressedChunkLength = paddedChunk.getSecond(); + byte[] paddedChunkData = paddedChunk.getFirst(); + int paddedChunkLength = paddedChunk.getSecond(); // Encrypt the compressed chunk data, the encryption key is derived using the key from // server with the full blob path. @@ -97,13 +97,10 @@ static Blob constructBlobAndMetadata( long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES; byte[] encryptedCompressedChunkData = Cryptor.encrypt( - compressedAndPaddedChunkData, - firstChannelFlushContext.getEncryptionKey(), - filePath, - iv); + paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv); // Compute the md5 of the chunk data - String md5 = computeMD5(encryptedCompressedChunkData, compressedChunkLength); + String md5 = computeMD5(encryptedCompressedChunkData, paddedChunkLength); int encryptedCompressedChunkDataSize = encryptedCompressedChunkData.length; // Create chunk metadata @@ -114,9 +111,9 @@ static Blob constructBlobAndMetadata( // The start offset will be updated later in BlobBuilder#build to include the blob // header .setChunkStartOffset(startOffset) - // The compressedChunkLength is used because it is the actual data size used for + // The paddedChunkLength is used because it is the actual data size used for // decompression and md5 calculation on server side. - .setChunkLength(compressedChunkLength) + .setChunkLength(paddedChunkLength) .setChannelList(serializedChunk.channelsMetadataList) .setChunkMD5(md5) .setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId()) @@ -135,14 +132,14 @@ static Blob constructBlobAndMetadata( logger.logInfo( "Finish building chunk in blob={}, table={}, rowCount={}, startOffset={}," - + " uncompressedSize={}, compressedChunkLength={}, encryptedCompressedSize={}," + + " uncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={}," + " bdecVersion={}", filePath, firstChannelFlushContext.getFullyQualifiedTableName(), serializedChunk.rowCount, startOffset, serializedChunk.chunkUncompressedSize, - compressedChunkLength, + paddedChunkLength, encryptedCompressedChunkDataSize, bdecVersion); } diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 00313a874..41465304a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -63,6 +63,12 @@ public enum WriteMode { /** The write mode to generate BDEC file. 
*/ public enum BdecVersion { + // Unused (previously Arrow) + // ONE(1), + + // Unused (previously Arrow with per column compression. + // TWO(2), + /** * Uses Parquet to generate BDEC chunks with {@link * net.snowflake.ingest.streaming.internal.ParquetRowBuffer} (page-level compression). This diff --git a/src/test/java/net/snowflake/ingest/TestUtils.java b/src/test/java/net/snowflake/ingest/TestUtils.java index eede847b8..e0d7a3e07 100644 --- a/src/test/java/net/snowflake/ingest/TestUtils.java +++ b/src/test/java/net/snowflake/ingest/TestUtils.java @@ -30,8 +30,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -148,11 +146,6 @@ private static String getTestProfilePath() { return testProfilePath; } - /** @return list of Bdec versions for which to execute IT tests. */ - public static Collection getBdecVersionItCases() { - return Arrays.asList(new Object[][] {{"Parquet", Constants.BdecVersion.THREE}}); - } - public static String getUser() throws Exception { if (profile == null) { init(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 5d3e62ef2..fc049c297 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Base64; import java.util.Calendar; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -49,21 +48,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class FlushServiceTest { - @Parameterized.Parameters(name = "{0}") - public static Collection testContextFactory() { - return Arrays.asList(new Object[][] {{ParquetTestContext.createFactory()}}); - } - - public FlushServiceTest(TestContextFactory testContextFactory) { - this.testContextFactory = testContextFactory; + public FlushServiceTest() { + this.testContextFactory = ParquetTestContext.createFactory(); } private abstract static class TestContextFactory { diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java index f12484062..4d7e71ac7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java @@ -17,17 +17,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; /** Ingest large amount of rows. 
*/ -@RunWith(Parameterized.class) public class StreamingIngestBigFilesIT { - @Parameterized.Parameters(name = "{0}") - public static Collection bdecVersion() { - return TestUtils.getBdecVersionItCases(); - } - private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB"; private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA"; @@ -37,13 +29,6 @@ public static Collection bdecVersion() { private Connection jdbcConnection; private String testDb; - private final Constants.BdecVersion bdecVersion; - - public StreamingIngestBigFilesIT( - @SuppressWarnings("unused") String name, Constants.BdecVersion bdecVersion) { - this.bdecVersion = bdecVersion; - } - @Before public void beforeAll() throws Exception { testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4); @@ -62,7 +47,7 @@ public void beforeAll() throws Exception { .createStatement() .execute(String.format("use warehouse %s", TestUtils.getWarehouse())); - prop = TestUtils.getProperties(bdecVersion); + prop = TestUtils.getProperties(Constants.BdecVersion.THREE); if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) { prop.setProperty(ROLE, "ACCOUNTADMIN"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index 1f9ea3914..2bcfbcdb2 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -16,7 +16,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; -import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -47,19 +46,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; /** Example streaming ingest sdk integration test */ -@RunWith(Parameterized.class) public class StreamingIngestIT { - @Parameterized.Parameters(name = "{0}") - public static Collection bdecVersion() { - return TestUtils.getBdecVersionItCases(); - } - private static final String TEST_TABLE = "STREAMING_INGEST_TEST_TABLE"; private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB"; private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA"; @@ -74,13 +65,6 @@ public static Collection bdecVersion() { private Connection jdbcConnection; private String testDb; - private final Constants.BdecVersion bdecVersion; - - public StreamingIngestIT( - @SuppressWarnings("unused") String name, Constants.BdecVersion bdecVersion) { - this.bdecVersion = bdecVersion; - } - @Before public void beforeAll() throws Exception { testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4); @@ -105,7 +89,7 @@ public void beforeAll() throws Exception { .createStatement() .execute(String.format("use warehouse %s", TestUtils.getWarehouse())); - prop = TestUtils.getProperties(bdecVersion); + prop = TestUtils.getProperties(Constants.BdecVersion.THREE); if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) { prop.setProperty(ROLE, "ACCOUNTADMIN"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 96792ee0f..4ff61e142 100644 --- 
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
@@ -10,7 +10,6 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.ZoneId;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -27,16 +26,8 @@
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
 public abstract class AbstractDataTypeTest {
-  @Parameterized.Parameters(name = "{0}")
-  public static Collection bdecVersion() {
-    return TestUtils.getBdecVersionItCases();
-  }
-
   private static final String SOURCE_COLUMN_NAME = "source";
   private static final String VALUE_COLUMN_NAME = "value";
@@ -65,13 +56,6 @@ public static Collection bdecVersion() {
   private SnowflakeStreamingIngestClient client;
   private static final ObjectMapper objectMapper = new ObjectMapper();
 
-  private final Constants.BdecVersion bdecVersion;
-
-  public AbstractDataTypeTest(
-      @SuppressWarnings("unused") String name, Constants.BdecVersion bdecVersion) {
-    this.bdecVersion = bdecVersion;
-  }
-
   @Before
   public void before() throws Exception {
     databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
@@ -82,7 +66,7 @@ public void before() throws Exception {
     conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse()));
 
-    Properties props = TestUtils.getProperties(bdecVersion);
+    Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE);
     if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
       props.setProperty(ROLE, "ACCOUNTADMIN");
     }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java
index d79d6836b..d9e6e3ddd 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java
@@ -1,15 +1,10 @@
 package net.snowflake.ingest.streaming.internal.datatypes;
 
 import net.snowflake.client.jdbc.internal.org.bouncycastle.util.encoders.Hex;
-import net.snowflake.ingest.utils.Constants;
 import org.junit.Test;
 
 public class BinaryIT extends AbstractDataTypeTest {
 
-  public BinaryIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Test
   public void testBinary() throws Exception {
     testJdbcTypeCompatibility("BINARY", new byte[0], new ByteArrayProvider());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java
index 3b0133ff1..35dfe2626 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java
@@ -9,7 +9,6 @@
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
-import net.snowflake.ingest.utils.Constants;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -20,10 +19,6 @@ public class DateTimeIT extends AbstractDataTypeTest {
   private static final ZoneId TZ_BERLIN = ZoneId.of("Europe/Berlin");
   private static final ZoneId TZ_TOKYO = ZoneId.of("Asia/Tokyo");
 
-  public DateTimeIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Before
   public void setup() throws Exception {
     // Set to a random time zone not to interfere with any of the tests
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java
index 48936b950..6bc769a94 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java
@@ -2,15 +2,10 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import net.snowflake.ingest.utils.Constants;
 import org.junit.Test;
 
 public class LogicalTypesIT extends AbstractDataTypeTest {
 
-  public LogicalTypesIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Test
   public void testLogicalTypes() throws Exception {
     // Test booleans
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java
index 22c59d3ea..fa01eebf0 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java
@@ -1,15 +1,10 @@
 package net.snowflake.ingest.streaming.internal.datatypes;
 
 import java.util.Arrays;
-import net.snowflake.ingest.utils.Constants;
 import org.junit.Test;
 
 public class NullIT extends AbstractDataTypeTest {
 
-  public NullIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Test
   public void testNullIngestion() throws Exception {
     for (String type :
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java
index e9361e4a0..4e6ade712 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java
@@ -2,15 +2,10 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import net.snowflake.ingest.utils.Constants;
 import org.junit.Test;
 
 public class NumericTypesIT extends AbstractDataTypeTest {
 
-  public NumericTypesIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Test
   public void testIntegers() throws Exception {
     // test bytes
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java
index bf55b3396..49e994180 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java
@@ -15,7 +15,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import net.snowflake.ingest.utils.Constants;
 import org.junit.Assert;
 import org.junit.Test;
@@ -25,10 +24,6 @@ public class SemiStructuredIT extends AbstractDataTypeTest {
   // server-side representation. Validation leaves a small buffer for this difference.
   private static final int MAX_ALLOWED_LENGTH = 16 * 1024 * 1024 - 64;
 
-  public SemiStructuredIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Test
   public void testVariant() throws Exception {
     // Test dates
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java
index c767f6069..63dc515f5 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java
@@ -6,7 +6,6 @@
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
-import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import org.junit.Assert;
@@ -17,10 +16,6 @@ public class StringsIT extends AbstractDataTypeTest {
   private static final int MB_16 = 16 * 1024 * 1024;
 
-  public StringsIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Test
   public void testStrings() throws Exception {
     testJdbcTypeCompatibility("VARCHAR", "", new StringProvider());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java
index fc2b220f4..254da0d52 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java
@@ -13,7 +13,6 @@
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
 import net.snowflake.ingest.streaming.internal.datatypes.AbstractDataTypeTest;
-import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.SFException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -21,10 +20,6 @@ public class ColumnNamesIT extends AbstractDataTypeTest {
   private static final int INGEST_VALUE = 1;
 
-  public ColumnNamesIT(String name, Constants.BdecVersion bdecVersion) {
-    super(name, bdecVersion);
-  }
-
   @Test
   public void testColumnNamesSupport() throws Exception {
     // Test simple case