From 54b5a966c5508879e0a973ef5814d02ae7a5dbd3 Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Sun, 13 Aug 2023 15:28:32 -0700
Subject: [PATCH] Updates from master (#138)

* Spark: Update antlr4 to match Spark 3.4 (#7824)

* Parquet: Revert workaround for resource usage with zstd (#7834)

* GCP: fix single byte read in GCSInputStream (#8071)

* GCP: fix byte read in GCSInputStream

* add test

* Parquet: Cache codecs by name and level (#8182)

* GCP: Add prefix and bulk operations to GCSFileIO (#8168)

* AWS, GCS: Allow access to underlying storage client (#8208)

* spotless
---
 .../org/apache/iceberg/aws/s3/S3FileIO.java   |  2 +-
 .../org/apache/iceberg/gcp/GCPProperties.java | 19 +++++++++++++++++
 .../org/apache/iceberg/gcp/gcs/GCSFileIO.java | 21 +++++++++----------
 .../apache/iceberg/gcp/gcs/GCSLocation.java   |  2 +-
 .../apache/iceberg/gcp/gcs/GCSFileIOTest.java |  6 +++---
 .../iceberg/gcp/gcs/GCSInputStreamTest.java   | 16 ++++++++++++++
 .../org/apache/iceberg/parquet/Parquet.java   |  4 ----
 .../iceberg/parquet/ParquetCodecFactory.java  |  6 +++---
 spark/v3.4/build.gradle                       |  4 ++--
 9 files changed, 55 insertions(+), 25 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 93793157e2e1..9ba2f545b2eb 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -323,7 +323,7 @@ public void deletePrefix(String prefix) {
     deleteFiles(() -> Streams.stream(listPrefix(prefix)).map(FileInfo::location).iterator());
   }
 
-  private S3Client client() {
+  public S3Client client() {
     if (client == null) {
       synchronized (this) {
         if (client == null) {
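Usage sketch (not part of the patch): with S3FileIO#client() now public, callers can reach the underlying AWS SDK client for requests the FileIO API does not expose. This assumes default AWS credential/region resolution; the bucket and key names are placeholders.

// Minimal sketch of the now-public S3FileIO#client() accessor; bucket/key are placeholders.
import java.util.Map;
import org.apache.iceberg.aws.s3.S3FileIO;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;

public class S3ClientAccessExample {
  public static void main(String[] args) {
    S3FileIO fileIO = new S3FileIO();
    fileIO.initialize(Map.of());

    // client() was private before this change; callers can now issue SDK requests
    // directly for operations the FileIO API does not expose.
    S3Client s3 = fileIO.client();
    s3.headObject(
        HeadObjectRequest.builder()
            .bucket("example-bucket")
            .key("warehouse/db/table/metadata/v1.metadata.json")
            .build());

    fileIO.close();
  }
}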
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
index 109a4d21ed3c..521eb4c6c867 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -22,6 +22,7 @@
 import java.util.Date;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.iceberg.util.PropertyUtil;
 
 public class GCPProperties implements Serializable {
   // Service Options
@@ -40,6 +41,14 @@ public class GCPProperties implements Serializable {
   public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
   public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";
 
+  /** Configure the batch size used when deleting multiple files from a given GCS bucket */
+  public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
+  /**
+   * Max possible batch size for deletion. Currently, a max of 100 keys is advised, so we default to
+   * a number below that. https://cloud.google.com/storage/docs/batch
+   */
+  public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50;
+
   private String projectId;
   private String clientLibToken;
   private String serviceHost;
@@ -54,6 +63,8 @@ public class GCPProperties implements Serializable {
   private String gcsOAuth2Token;
   private Date gcsOAuth2TokenExpiresAt;
 
+  private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;
+
   public GCPProperties() {}
 
   public GCPProperties(Map<String, String> properties) {
@@ -78,6 +89,10 @@ public GCPProperties(Map<String, String> properties) {
       gcsOAuth2TokenExpiresAt =
           new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
     }
+
+    gcsDeleteBatchSize =
+        PropertyUtil.propertyAsInt(
+            properties, GCS_DELETE_BATCH_SIZE, GCS_DELETE_BATCH_SIZE_DEFAULT);
   }
 
   public Optional<Integer> channelReadChunkSize() {
@@ -119,4 +134,8 @@ public Optional<String> oauth2Token() {
   public Optional<Date> oauth2TokenExpiresAt() {
     return Optional.ofNullable(gcsOAuth2TokenExpiresAt);
   }
+
+  public int deleteBatchSize() {
+    return gcsDeleteBatchSize;
+  }
 }
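Usage sketch (not part of the patch): how the new gcs.delete.batch-size property flows through GCPProperties. The numeric override below is illustrative.

// Shows resolution of the new gcs.delete.batch-size property added above.
import java.util.Map;
import org.apache.iceberg.gcp.GCPProperties;

public class DeleteBatchSizeExample {
  public static void main(String[] args) {
    // No override: falls back to GCS_DELETE_BATCH_SIZE_DEFAULT (50).
    GCPProperties defaults = new GCPProperties(Map.of());
    System.out.println(defaults.deleteBatchSize()); // 50

    // Override the batch size used by GCSFileIO#deleteFiles and deletePrefix.
    GCPProperties tuned = new GCPProperties(Map.of(GCPProperties.GCS_DELETE_BATCH_SIZE, "25"));
    System.out.println(tuned.deleteBatchSize()); // 25
  }
}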
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 177528d0b5a3..28f9aedde1e3 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -26,6 +26,7 @@
 import com.google.cloud.storage.StorageOptions;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
 import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.BulkDeletionFailureException;
@@ -36,7 +37,7 @@
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.metrics.MetricsContext;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
@@ -119,7 +120,7 @@ public Map<String, String> properties() {
     return properties.immutableMap();
   }
 
-  private Storage client() {
+  public Storage client() {
     if (storage == null) {
       synchronized (this) {
         if (storage == null) {
@@ -195,11 +196,11 @@ public Iterable<FileInfo> listPrefix(String prefix) {
                 new FileInfo(
                     String.format("gs://%s/%s", blob.getBucket(), blob.getName()),
                     blob.getSize(),
-                    getCreateTimeMillis(blob)))
+                    createTimeMillis(blob)))
         .iterator();
   }
 
-  private long getCreateTimeMillis(Blob blob) {
+  private long createTimeMillis(Blob blob) {
     if (blob.getCreateTimeOffsetDateTime() == null) {
       return 0;
     }
@@ -209,19 +210,17 @@ private long getCreateTimeMillis(Blob blob) {
   @Override
   public void deletePrefix(String prefix) {
     internalDeleteFiles(
-        () ->
-            Streams.stream(listPrefix(prefix))
-                .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location()))
-                .iterator());
+        Streams.stream(listPrefix(prefix))
+            .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location())));
   }
 
   @Override
   public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
-    internalDeleteFiles(() -> Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri).iterator());
+    internalDeleteFiles(Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri));
   }
 
-  private void internalDeleteFiles(Iterable<BlobId> blobIdsToDelete) {
-    Streams.stream(Iterables.partition(blobIdsToDelete, DELETE_BATCH_SIZE))
+  private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
+    Streams.stream(Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
         .forEach(batch -> client().delete(batch));
   }
 }
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
index 8b7c6c9d799f..e1de27dcd577 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSLocation.java
@@ -43,7 +43,7 @@ class GCSLocation {
    * @param location fully qualified URI
    */
   GCSLocation(String location) {
-    Preconditions.checkNotNull(location, "Location cannot be null.");
+    Preconditions.checkArgument(location != null, "Invalid location: null");
 
     String[] schemeSplit = location.split(SCHEME_DELIM, -1);
     ValidationException.check(
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index c73a3b11654d..4c875921a77b 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -24,7 +24,6 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 
-import com.google.api.client.util.Lists;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
@@ -43,6 +42,7 @@
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -58,9 +58,9 @@ public void before() {
     // LocalStorageHelper doesn't support batch operations, so mock that here
     doAnswer(
             invoke -> {
-              Iterable<BlobId> it = invoke.getArgument(0);
+              Iterable<BlobId> iter = invoke.getArgument(0);
               List<Boolean> answer = Lists.newArrayList();
-              it.forEach(
+              iter.forEach(
                   blobId -> {
                     answer.add(storage.delete(blobId));
                   });
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
index 1a5200345b7a..733d9101869b 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
@@ -78,6 +78,22 @@ public void testRead() throws Exception {
     }
   }
 
+  @Test
+  public void testReadSingle() throws Exception {
+    BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
+    int i0 = 1;
+    int i1 = 255;
+    byte[] data = {(byte) i0, (byte) i1};
+
+    writeGCSData(uri, data);
+
+    try (SeekableInputStream in =
+        new GCSInputStream(storage, uri, gcpProperties, MetricsContext.nullMetrics())) {
+      assertThat(in.read()).isEqualTo(i0);
+      assertThat(in.read()).isEqualTo(i1);
+    }
+  }
+
   private void readAndCheck(
       SeekableInputStream in, long rangeStart, int size, byte[] original, boolean buffered)
       throws IOException {
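Usage sketch (not part of the patch): the prefix and bulk operations wired into GCSFileIO above, with the delete batch size override. Bucket, prefix, and object names are placeholders, and application-default GCS credentials are assumed.

// Exercises SupportsPrefixOperations and SupportsBulkOperations on GCSFileIO.
import java.util.List;
import java.util.Map;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.gcp.gcs.GCSFileIO;
import org.apache.iceberg.io.FileInfo;

public class GCSBulkOpsExample {
  public static void main(String[] args) {
    GCSFileIO fileIO = new GCSFileIO();
    // Deletes issued below are grouped into batches of this configurable size.
    fileIO.initialize(Map.of(GCPProperties.GCS_DELETE_BATCH_SIZE, "25"));

    // SupportsPrefixOperations: list everything under a prefix.
    for (FileInfo file : fileIO.listPrefix("gs://example-bucket/warehouse/db/table/data/")) {
      System.out.println(file.location() + " (" + file.size() + " bytes)");
    }

    // SupportsBulkOperations: delete a set of locations, or an entire prefix.
    fileIO.deleteFiles(
        List.of(
            "gs://example-bucket/warehouse/db/table/data/old-1.parquet",
            "gs://example-bucket/warehouse/db/table/data/old-2.parquet"));
    fileIO.deletePrefix("gs://example-bucket/warehouse/db/table/metadata/");

    fileIO.close();
  }
}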
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index cdfb9d59b059..2735d9a1e720 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -1030,12 +1030,8 @@ public <D> CloseableIterable<D> build() {
           conf.unset(property);
         }
         optionsBuilder = HadoopReadOptions.builder(conf);
-        // page size not used by decompressors
-        optionsBuilder.withCodecFactory(new ParquetCodecFactory(conf, 0));
       } else {
         optionsBuilder = ParquetReadOptions.builder();
-        // page size not used by decompressors
-        optionsBuilder.withCodecFactory(new ParquetCodecFactory(new Configuration(), 0));
       }
 
       for (Map.Entry<String, String> entry : properties.entrySet()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
index beef07a570b2..bfcece6259a6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
@@ -77,10 +77,10 @@ private String cacheKey(CompressionCodecName codecName) {
         level = configuration.get("compression.brotli.quality");
         break;
       case ZSTD:
-        // keep "io.compression.codec.zstd.level" for backwards compatibility
-        level = configuration.get("io.compression.codec.zstd.level");
+        level = configuration.get("parquet.compression.codec.zstd.level");
         if (level == null) {
-          level = configuration.get("parquet.compression.codec.zstd.level");
+          // keep "io.compression.codec.zstd.level" for backwards compatibility
+          level = configuration.get("io.compression.codec.zstd.level");
         }
         break;
       default:
diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index a6514a9dffea..dd01a60c1d80 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -165,8 +165,8 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
   testImplementation "org.apache.parquet:parquet-hadoop"
 
   // Required because we remove antlr plugin dependencies from the compile configuration, see note above
-  runtimeOnly "org.antlr:antlr4-runtime:4.8"
-  antlr "org.antlr:antlr4:4.8"
+  runtimeOnly "org.antlr:antlr4-runtime:4.9.3"
+  antlr "org.antlr:antlr4:4.9.3"
 }
 
 generateGrammarSource {
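Note (not part of the patch): after the ParquetCodecFactory change, "parquet.compression.codec.zstd.level" is consulted first and the legacy "io.compression.codec.zstd.level" only as a fallback. A minimal sketch mirroring that lookup order with a plain Hadoop Configuration; the numeric levels are illustrative.

// Mirrors the zstd-level lookup order used when building the codec cache key.
import org.apache.hadoop.conf.Configuration;

public class ZstdLevelLookupExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("io.compression.codec.zstd.level", "3"); // legacy key
    conf.set("parquet.compression.codec.zstd.level", "9"); // preferred key

    String level = conf.get("parquet.compression.codec.zstd.level");
    if (level == null) {
      // keep "io.compression.codec.zstd.level" for backwards compatibility
      level = conf.get("io.compression.codec.zstd.level");
    }

    System.out.println("zstd level used in the codec cache key: " + level); // 9
  }
}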