Updates from master (#138)
* Spark: Update antlr4 to match Spark 3.4 (apache#7824)

* Parquet: Revert workaround for resource usage with zstd (apache#7834)

* GCP: fix single byte read in GCSInputStream (apache#8071)

* GCP: fix byte read in GCSInputStream

* add test

* Parquet: Cache codecs by name and level (apache#8182)

* GCP: Add prefix and bulk operations to GCSFileIO (apache#8168)

* AWS, GCS: Allow access to underlying storage client (apache#8208)

* spotless
bryanck committed Aug 13, 2023
1 parent 5df8df1 commit 54b5a96
Showing 9 changed files with 55 additions and 25 deletions.
aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java (2 changes: 1 addition & 1 deletion)
@@ -323,7 +323,7 @@ public void deletePrefix(String prefix) {
deleteFiles(() -> Streams.stream(listPrefix(prefix)).map(FileInfo::location).iterator());
}

- private S3Client client() {
+ public S3Client client() {
if (client == null) {
synchronized (this) {
if (client == null) {
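A minimal usage sketch, not part of the commit: with client() now public (here and in GCSFileIO below), callers can reach the underlying SDK client for operations the FileIO interface does not expose. The empty property map and the listBuckets() call are illustrative assumptions, not taken from this diff.

import java.util.Collections;
import org.apache.iceberg.aws.s3.S3FileIO;
import software.amazon.awssdk.services.s3.S3Client;

class S3ClientAccessSketch {
  public static void main(String[] args) {
    S3FileIO fileIO = new S3FileIO();
    fileIO.initialize(Collections.emptyMap()); // hypothetical: a real config would carry credentials and region
    S3Client s3 = fileIO.client();             // previously private, now reachable by callers
    s3.listBuckets().buckets().forEach(bucket -> System.out.println(bucket.name()));
  }
}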
gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java (19 changes: 19 additions & 0 deletions)
@@ -22,6 +22,7 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.util.PropertyUtil;

public class GCPProperties implements Serializable {
// Service Options
@@ -40,6 +41,14 @@ public class GCPProperties implements Serializable {
public static final String GCS_OAUTH2_TOKEN = "gcs.oauth2.token";
public static final String GCS_OAUTH2_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at";

/** Configure the batch size used when deleting multiple files from a given GCS bucket */
public static final String GCS_DELETE_BATCH_SIZE = "gcs.delete.batch-size";
/**
* Max possible batch size for deletion. Currently, a max of 100 keys is advised, so we default to
* a number below that. https://cloud.google.com/storage/docs/batch
*/
public static final int GCS_DELETE_BATCH_SIZE_DEFAULT = 50;

private String projectId;
private String clientLibToken;
private String serviceHost;
@@ -54,6 +63,8 @@ public class GCPProperties implements Serializable {
private String gcsOAuth2Token;
private Date gcsOAuth2TokenExpiresAt;

private int gcsDeleteBatchSize = GCS_DELETE_BATCH_SIZE_DEFAULT;

public GCPProperties() {}

public GCPProperties(Map<String, String> properties) {
@@ -78,6 +89,10 @@ public GCPProperties(Map<String, String> properties) {
gcsOAuth2TokenExpiresAt =
new Date(Long.parseLong(properties.get(GCS_OAUTH2_TOKEN_EXPIRES_AT)));
}

gcsDeleteBatchSize =
PropertyUtil.propertyAsInt(
properties, GCS_DELETE_BATCH_SIZE, GCS_DELETE_BATCH_SIZE_DEFAULT);
}

public Optional<Integer> channelReadChunkSize() {
@@ -119,4 +134,8 @@ public Optional<String> oauth2Token() {
public Optional<Date> oauth2TokenExpiresAt() {
return Optional.ofNullable(gcsOAuth2TokenExpiresAt);
}

public int deleteBatchSize() {
return gcsDeleteBatchSize;
}
}
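A short sketch of how the new delete batch size is resolved; only the property key, the getter, and the default of 50 come from this diff, the rest is illustrative.

import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class DeleteBatchSizeSketch {
  public static void main(String[] args) {
    // An explicit setting is honored ...
    GCPProperties tuned =
        new GCPProperties(ImmutableMap.of(GCPProperties.GCS_DELETE_BATCH_SIZE, "25"));
    System.out.println(tuned.deleteBatchSize()); // 25

    // ... otherwise deleteBatchSize() falls back to GCS_DELETE_BATCH_SIZE_DEFAULT
    GCPProperties defaults = new GCPProperties(ImmutableMap.of());
    System.out.println(defaults.deleteBatchSize()); // 50
  }
}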
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java (21 changes: 10 additions & 11 deletions)
@@ -26,6 +26,7 @@
import com.google.cloud.storage.StorageOptions;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.stream.Stream;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.gcp.GCPProperties;
import org.apache.iceberg.io.BulkDeletionFailureException;
@@ -36,7 +37,7 @@
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
- import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+ import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
@@ -119,7 +120,7 @@ public Map<String, String> properties() {
return properties.immutableMap();
}

- private Storage client() {
+ public Storage client() {
if (storage == null) {
synchronized (this) {
if (storage == null) {
@@ -195,11 +196,11 @@ public Iterable<FileInfo> listPrefix(String prefix) {
new FileInfo(
String.format("gs://%s/%s", blob.getBucket(), blob.getName()),
blob.getSize(),
- getCreateTimeMillis(blob)))
+ createTimeMillis(blob)))
.iterator();
}

- private long getCreateTimeMillis(Blob blob) {
+ private long createTimeMillis(Blob blob) {
if (blob.getCreateTimeOffsetDateTime() == null) {
return 0;
}
@@ -209,19 +210,17 @@ private long getCreateTimeMillis(Blob blob) {
@Override
public void deletePrefix(String prefix) {
internalDeleteFiles(
- () ->
- Streams.stream(listPrefix(prefix))
- .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location()))
- .iterator());
+ Streams.stream(listPrefix(prefix))
+ .map(fileInfo -> BlobId.fromGsUtilUri(fileInfo.location())));
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
- internalDeleteFiles(() -> Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri).iterator());
+ internalDeleteFiles(Streams.stream(pathsToDelete).map(BlobId::fromGsUtilUri));
}

- private void internalDeleteFiles(Iterable<BlobId> blobIdsToDelete) {
- Streams.stream(Iterables.partition(blobIdsToDelete, DELETE_BATCH_SIZE))
+ private void internalDeleteFiles(Stream<BlobId> blobIdsToDelete) {
+ Streams.stream(Iterators.partition(blobIdsToDelete.iterator(), gcpProperties.deleteBatchSize()))
.forEach(batch -> client().delete(batch));
}
}
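A hedged usage sketch of the prefix and bulk operations wired up here; the bucket paths, batch-size value, and class name are illustrative, while listPrefix, deletePrefix, deleteFiles, and the now-public client() are the methods touched by this change.

import com.google.cloud.storage.Storage;
import org.apache.iceberg.gcp.gcs.GCSFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

class GcsBulkOpsSketch {
  public static void main(String[] args) {
    GCSFileIO io = new GCSFileIO();
    io.initialize(ImmutableMap.of("gcs.delete.batch-size", "25")); // optional tuning

    // List everything under a prefix
    io.listPrefix("gs://bucket/warehouse/db/table/data")
        .forEach(file -> System.out.println(file.location()));

    // Delete by prefix, or delete an explicit set of paths in batches of deleteBatchSize()
    io.deletePrefix("gs://bucket/warehouse/db/table/tmp");
    io.deleteFiles(ImmutableList.of("gs://bucket/tmp/a.parquet", "gs://bucket/tmp/b.parquet"));

    // The underlying client is reachable as well, mirroring the S3FileIO change above
    Storage storage = io.client();
    System.out.println(storage.getOptions().getHost());
  }
}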
@@ -43,7 +43,7 @@ class GCSLocation {
* @param location fully qualified URI
*/
GCSLocation(String location) {
- Preconditions.checkNotNull(location, "Location cannot be null.");
+ Preconditions.checkArgument(location != null, "Invalid location: null");

String[] schemeSplit = location.split(SCHEME_DELIM, -1);
ValidationException.check(
@@ -24,7 +24,6 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

- import com.google.api.client.util.Lists;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
@@ -43,6 +42,7 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@@ -58,9 +58,9 @@ public void before() {
// LocalStorageHelper doesn't support batch operations, so mock that here
doAnswer(
invoke -> {
- Iterable<BlobId> it = invoke.getArgument(0);
+ Iterable<BlobId> iter = invoke.getArgument(0);
List<Boolean> answer = Lists.newArrayList();
- it.forEach(
+ iter.forEach(
blobId -> {
answer.add(storage.delete(blobId));
});
@@ -78,6 +78,22 @@ public void testRead() throws Exception {
}
}

@Test
public void testReadSingle() throws Exception {
BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
int i0 = 1;
int i1 = 255;
byte[] data = {(byte) i0, (byte) i1};

writeGCSData(uri, data);

try (SeekableInputStream in =
new GCSInputStream(storage, uri, gcpProperties, MetricsContext.nullMetrics())) {
assertThat(in.read()).isEqualTo(i0);
assertThat(in.read()).isEqualTo(i1);
}
}

private void readAndCheck(
SeekableInputStream in, long rangeStart, int size, byte[] original, boolean buffered)
throws IOException {
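The new test pins down the single-byte read contract; below is a tiny sketch of the usual pitfall it guards against. The actual GCSInputStream fix sits in a collapsed hunk, so the masking shown here is an assumption about the general pattern, not a quote of the fix.

class SingleByteReadSketch {
  public static void main(String[] args) {
    byte raw = (byte) 255;
    System.out.println((int) raw);  // -1: sign-extended, indistinguishable from end-of-stream
    System.out.println(raw & 0xFF); // 255: the value InputStream.read() is required to return
  }
}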
parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java (4 changes: 0 additions & 4 deletions)
@@ -1030,12 +1030,8 @@ public <D> CloseableIterable<D> build() {
conf.unset(property);
}
optionsBuilder = HadoopReadOptions.builder(conf);
- // page size not used by decompressors
- optionsBuilder.withCodecFactory(new ParquetCodecFactory(conf, 0));
} else {
optionsBuilder = ParquetReadOptions.builder();
- // page size not used by decompressors
- optionsBuilder.withCodecFactory(new ParquetCodecFactory(new Configuration(), 0));
}

for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -77,10 +77,10 @@ private String cacheKey(CompressionCodecName codecName) {
level = configuration.get("compression.brotli.quality");
break;
case ZSTD:
- // keep "io.compression.codec.zstd.level" for backwards compatibility
- level = configuration.get("io.compression.codec.zstd.level");
+ level = configuration.get("parquet.compression.codec.zstd.level");
if (level == null) {
- level = configuration.get("parquet.compression.codec.zstd.level");
+ // keep "io.compression.codec.zstd.level" for backwards compatibility
+ level = configuration.get("io.compression.codec.zstd.level");
}
break;
default:
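A small configuration sketch; only the two key names and their new lookup order come from this diff, the surrounding Configuration usage is illustrative.

import org.apache.hadoop.conf.Configuration;

class ZstdLevelSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The Parquet-style key is now consulted first ...
    conf.set("parquet.compression.codec.zstd.level", "6");
    // ... and the older key is still honored as a fallback for backwards compatibility:
    // conf.set("io.compression.codec.zstd.level", "6");
    System.out.println(conf.get("parquet.compression.codec.zstd.level"));
  }
}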
spark/v3.4/build.gradle (4 changes: 2 additions & 2 deletions)
@@ -165,8 +165,8 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation "org.apache.parquet:parquet-hadoop"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
- runtimeOnly "org.antlr:antlr4-runtime:4.8"
- antlr "org.antlr:antlr4:4.8"
+ runtimeOnly "org.antlr:antlr4-runtime:4.9.3"
+ antlr "org.antlr:antlr4:4.9.3"
}

generateGrammarSource {
