Skip to content

Commit

Permalink
Add GCS testcontainer + IT for Nessie-GC
Browse files Browse the repository at this point in the history
  • Loading branch information
snazy committed Mar 20, 2024
1 parent 7e0eb45 commit e5d1de1
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 77 deletions.
1 change: 1 addition & 0 deletions bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
api(project(":nessie-compatibility-jersey"))
api(project(":nessie-gc-base"))
api(project(":nessie-gc-repository-jdbc"))
api(project(":nessie-gcs-testcontainer"))
api(project(":nessie-model"))
api(project(":nessie-jaxrs"))
api(project(":nessie-jaxrs-testextension"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.projectnessie.gc.iceberg.files;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.MustBeClosed;
import java.io.IOException;
import java.net.URI;
Expand All @@ -34,6 +35,8 @@
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.immutables.value.Value;
import org.projectnessie.gc.files.DeleteResult;
import org.projectnessie.gc.files.DeleteSummary;
Expand Down Expand Up @@ -71,12 +74,16 @@ static URI ensureTrailingSlash(URI uri) {
}

public interface Builder {
@CanIgnoreReturnValue
Builder hadoopConfiguration(Configuration hadoopConfiguration);

@CanIgnoreReturnValue
Builder putProperties(String key, String value);

@CanIgnoreReturnValue
Builder putAllProperties(Map<String, ? extends String> entries);

@CanIgnoreReturnValue
Builder properties(Map<String, ? extends String> entries);

IcebergFiles build();
Expand All @@ -92,9 +99,6 @@ Configuration hadoopConfiguration() {
@SuppressWarnings("immutables:incompat")
private volatile boolean hasResolvingFileIO;

@SuppressWarnings("immutables:incompat")
private volatile boolean hasS3FileIO;

@Value.Lazy
public FileIO resolvingFileIO() {
ResolvingFileIO fileIO = new ResolvingFileIO();
Expand All @@ -105,32 +109,21 @@ public FileIO resolvingFileIO() {
return fileIO;
}

@Value.Lazy
S3FileIO s3() {
S3FileIO fileIO = new S3FileIO();
fileIO.initialize(properties());
hasS3FileIO = true;
LOGGER.debug("Instantiated Iceberg's S3FileIO");
return fileIO;
}

@Override
public void close() {
try {
if (hasS3FileIO) {
s3().close();
}
} finally {
if (hasResolvingFileIO) {
resolvingFileIO().close();
}
if (hasResolvingFileIO) {
resolvingFileIO().close();
}
}

private boolean isS3(URI uri) {
private boolean canBulkAndPrefix(URI uri) {
switch (uri.getScheme()) {
case "s3":
case "s3a":
case "s3n":
case "gs":
case "abfs":
case "abfss":
return true;
default:
return false;
Expand All @@ -141,10 +134,10 @@ private boolean isS3(URI uri) {
@MustBeClosed
public Stream<FileReference> listRecursively(URI path) throws NessieFileIOException {
URI basePath = ensureTrailingSlash(path);
if (isS3(path)) {
if (canBulkAndPrefix(path)) {

@SuppressWarnings("resource")
S3FileIO fileIo = s3();
SupportsPrefixOperations fileIo = (SupportsPrefixOperations) resolvingFileIO();
return StreamSupport.stream(fileIo.listPrefix(basePath.toString()).spliterator(), false)
.map(
f ->
Expand Down Expand Up @@ -205,7 +198,7 @@ public DeleteResult delete(FileReference fileReference) {
try {
URI absolutePath = fileReference.absolutePath();
@SuppressWarnings("resource")
FileIO fileIO = isS3(absolutePath) ? s3() : resolvingFileIO();
FileIO fileIO = resolvingFileIO();
fileIO.deleteFile(absolutePath.toString());
return DeleteResult.SUCCESS;
} catch (Exception e) {
Expand All @@ -218,15 +211,15 @@ public DeleteResult delete(FileReference fileReference) {
public DeleteSummary deleteMultiple(URI baseUri, Stream<FileReference> fileObjects) {
Stream<String> filesAsStrings = filesAsStrings(fileObjects);

if (isS3(baseUri)) {
if (canBulkAndPrefix(baseUri)) {
return s3DeleteMultiple(filesAsStrings);
}
return hadoopDeleteMultiple(filesAsStrings);
}

private DeleteSummary s3DeleteMultiple(Stream<String> filesAsStrings) {
@SuppressWarnings("resource")
S3FileIO fileIo = s3();
SupportsBulkOperations fileIo = (SupportsBulkOperations) resolvingFileIO();

List<String> files = filesAsStrings.collect(Collectors.toList());
long failed = 0L;
Expand Down
1 change: 1 addition & 0 deletions gc/gc-iceberg-inttest/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ dependencies {
intTestRuntimeOnly(libs.google.cloud.gcs.connector)

intTestImplementation(nessieProject("nessie-azurite-testcontainer"))
intTestImplementation(nessieProject("nessie-gcs-testcontainer"))
intTestRuntimeOnly(libs.hadoop.azure)

intTestCompileOnly(libs.immutables.builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,29 @@
*/
package org.projectnessie.gc.iceberg.inttest;

import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.gcp.GCPProperties;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.projectnessie.gc.iceberg.files.IcebergFiles;
import org.projectnessie.testing.gcs.GCSContainer;

@Disabled("There is no official emulator for Google's Cloud Storage")
// There is one emulator though: docker.io/oittaa/gcp-storage-emulator /
// https://github.com/oittaa/gcp-storage-emulator
// That one seems to work, but then GCS authn in Iceberg has no support for "no credentials" - so
// testing is currently impossible - and this test class is unfinished.
public class ITSparkIcebergNessieGCP extends AbstractITSparkIcebergNessieObjectStorage {

private static GCSContainer gcsContainer;
private static Storage gcsService;
private static String googleProjectId;
private static String googleServiceHost;

private static String bucket;
private static String bucketUri;

@BeforeAll
static void setupGcs() {
googleProjectId = "prj" + ThreadLocalRandom.current().nextInt(100000);
googleServiceHost = "http://[::1]:8080";

gcsService =
StorageOptions.newBuilder()
.setProjectId(googleProjectId)
.setHost(googleServiceHost)
.setCredentials(NoCredentials.getInstance())
.setRetrySettings(ServiceOptions.getNoRetrySettings())
.build()
.getService();
gcsContainer = new GCSContainer();
gcsContainer.start();

bucket = "bucket" + ThreadLocalRandom.current().nextInt(100000);
bucketUri = String.format("gs://%s/", bucket);

gcsService.create(BucketInfo.of(bucket));
gcsService = gcsContainer.newStorage();
}

@AfterAll
Expand All @@ -75,49 +47,38 @@ static void stopGcs() throws Exception {

@Override
protected String warehouseURI() {
return bucketUri;
return gcsContainer.bucketUri();
}

@Override
protected Map<String, String> sparkHadoop() {
Map<String, String> r = new HashMap<>();
r.put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
r.put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
return r;
return gcsContainer.hadoopConfig();
}

@Override
protected Map<String, String> nessieParams() {
Map<String, String> r = new HashMap<>(super.nessieParams());
r.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.gcp.gcs.GCSFileIO");
r.put(GCPProperties.GCS_PROJECT_ID, googleProjectId);
r.put(GCPProperties.GCS_SERVICE_HOST, googleServiceHost);
r.putAll(gcsContainer.icebergProperties());
return r;
}

@AfterEach
void purgeGcs() {
gcsService.list(bucket).iterateAll().forEach(Blob::delete);
gcsService.list(gcsContainer.bucket()).iterateAll().forEach(Blob::delete);
}

@Override
IcebergFiles icebergFiles() {
Map<String, String> props = new HashMap<>();
// props.put("s3.access-key-id", accessKey());
// props.put("s3.secret-access-key", secretKey());
// props.put("s3.endpoint", s3endpoint());
// props.put("http-client.type", "urlconnection");
Map<String, String> props = gcsContainer.icebergProperties();

Configuration conf = new Configuration();
// conf.set("fs.s3a.access.key", accessKey());
// conf.set("fs.s3a.secret.key", secretKey());
// conf.set("fs.s3a.endpoint", s3endpoint());
gcsContainer.hadoopConfig().forEach(conf::set);

return IcebergFiles.builder().properties(props).hadoopConfiguration(conf).build();
}

@Override
protected URI s3BucketUri() {
return URI.create(bucketUri);
return URI.create(gcsContainer.bucketUri());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public abstract class IcebergContentToFiles implements ContentToFiles {
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergContentToFiles.class);
public static final String S3_KEY_NOT_FOUND =
"software.amazon.awssdk.services.s3.model.NoSuchKeyException";
public static final String GCS_STORAGE_EXCEPTION = "com.google.cloud.storage.StorageException";
public static final String GCS_NOT_FOUND_START = "404 Not Found";

public static Builder builder() {
return ImmutableIcebergContentToFiles.builder();
Expand Down Expand Up @@ -77,10 +79,29 @@ public Stream<FileReference> extractFiles(ContentReference contentReference) {
try {
tableMetadata = TableMetadataParser.read(io, contentReference.metadataLocation());
} catch (Exception notFoundCandidate) {
boolean notFound = false;
if (notFoundCandidate instanceof NotFoundException
// Iceberg does not map software.amazon.awssdk.services.s3.model.NoSuchKeyException to
// its native org.apache.iceberg.exceptions.NotFoundException,
|| S3_KEY_NOT_FOUND.equals(notFoundCandidate.getClass().getName())) {
notFound = true;
} else {
for (Throwable c = notFoundCandidate.getCause(); ; ) {
if (GCS_STORAGE_EXCEPTION.equals(c.getClass().getName())
&& c.getMessage().startsWith(GCS_NOT_FOUND_START)) {
notFound = true;
break;
}

Throwable next = c.getCause();
if (c == c.getCause()) {
break;
}
c = next;
}
}

if (notFound) {
// It is safe to assume that a missing table-metadata means no referenced files.
// A table-metadata can be missing, because a previous Nessie GC "sweep" phase deleted it.
LOGGER.info(
Expand All @@ -91,6 +112,7 @@ public Stream<FileReference> extractFiles(ContentReference contentReference) {
contentReference.commitId());
return Stream.empty();
}

throw new RuntimeException(notFoundCandidate);
}

Expand Down
1 change: 1 addition & 0 deletions gradle/projects.main.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ nessie-model=api/model
nessie-gc-base=gc/gc-base
nessie-gc-base-tests=gc/gc-base-tests
nessie-gc-repository-jdbc=gc/gc-repository-jdbc
nessie-gcs-testcontainer=testing/gcs-container
nessie-perftest-gatling=perftest/gatling
nessie-perftest-simulations=perftest/simulations
nessie-jaxrs=servers/jax-rs
Expand Down
35 changes: 35 additions & 0 deletions testing/gcs-container/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
 * Copyright (C) 2023 Dremio
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Build script for the "nessie-gcs-testcontainer" module: a Testcontainers-based
// Google Cloud Storage emulator used by Nessie-GC integration tests.

plugins { id("nessie-conventions-iceberg") }

// Human-readable name used in the published Maven POM.
extra["maven.name"] = "Nessie - GCS testcontainer"

dependencies {
  implementation(libs.slf4j.api)
  // Testcontainers is part of this module's public API: consumers (e.g.
  // gc-iceberg-inttest) start/stop the container directly, hence `api`, not
  // `implementation`. The BOM platform pins the testcontainers version.
  api(platform(libs.testcontainers.bom))
  api("org.testcontainers:testcontainers")

  // Google Cloud Storage client, also exposed to consumers (the container helper
  // hands out a configured `Storage` instance); version pinned via the GCS BOM.
  api(platform(libs.google.cloud.storage.bom))
  api("com.google.cloud:google-cloud-storage")

  // Annotation-only dependencies: needed at compile time, never at runtime.
  compileOnly(libs.jakarta.annotation.api)
  compileOnly(libs.findbugs.jsr305)
  compileOnly(libs.errorprone.annotations)

  // Immutables: annotations are compile-only; the processor generates the
  // Immutable* implementation classes during compilation.
  compileOnly(libs.immutables.value.annotations)
  annotationProcessor(libs.immutables.value.processor)
}

0 comments on commit e5d1de1

Please sign in to comment.