From 0d36a84e1a816b6efa46245fb62d4d2c7cf9f68b Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Fri, 21 Jun 2024 12:00:55 +0200 Subject: [PATCH] Committing operations: cleanup metadata after failures Fixes #8728 --- .../catalog/files/api/ObjectIO.java | 3 + .../catalog/files/ResolvingObjectIO.java | 40 ++++- .../files/adls/AdlsClientSupplier.java | 2 +- .../catalog/files/adls/AdlsObjectIO.java | 56 +++++- .../catalog/files/gcs/GcsObjectIO.java | 33 ++++ .../catalog/files/local/LocalObjectIO.java | 21 +++ .../catalog/files/s3/S3ClientSupplier.java | 4 +- .../catalog/files/s3/S3ObjectIO.java | 34 ++++ .../catalog/files/AbstractClients.java | 12 +- .../catalog/files/TestResolvingObjectIO.java | 129 +++++++++++++ .../service/impl/CatalogServiceImpl.java | 169 +++++++----------- .../catalog/service/impl/IcebergStuff.java | 2 + .../service/impl/MultiTableUpdate.java | 76 ++++++++ 13 files changed, 469 insertions(+), 112 deletions(-) create mode 100644 catalog/files/impl/src/test/java/org/projectnessie/catalog/files/TestResolvingObjectIO.java create mode 100644 catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIO.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIO.java index 82d55d29ed..11525ef6c2 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIO.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIO.java @@ -18,10 +18,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import org.projectnessie.storage.uri.StorageUri; public interface ObjectIO { InputStream readObject(StorageUri uri) throws IOException; OutputStream writeObject(StorageUri uri) throws IOException; + + void deleteObjects(List uris) throws IOException; } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java index b6fc5ee311..d8a6ba86f7 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java @@ -15,7 +15,12 @@ */ package org.projectnessie.catalog.files; +import java.io.IOException; import java.time.Clock; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; import org.projectnessie.catalog.files.adls.AdlsClientSupplier; import org.projectnessie.catalog.files.adls.AdlsObjectIO; import org.projectnessie.catalog.files.api.ObjectIO; @@ -34,9 +39,17 @@ public ResolvingObjectIO( S3ClientSupplier s3ClientSupplier, AdlsClientSupplier adlsClientSupplier, GcsStorageSupplier gcsStorageSupplier) { - this.s3ObjectIO = new S3ObjectIO(s3ClientSupplier, Clock.systemUTC()); - this.gcsObjectIO = new GcsObjectIO(gcsStorageSupplier); - this.adlsObjectIO = new AdlsObjectIO(adlsClientSupplier); + this( + new S3ObjectIO(s3ClientSupplier, Clock.systemUTC()), + new GcsObjectIO(gcsStorageSupplier), + new AdlsObjectIO(adlsClientSupplier)); + } + + public ResolvingObjectIO( + S3ObjectIO s3ObjectIO, GcsObjectIO gcsObjectIO, AdlsObjectIO adlsObjectIO) { + this.s3ObjectIO = s3ObjectIO; + this.gcsObjectIO = gcsObjectIO; + this.adlsObjectIO = adlsObjectIO; } @Override @@ -59,4 +72,25 @@ protected ObjectIO resolve(StorageUri uri) { throw new IllegalArgumentException("Unknown or unsupported scheme: " + scheme); } } + + @Override + public void deleteObjects(List uris) throws IOException { + IOException ex = null; + Map> perObjectIO = new IdentityHashMap<>(); + uris.forEach(uri -> perObjectIO.computeIfAbsent(resolve(uri), x -> new ArrayList<>()).add(uri)); + for (Map.Entry> perObjIO : perObjectIO.entrySet()) { + try { + perObjIO.getKey().deleteObjects(perObjIO.getValue()); + } catch (IOException e) { + if (ex == null) { + ex = e; + } else { + ex.addSuppressed(e); + } + } + } + if (ex != null) { + throw ex; + } + } } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsClientSupplier.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsClientSupplier.java index 84c9587b84..b314f63a76 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsClientSupplier.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsClientSupplier.java @@ -61,7 +61,7 @@ public DataLakeFileClient fileClientForLocation(StorageUri uri) { return fileSystem.getFileClient(path); } - private DataLakeFileSystemClient fileSystemClient(AdlsLocation location) { + DataLakeFileSystemClient fileSystemClient(AdlsLocation location) { ConfigurationBuilder clientConfig = new ConfigurationBuilder(); adlsOptions.configurationOptions().forEach(clientConfig::putProperty); diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java index b1a9fc665e..f55019265f 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java @@ -15,17 +15,26 @@ */ package org.projectnessie.catalog.files.adls; +import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.ParallelTransferOptions; import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions; import com.azure.storage.file.datalake.options.DataLakeFileOutputStreamOptions; import java.io.BufferedOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.projectnessie.catalog.files.api.ObjectIO; import org.projectnessie.storage.uri.StorageUri; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AdlsObjectIO implements ObjectIO { + private static final Logger LOGGER = LoggerFactory.getLogger(AdlsObjectIO.class); private final AdlsClientSupplier clientSupplier; @@ -34,11 +43,18 @@ public AdlsObjectIO(AdlsClientSupplier clientSupplier) { } @Override - public InputStream readObject(StorageUri uri) { + public InputStream readObject(StorageUri uri) throws IOException { DataLakeFileClient file = clientSupplier.fileClientForLocation(uri); DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions(); clientSupplier.adlsOptions().readBlockSize().ifPresent(options::setBlockSize); - return file.openInputStream(options).getInputStream(); + try { + return file.openInputStream(options).getInputStream(); + } catch (BlobStorageException e) { + if (e.getStatusCode() == 404) { + throw new IOException(e.getServiceMessage(), e); + } + throw e; + } } @Override @@ -50,4 +66,40 @@ public OutputStream writeObject(StorageUri uri) { options.setParallelTransferOptions(transferOptions); return new BufferedOutputStream(file.getOutputStream(options)); } + + @Override + public void deleteObjects(List uris) throws IOException { + // Note: the default container is mapped to an empty string + Map> bucketToUris = + uris.stream() + .map(AdlsLocation::adlsLocation) + .collect(Collectors.groupingBy(l -> l.container().orElse(""))); + + IOException ex = null; + for (List locations : bucketToUris.values()) { + DataLakeFileSystemClient fileSystem = clientSupplier.fileSystemClient(locations.get(0)); + + // No batch-delete ... yay + for (AdlsLocation location : locations) { + String path = location.path(); + if (path.startsWith("/")) { + path = path.substring(1); + } + try { + fileSystem.deleteFileIfExists(path); + } catch (BlobStorageException e) { + if (e.getStatusCode() != 404) { + if (ex == null) { + ex = new IOException(e.getServiceMessage(), e); + } else { + ex.addSuppressed(e); + } + } + } + } + } + if (ex != null) { + throw ex; + } + } } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java index 9b9c2d3860..e84406c70b 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java @@ -29,11 +29,17 @@ import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.projectnessie.catalog.files.api.ObjectIO; import org.projectnessie.catalog.secrets.KeySecret; import org.projectnessie.storage.uri.StorageUri; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class GcsObjectIO implements ObjectIO { + private static final Logger LOGGER = LoggerFactory.getLogger(GcsObjectIO.class); + private final GcsStorageSupplier storageSupplier; public GcsObjectIO(GcsStorageSupplier storageSupplier) { @@ -81,4 +87,31 @@ public OutputStream writeObject(StorageUri uri) { bucketOptions.writeChunkSize().ifPresent(channel::setChunkSize); return Channels.newOutputStream(channel); } + + @Override + public void deleteObjects(List uris) { + Map> bucketToUris = + uris.stream() + .map(GcsLocation::gcsLocation) + .collect(Collectors.groupingBy(GcsLocation::bucket)); + + for (List locations : bucketToUris.values()) { + GcsBucketOptions bucketOptions = storageSupplier.bucketOptions(locations.get(0)); + @SuppressWarnings("resource") + Storage client = storageSupplier.forLocation(bucketOptions); + + List blobIds = + locations.stream() + .map(location -> BlobId.of(location.bucket(), location.path())) + .collect(Collectors.toList()); + + // This is rather a hack to make the `AbstractClients` test pass, because our object storage + // mock doesn't implement GCS batch requests (yet). + if (blobIds.size() == 1) { + client.delete(blobIds.get(0)); + } else { + client.delete(blobIds); + } + } + } } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/local/LocalObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/local/LocalObjectIO.java index 5dc0ef0ae3..122bf150a8 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/local/LocalObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/local/LocalObjectIO.java @@ -22,6 +22,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; import org.projectnessie.catalog.files.api.ObjectIO; import org.projectnessie.storage.uri.StorageUri; @@ -44,6 +45,26 @@ public OutputStream writeObject(StorageUri uri) throws IOException { } } + @Override + public void deleteObjects(List uris) throws IOException { + IOException ex = null; + for (StorageUri uri : uris) { + Path path = LocalObjectIO.filePath(uri); + try { + Files.deleteIfExists(path); + } catch (IOException e) { + if (ex == null) { + ex = e; + } else { + ex.addSuppressed(e); + } + } + } + if (ex != null) { + throw ex; + } + } + private static Path filePath(StorageUri uri) { return Paths.get(uri.requiredPath()); } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java index 087a15fba3..af29599346 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java @@ -81,8 +81,10 @@ public S3Client getClient(StorageUri location) { checkArgument(isS3scheme(scheme), "Invalid S3 scheme: %s", location); String bucketName = location.requiredAuthority(); - // Supply an empty profile file + return getClient(bucketName); + } + public S3Client getClient(String bucketName) { S3BucketOptions bucketOptions = s3options.effectiveOptionsForBucket(Optional.of(bucketName), secretsProvider); diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java index 97db347e0e..5cc6c0e8e5 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java @@ -25,17 +25,26 @@ import java.io.OutputStream; import java.time.Clock; import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.projectnessie.catalog.files.api.BackendThrottledException; import org.projectnessie.catalog.files.api.NonRetryableException; import org.projectnessie.catalog.files.api.ObjectIO; import org.projectnessie.storage.uri.StorageUri; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkServiceException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; public class S3ObjectIO implements ObjectIO { + private static final Logger LOGGER = LoggerFactory.getLogger(S3ObjectIO.class); private final S3ClientSupplier s3clientSupplier; private final Clock clock; @@ -94,6 +103,31 @@ public void close() throws IOException { }; } + @Override + public void deleteObjects(List uris) { + Map> bucketToUris = + uris.stream().collect(Collectors.groupingBy(StorageUri::requiredAuthority)); + + for (Map.Entry> bucketDeletes : bucketToUris.entrySet()) { + String bucket = bucketDeletes.getKey(); + S3Client s3client = s3clientSupplier.getClient(bucket); + + List locations = bucketDeletes.getValue(); + List objectIdentifiers = + locations.stream() + .map(S3ObjectIO::withoutLeadingSlash) + .map(key -> ObjectIdentifier.builder().key(key).build()) + .collect(Collectors.toList()); + + DeleteObjectsRequest.Builder deleteObjectsRequest = + DeleteObjectsRequest.builder() + .bucket(bucket) + .delete(Delete.builder().objects(objectIdentifiers).build()); + + s3client.deleteObjects(deleteObjectsRequest.build()); + } + } + private static String withoutLeadingSlash(StorageUri uri) { String path = uri.requiredPath(); return path.startsWith("/") ? path.substring(1) : path; diff --git a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/AbstractClients.java b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/AbstractClients.java index a9e4da9c3f..da1503ca44 100644 --- a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/AbstractClients.java +++ b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/AbstractClients.java @@ -18,8 +18,10 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.projectnessie.objectstoragemock.HeapStorageBucket.newHeapStorageBucket; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; @@ -38,8 +40,9 @@ public abstract class AbstractClients { public static final String BUCKET_1 = "bucket1"; public static final String BUCKET_2 = "bucket2"; + @SuppressWarnings("resource") @Test - public void writeRead() throws Exception { + public void writeReadDelete() throws Exception { try (ObjectStorageMock.MockServer server1 = ObjectStorageMock.builder() .putBuckets(BUCKET_1, newHeapStorageBucket().bucket()) @@ -58,6 +61,13 @@ public void writeRead() throws Exception { response1 = new String(input.readAllBytes()); } soft.assertThat(response1).isEqualTo("hello world"); + + objectIO.deleteObjects(List.of(uri)); + // should not throw + objectIO.deleteObjects(List.of(uri)); + + soft.assertThatThrownBy(() -> objectIO.readObject(uri).readAllBytes()) + .isInstanceOf(IOException.class); } } diff --git a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/TestResolvingObjectIO.java b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/TestResolvingObjectIO.java new file mode 100644 index 0000000000..9b3e8d917d --- /dev/null +++ b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/TestResolvingObjectIO.java @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.util.List; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.projectnessie.catalog.files.adls.AdlsObjectIO; +import org.projectnessie.catalog.files.gcs.GcsObjectIO; +import org.projectnessie.catalog.files.s3.S3ObjectIO; +import org.projectnessie.storage.uri.StorageUri; + +@ExtendWith({SoftAssertionsExtension.class, MockitoExtension.class}) +public class TestResolvingObjectIO { + @InjectSoftAssertions protected SoftAssertions soft; + + @Mock protected S3ObjectIO s3ObjectIO; + @Mock protected AdlsObjectIO adlsObjectIO; + @Mock protected GcsObjectIO gcsObjectIO; + + protected ResolvingObjectIO resolvingObjectIO; + + @BeforeEach + protected void setup() { + resolvingObjectIO = new ResolvingObjectIO(s3ObjectIO, gcsObjectIO, adlsObjectIO); + } + + @Test + public void resolve() { + soft.assertThat(resolvingObjectIO.resolve(StorageUri.of("s3://foo/bar"))).isSameAs(s3ObjectIO); + soft.assertThat(resolvingObjectIO.resolve(StorageUri.of("s3a://foo/bar"))).isSameAs(s3ObjectIO); + soft.assertThat(resolvingObjectIO.resolve(StorageUri.of("s3n://foo/bar"))).isSameAs(s3ObjectIO); + soft.assertThat(resolvingObjectIO.resolve(StorageUri.of("gs://foo/bar"))).isSameAs(gcsObjectIO); + soft.assertThat(resolvingObjectIO.resolve(StorageUri.of("abfs://foo/bar"))) + .isSameAs(adlsObjectIO); + soft.assertThat(resolvingObjectIO.resolve(StorageUri.of("abfss://foo/bar"))) + .isSameAs(adlsObjectIO); + soft.assertThatIllegalArgumentException() + .isThrownBy(() -> resolvingObjectIO.resolve(StorageUri.of("http://foo/bar"))); + } + + @Test + public void read() throws Exception { + StorageUri uri = StorageUri.of("s3://foo/bar"); + when(s3ObjectIO.readObject(uri)) + .thenReturn(new ByteArrayInputStream("hello s3".getBytes(UTF_8))); + + soft.assertThat(resolvingObjectIO.readObject(uri)).hasContent("hello s3"); + + verify(s3ObjectIO).readObject(uri); + verifyNoMoreInteractions(s3ObjectIO); + verifyNoInteractions(gcsObjectIO); + verifyNoInteractions(adlsObjectIO); + } + + @Test + public void deleteSingle() throws Exception { + StorageUri uri = StorageUri.of("s3://foo/bar"); + + resolvingObjectIO.deleteObjects(List.of(uri)); + + verify(s3ObjectIO).deleteObjects(List.of(uri)); + + verifyNoMoreInteractions(s3ObjectIO); + verifyNoInteractions(gcsObjectIO); + verifyNoInteractions(adlsObjectIO); + } + + @Test + public void deleteMultiple() throws Exception { + StorageUri uri1 = StorageUri.of("s3://foo/bar1"); + StorageUri uri2 = StorageUri.of("s3://foo/bar2"); + StorageUri uri3 = StorageUri.of("s3://foo/bar3"); + + resolvingObjectIO.deleteObjects(List.of(uri1, uri2, uri3)); + + verify(s3ObjectIO).deleteObjects(List.of(uri1, uri2, uri3)); + + verifyNoMoreInteractions(s3ObjectIO); + verifyNoInteractions(gcsObjectIO); + verifyNoInteractions(adlsObjectIO); + } + + @Test + public void deleteMultipleAllTypes() throws Exception { + StorageUri uri1 = StorageUri.of("s3://foo/bar1"); + StorageUri uri2 = StorageUri.of("gs://foo/bar2"); + StorageUri uri3 = StorageUri.of("abfs://foo/bar3"); + StorageUri uri4 = StorageUri.of("s3://foo/bar4"); + StorageUri uri5 = StorageUri.of("gs://foo/bar5"); + StorageUri uri6 = StorageUri.of("abfs://foo/bar6"); + + resolvingObjectIO.deleteObjects(List.of(uri1, uri2, uri3, uri4, uri5, uri6)); + + verify(s3ObjectIO).deleteObjects(List.of(uri1, uri4)); + verify(gcsObjectIO).deleteObjects(List.of(uri2, uri5)); + verify(adlsObjectIO).deleteObjects(List.of(uri3, uri6)); + + verifyNoMoreInteractions(s3ObjectIO); + verifyNoMoreInteractions(gcsObjectIO); + verifyNoMoreInteractions(adlsObjectIO); + } +} diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index 5658c59d33..01c8d1750e 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -54,6 +54,7 @@ import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.projectnessie.api.v2.params.ParsedReference; import org.projectnessie.catalog.files.api.ObjectIO; @@ -81,12 +82,11 @@ import org.projectnessie.catalog.service.api.SnapshotResponse; import org.projectnessie.catalog.service.config.CatalogConfig; import org.projectnessie.catalog.service.config.WarehouseConfig; +import org.projectnessie.catalog.service.impl.MultiTableUpdate.SingleTableUpdate; import org.projectnessie.client.api.CommitMultipleOperationsBuilder; import org.projectnessie.client.api.GetContentBuilder; import org.projectnessie.client.api.NessieApiV2; import org.projectnessie.error.BaseNessieClientServerException; -import org.projectnessie.error.ErrorCode; -import org.projectnessie.error.ImmutableNessieError; import org.projectnessie.error.NessieContentNotFoundException; import org.projectnessie.error.NessieNotFoundException; import org.projectnessie.error.NessieReferenceConflictException; @@ -100,8 +100,6 @@ import org.projectnessie.model.GetMultipleContentsResponse; import org.projectnessie.model.IcebergTable; import org.projectnessie.model.IcebergView; -import org.projectnessie.model.Namespace; -import org.projectnessie.model.Operation; import org.projectnessie.model.Reference; import org.projectnessie.nessie.tasks.api.TasksService; import org.projectnessie.storage.uri.StorageUri; @@ -399,41 +397,50 @@ public CompletionStage> commit( CompletionStage committedStage = commitBuilderStage.thenCompose( updates -> { + CommitResponse commitResponse; try { - CommitResponse commitResponse = multiTableUpdate.nessieCommit.commitWithResponse(); - Map addedContentsMap = - commitResponse.getAddedContents() != null - ? commitResponse.toAddedContentsMap() - : emptyMap(); - CompletionStage> current = null; - for (SingleTableUpdate tableUpdate : multiTableUpdate.tableUpdates) { - Content content = tableUpdate.content; - if (content.getId() == null) { - content = content.withId(addedContentsMap.get(tableUpdate.key)); - } - NessieId snapshotId = objIdToNessieId(snapshotIdFromContent(content)); - - // Although the `TasksService` triggers the operation regardless of whether the - // `CompletionStage` returned by `storeSnapshot()` is consumed, we have to "wait" - // for those to complete so that we keep the "request scoped context" alive. - CompletionStage> stage = - icebergStuff.storeSnapshot(tableUpdate.snapshot.withId(snapshotId), content); - if (current == null) { - current = stage; - } else { - current = current.thenCombine(stage, (snap1, snap2) -> snap1); - } - } - return requireNonNull(current).thenApply(x -> commitResponse); + commitResponse = multiTableUpdate.nessieCommit().commitWithResponse(); } catch (Exception e) { - // TODO cleanup files that were written but are now obsolete/unreferenced + try { + objectIO.deleteObjects( + multiTableUpdate.storedLocations().stream() + .map(StorageUri::of) + .collect(Collectors.toList())); + } catch (Exception ed) { + e.addSuppressed(ed); + } throw new RuntimeException(e); } + Map addedContentsMap = + commitResponse.getAddedContents() != null + ? commitResponse.toAddedContentsMap() + : emptyMap(); + CompletionStage> current = null; + for (SingleTableUpdate tableUpdate : multiTableUpdate.tableUpdates()) { + Content content = tableUpdate.content; + if (content.getId() == null) { + // Need the content-ID especially to (eagerly) build the `NessieEntitySnapshot`. + content = content.withId(addedContentsMap.get(tableUpdate.key)); + } + NessieId snapshotId = objIdToNessieId(snapshotIdFromContent(content)); + + // Although the `TasksService` triggers the operation regardless of whether the + // `CompletionStage` returned by `storeSnapshot()` is consumed, we have to "wait" + // for those to complete so that we keep the "request scoped context" alive. + CompletionStage> stage = + icebergStuff.storeSnapshot(tableUpdate.snapshot.withId(snapshotId), content); + if (current == null) { + current = stage; + } else { + current = current.thenCombine(stage, (snap1, snap2) -> snap1); + } + } + return requireNonNull(current).thenApply(x -> commitResponse); }); return committedStage.thenApply( commitResponse -> - multiTableUpdate.tableUpdates.stream() + multiTableUpdate.tableUpdates().stream() .map( singleTableUpdate -> snapshotResponse( @@ -477,8 +484,7 @@ private CompletionStage applyIcebergTableCommitOperation( CatalogOperation op, Content content, MultiTableUpdate multiTableUpdate, - CompletionStage commitBuilderStage) - throws NessieContentNotFoundException { + CompletionStage commitBuilderStage) { // TODO serialize the changes as well, so that we can retrieve those later for content-aware // merges and automatic conflict resolution. @@ -521,17 +527,11 @@ private CompletionStage applyIcebergTableCommitOperation( String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); IcebergTableMetadata icebergMetadata = - storeTableSnapshot(metadataJsonLocation, nessieSnapshot); + storeTableSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); - ObjId snapshotId; - try { - snapshotId = snapshotIdFromContent(updated); - } catch (Exception e) { - e.printStackTrace(); - return null; - } + ObjId snapshotId = snapshotIdFromContent(updated); nessieSnapshot = nessieSnapshot.withId(objIdToNessieId(snapshotId)); SingleTableUpdate singleTableUpdate = @@ -551,8 +551,7 @@ private CompletionStage applyIcebergViewCommitOperation( CatalogOperation op, Content content, MultiTableUpdate multiTableUpdate, - CompletionStage commitBuilderStage) - throws NessieContentNotFoundException { + CompletionStage commitBuilderStage) { // TODO serialize the changes as well, so that we can retrieve those later for content-aware // merges and automatic conflict resolution. @@ -595,16 +594,10 @@ private CompletionStage applyIcebergViewCommitOperation( String metadataJsonLocation = icebergMetadataJsonLocation(nessieSnapshot.icebergLocation()); IcebergViewMetadata icebergMetadata = - storeViewSnapshot(metadataJsonLocation, nessieSnapshot); + storeViewSnapshot(metadataJsonLocation, nessieSnapshot, multiTableUpdate); Content updated = icebergMetadataToContent(metadataJsonLocation, icebergMetadata, contentId); - ObjId snapshotId; - try { - snapshotId = snapshotIdFromContent(updated); - } catch (Exception e) { - e.printStackTrace(); - return null; - } + ObjId snapshotId = snapshotIdFromContent(updated); nessieSnapshot = nessieSnapshot.withId(objIdToNessieId(snapshotId)); SingleTableUpdate singleTableUpdate = @@ -646,70 +639,43 @@ private List pruneUpdates(IcebergCatalogOperation op, boo return prunedUpdates; } - static final class MultiTableUpdate { - final CommitMultipleOperationsBuilder nessieCommit; - final List tableUpdates = new ArrayList<>(); - - MultiTableUpdate(CommitMultipleOperationsBuilder nessieCommit) { - this.nessieCommit = nessieCommit; - } - - void addUpdate(ContentKey key, SingleTableUpdate singleTableUpdate) { - synchronized (this) { - tableUpdates.add(singleTableUpdate); - nessieCommit.operation(Operation.Put.of(key, singleTableUpdate.content)); - } - } - } - - static final class SingleTableUpdate { - final NessieEntitySnapshot snapshot; - final Content content; - final ContentKey key; - - SingleTableUpdate(NessieEntitySnapshot snapshot, Content content, ContentKey key) { - this.snapshot = snapshot; - this.content = content; - this.key = key; - } - } - - private CompletionStage loadExistingTableSnapshot(Content content) - throws NessieContentNotFoundException { + private CompletionStage loadExistingTableSnapshot(Content content) { ObjId snapshotId = snapshotIdFromContent(content); return new IcebergStuff(objectIO, persist, tasksService, executor) .retrieveIcebergSnapshot(snapshotId, content); } - private CompletionStage loadExistingViewSnapshot(Content content) - throws NessieContentNotFoundException { + private CompletionStage loadExistingViewSnapshot(Content content) { ObjId snapshotId = snapshotIdFromContent(content); return new IcebergStuff(objectIO, persist, tasksService, executor) .retrieveIcebergSnapshot(snapshotId, content); } private IcebergTableMetadata storeTableSnapshot( - String metadataJsonLocation, NessieTableSnapshot snapshot) { + String metadataJsonLocation, + NessieTableSnapshot snapshot, + MultiTableUpdate multiTableUpdate) { IcebergTableMetadata tableMetadata = nessieTableSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { - IcebergJson.objectMapper().writeValue(out, tableMetadata); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - return tableMetadata; + return storeSnapshot(metadataJsonLocation, tableMetadata, multiTableUpdate); } private IcebergViewMetadata storeViewSnapshot( - String metadataJsonLocation, NessieViewSnapshot snapshot) { + String metadataJsonLocation, NessieViewSnapshot snapshot, MultiTableUpdate multiTableUpdate) { IcebergViewMetadata viewMetadata = nessieViewSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); + return storeSnapshot(metadataJsonLocation, viewMetadata, multiTableUpdate); + } + + private M storeSnapshot( + String metadataJsonLocation, M metadata, MultiTableUpdate multiTableUpdate) { + multiTableUpdate.addStoredLocation(metadataJsonLocation); try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { - IcebergJson.objectMapper().writeValue(out, viewMetadata); + IcebergJson.objectMapper().writeValue(out, metadata); } catch (IOException ex) { throw new RuntimeException(ex); } - return viewMetadata; + return metadata; } private static Optional optionalIcebergSpec(OptionalInt specVersion) { @@ -719,7 +685,7 @@ private static Optional optionalIcebergSpec(OptionalInt specVersion } /** Compute the ID for the given Nessie {@link Content} object. */ - private ObjId snapshotIdFromContent(Content content) throws NessieContentNotFoundException { + private ObjId snapshotIdFromContent(Content content) { if (content instanceof IcebergTable) { IcebergTable icebergTable = (IcebergTable) content; return objIdHasher("ContentSnapshot") @@ -734,15 +700,10 @@ private ObjId snapshotIdFromContent(Content content) throws NessieContentNotFoun .hash(icebergView.getVersionId()) .generate(); } - if (content instanceof Namespace) { - throw new NessieContentNotFoundException( - ImmutableNessieError.builder() - .errorCode(ErrorCode.CONTENT_NOT_FOUND) - .message("No snapshots for Namespace: " + content) - .reason("Not a table") - .status(404) - .build()); - } - throw new UnsupportedOperationException("IMPLEMENT ME FOR " + content); + throw new UnsupportedOperationException( + "Support for content with type " + + content.getType() + + " not implemented, content = " + + content); } } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java index 8b158c5ec2..110c7610b5 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java @@ -65,6 +65,7 @@ public > CompletionStage retrieveIcebergSna } @SuppressWarnings("unchecked") + @Nonnull private > CompletionStage triggerIcebergSnapshot( EntitySnapshotTaskRequest snapshotTaskRequest) { // TODO Handle hash-collision - when entity-snapshot refers to a different(!) snapshot @@ -85,6 +86,7 @@ private > CompletionStage triggerIcebergSna }); } + @Nonnull public > CompletionStage storeSnapshot( S snapshot, Content content) { EntitySnapshotTaskRequest snapshotTaskRequest = diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java new file mode 100644 index 0000000000..6c0afde3da --- /dev/null +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/MultiTableUpdate.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.service.impl; + +import java.util.ArrayList; +import java.util.List; +import org.projectnessie.catalog.model.snapshot.NessieEntitySnapshot; +import org.projectnessie.client.api.CommitMultipleOperationsBuilder; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.Operation; + +/** Maintains state across all individual updates of a commit. */ +final class MultiTableUpdate { + private final CommitMultipleOperationsBuilder nessieCommit; + private final List tableUpdates = new ArrayList<>(); + private final List storedLocations = new ArrayList<>(); + + MultiTableUpdate(CommitMultipleOperationsBuilder nessieCommit) { + this.nessieCommit = nessieCommit; + } + + CommitMultipleOperationsBuilder nessieCommit() { + return nessieCommit; + } + + List tableUpdates() { + synchronized (this) { + return tableUpdates; + } + } + + List storedLocations() { + synchronized (this) { + return storedLocations; + } + } + + void addUpdate(ContentKey key, SingleTableUpdate singleTableUpdate) { + synchronized (this) { + tableUpdates.add(singleTableUpdate); + nessieCommit.operation(Operation.Put.of(key, singleTableUpdate.content)); + } + } + + void addStoredLocation(String location) { + synchronized (this) { + storedLocations.add(location); + } + } + + static final class SingleTableUpdate { + final NessieEntitySnapshot snapshot; + final Content content; + final ContentKey key; + + SingleTableUpdate(NessieEntitySnapshot snapshot, Content content, ContentKey key) { + this.snapshot = snapshot; + this.content = content; + this.key = key; + } + } +}