SNOW-818891 Remove Arrow and other unneeded dependencies
sfc-gh-lsembera committed May 24, 2023
1 parent 5e2a90b commit e5e6cce
Showing 33 changed files with 84 additions and 1,764 deletions.
pom.xml (39 changes: 20 additions & 19 deletions)
@@ -35,7 +35,6 @@

<!-- Set our Language Level to Java 8 -->
<properties>
<arrow.version>10.0.0</arrow.version>
<codehaus.version>1.9.13</codehaus.version>
<commonscodec.version>1.15</commonscodec.version>
<commonscollections.version>3.2.2</commonscollections.version>
@@ -157,6 +156,10 @@
<groupId>com.github.pjfanning</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
@@ -169,6 +172,10 @@
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
@@ -189,10 +196,22 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kerby</groupId>
<artifactId>kerb-core</artifactId>
@@ -406,17 +425,6 @@
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
</dependency>
<!-- Apache Arrow -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -439,12 +447,6 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
<scope>runtime</scope>
</dependency>
<!-- JUnit so that we can make some basic unit tests -->
<dependency>
<groupId>junit</groupId>
@@ -752,7 +754,6 @@
<includedLicense>Apache License 2.0</includedLicense>
<includedLicense>BSD 2-Clause License</includedLicense>
<includedLicense>3-Clause BSD License</includedLicense>
<includedLicense>Revised BSD</includedLicense>
<includedLicense>The MIT License</includedLicense>
<includedLicense>EDL 1.0</includedLicense>
<includedLicense>The Go license</includedLicense>
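The pom.xml changes do two things: the Arrow artifacts and the arrow.version property are dropped outright, and new exclusions on the Hadoop dependency (jsch, commons-logging, the Curator artifacts, httpclient) keep those transitive dependencies off the consumer's classpath. A quick way to confirm that nothing still pulls Arrow in transitively is to run mvn dependency:tree -Dincludes=org.apache.arrow against the built project, which should report no matching artifacts after this change.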
src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -4,6 +4,7 @@

package net.snowflake.ingest.streaming.internal;

import com.google.common.annotations.VisibleForTesting;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
@@ -23,17 +24,13 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.VisibleForTesting;

/**
* The abstract implementation of the buffer in the Streaming Ingest channel that holds the
* un-flushed rows, these rows will be converted to the underlying format implementation for faster
* processing
*
* @param <T> type of column data (Arrow {@link org.apache.arrow.vector.VectorSchemaRoot} or Parquet
* {@link ParquetChunkData})
* @param <T> type of column data ({@link ParquetChunkData} for Parquet)
*/
abstract class AbstractRowBuffer<T> implements RowBuffer<T> {
private static final Logging logger = new Logging(AbstractRowBuffer.class);
@@ -162,9 +159,6 @@ public int getOrdinal() {
// Metric callback to report size of inserted rows
private final Consumer<Float> rowSizeMetric;

// Allocator used to allocate the buffers
final BufferAllocator allocator;

// State of the owning channel
final ChannelRuntimeState channelState;

@@ -176,7 +170,6 @@ public int getOrdinal() {
AbstractRowBuffer(
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
BufferAllocator allocator,
String fullyQualifiedChannelName,
Consumer<Float> rowSizeMetric,
ChannelRuntimeState channelRuntimeState) {
@@ -185,7 +178,6 @@ public int getOrdinal() {
this.rowSizeMetric = rowSizeMetric;
this.channelState = channelRuntimeState;
this.channelFullyQualifiedName = fullyQualifiedChannelName;
this.allocator = allocator;
this.nonNullableFieldNames = new HashSet<>();
this.flushLock = new ReentrantLock();
this.bufferedRowCount = 0;
@@ -505,30 +497,8 @@ void reset() {
/** Close the row buffer and release allocated memory for the channel. */
@Override
public synchronized void close(String name) {
long allocatedBeforeRelease = this.allocator.getAllocatedMemory();

closeInternal();

long allocatedAfterRelease = this.allocator.getAllocatedMemory();
logger.logInfo(
"Trying to close {} for channel={} from function={}, allocatedBeforeRelease={},"
+ " allocatedAfterRelease={}",
this.getClass().getSimpleName(),
channelFullyQualifiedName,
name,
allocatedBeforeRelease,
allocatedAfterRelease);
Utils.closeAllocator(this.allocator);

// If the channel is valid but still has leftover data, throw an exception because it should be
// cleaned up already before calling close
if (allocatedBeforeRelease > 0 && this.channelState.isValid()) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"Memory leaked=%d by allocator=%s, channel=%s",
allocatedBeforeRelease, this.allocator, channelFullyQualifiedName));
}
}
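With the allocator bookkeeping and the leak check deleted, the surviving close path is plain delegation. A sketch of the method as it reads after this diff, reconstructed from the lines kept above (the name parameter stays in the signature even though the trimmed body no longer logs it):

/** Close the row buffer and release allocated memory for the channel. */
@Override
public synchronized void close(String name) {
  // All Arrow BufferAllocator accounting is gone; format-specific cleanup
  // happens in closeInternal(), implemented by the concrete row buffer.
  closeInternal();
}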

/**
@@ -554,31 +524,19 @@ static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> co
static <T> AbstractRowBuffer<T> createRowBuffer(
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
BufferAllocator allocator,
Constants.BdecVersion bdecVersion,
String fullyQualifiedChannelName,
Consumer<Float> rowSizeMetric,
ChannelRuntimeState channelRuntimeState,
boolean enableParquetMemoryOptimization,
long maxChunkSizeInBytes) {
switch (bdecVersion) {
case ONE:
//noinspection unchecked
return (AbstractRowBuffer<T>)
new ArrowRowBuffer(
onErrorOption,
defaultTimezone,
allocator,
fullyQualifiedChannelName,
rowSizeMetric,
channelRuntimeState);
case THREE:
//noinspection unchecked
return (AbstractRowBuffer<T>)
new ParquetRowBuffer(
onErrorOption,
defaultTimezone,
allocator,
fullyQualifiedChannelName,
rowSizeMetric,
channelRuntimeState,
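With the case ONE branch deleted, the factory above keeps Parquet as the only supported BDEC version. A minimal sketch of the post-change method, assuming the trailing constructor arguments and a default branch that the diff cuts off (the exact error raised for unsupported versions is an assumption here):

static <T> AbstractRowBuffer<T> createRowBuffer(
    OpenChannelRequest.OnErrorOption onErrorOption,
    ZoneId defaultTimezone,
    Constants.BdecVersion bdecVersion,
    String fullyQualifiedChannelName,
    Consumer<Float> rowSizeMetric,
    ChannelRuntimeState channelRuntimeState,
    boolean enableParquetMemoryOptimization,
    long maxChunkSizeInBytes) {
  switch (bdecVersion) {
    case THREE:
      //noinspection unchecked
      return (AbstractRowBuffer<T>)
          new ParquetRowBuffer(
              onErrorOption,
              defaultTimezone,
              fullyQualifiedChannelName,
              rowSizeMetric,
              channelRuntimeState,
              // Assumed: the remaining factory parameters are forwarded;
              // the diff is truncated before these arguments.
              enableParquetMemoryOptimization,
              maxChunkSizeInBytes);
    default:
      // Assumed error handling; the actual code and message are not visible in the diff.
      throw new SFException(
          ErrorCode.INTERNAL_ERROR, "Unsupported BDEC version: " + bdecVersion);
  }
}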

This file was deleted.

