Skip to content

Commit

Permalink
Committing operations: cleanup metadata after failures
Browse files Browse the repository at this point in the history
  • Loading branch information
snazy committed Jun 21, 2024
1 parent 62819fe commit 0d36a84
Show file tree
Hide file tree
Showing 13 changed files with 469 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.projectnessie.storage.uri.StorageUri;

/**
 * Minimal I/O abstraction over objects that are addressed by a {@link StorageUri}: reading,
 * writing and bulk deletion.
 */
public interface ObjectIO {
/**
 * Opens a stream for reading the object at the given location.
 *
 * @param uri location of the object to read
 * @return stream over the object's content
 * @throws IOException if the object cannot be opened for reading
 */
InputStream readObject(StorageUri uri) throws IOException;

/**
 * Opens a stream for writing the object at the given location.
 *
 * @param uri location of the object to write
 * @return stream that receives the object's content
 * @throws IOException if the object cannot be opened for writing
 */
OutputStream writeObject(StorageUri uri) throws IOException;

/**
 * Deletes the objects at the given locations.
 *
 * <p>NOTE(review): callers appear to rely on this being a no-op for objects that do not exist
 * (deleting the same URI twice is expected not to throw) - confirm that this is part of the
 * contract for every implementation.
 *
 * @param uris locations of the objects to delete
 * @throws IOException if deleting one or more objects failed
 */
void deleteObjects(List<StorageUri> uris) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
package org.projectnessie.catalog.files;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.projectnessie.catalog.files.adls.AdlsClientSupplier;
import org.projectnessie.catalog.files.adls.AdlsObjectIO;
import org.projectnessie.catalog.files.api.ObjectIO;
Expand All @@ -34,9 +39,17 @@ public ResolvingObjectIO(
S3ClientSupplier s3ClientSupplier,
AdlsClientSupplier adlsClientSupplier,
GcsStorageSupplier gcsStorageSupplier) {
this.s3ObjectIO = new S3ObjectIO(s3ClientSupplier, Clock.systemUTC());
this.gcsObjectIO = new GcsObjectIO(gcsStorageSupplier);
this.adlsObjectIO = new AdlsObjectIO(adlsClientSupplier);
this(
new S3ObjectIO(s3ClientSupplier, Clock.systemUTC()),
new GcsObjectIO(gcsStorageSupplier),
new AdlsObjectIO(adlsClientSupplier));
}

public ResolvingObjectIO(
S3ObjectIO s3ObjectIO, GcsObjectIO gcsObjectIO, AdlsObjectIO adlsObjectIO) {
this.s3ObjectIO = s3ObjectIO;
this.gcsObjectIO = gcsObjectIO;
this.adlsObjectIO = adlsObjectIO;
}

@Override
Expand All @@ -59,4 +72,25 @@ protected ObjectIO resolve(StorageUri uri) {
throw new IllegalArgumentException("Unknown or unsupported scheme: " + scheme);
}
}

/**
 * Deletes the given objects by handing each URI to the {@link ObjectIO} that {@code resolve}
 * selects for it.
 *
 * <p>Every delegate is called once with all of the URIs that resolved to it. A failing delegate
 * does not prevent the remaining delegates from being called: the first {@link IOException} is
 * remembered and thrown at the end, any further ones are attached to it as suppressed exceptions.
 *
 * @param uris locations of the objects to delete
 * @throws IOException if at least one delegate failed
 */
@Override
public void deleteObjects(List<StorageUri> uris) throws IOException {
  // Delegates are distinguished by instance identity, not by equals().
  Map<ObjectIO, List<StorageUri>> urisByDelegate = new IdentityHashMap<>();
  for (StorageUri uri : uris) {
    urisByDelegate.computeIfAbsent(resolve(uri), ignored -> new ArrayList<>()).add(uri);
  }

  IOException failure = null;
  for (Map.Entry<ObjectIO, List<StorageUri>> entry : urisByDelegate.entrySet()) {
    ObjectIO delegate = entry.getKey();
    List<StorageUri> delegateUris = entry.getValue();
    try {
      delegate.deleteObjects(delegateUris);
    } catch (IOException e) {
      if (failure != null) {
        failure.addSuppressed(e);
      } else {
        failure = e;
      }
    }
  }

  if (failure != null) {
    throw failure;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public DataLakeFileClient fileClientForLocation(StorageUri uri) {
return fileSystem.getFileClient(path);
}

private DataLakeFileSystemClient fileSystemClient(AdlsLocation location) {
DataLakeFileSystemClient fileSystemClient(AdlsLocation location) {
ConfigurationBuilder clientConfig = new ConfigurationBuilder();
adlsOptions.configurationOptions().forEach(clientConfig::putProperty);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@
*/
package org.projectnessie.catalog.files.adls;

import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.DataLakeFileOutputStreamOptions;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.storage.uri.StorageUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdlsObjectIO implements ObjectIO {
private static final Logger LOGGER = LoggerFactory.getLogger(AdlsObjectIO.class);

private final AdlsClientSupplier clientSupplier;

Expand All @@ -34,11 +43,18 @@ public AdlsObjectIO(AdlsClientSupplier clientSupplier) {
}

@Override
public InputStream readObject(StorageUri uri) {
public InputStream readObject(StorageUri uri) throws IOException {
DataLakeFileClient file = clientSupplier.fileClientForLocation(uri);
DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions();
// Apply the configured read block size, if one is set.
clientSupplier.adlsOptions().readBlockSize().ifPresent(options::setBlockSize);
return file.openInputStream(options).getInputStream();
try {
return file.openInputStream(options).getInputStream();
} catch (BlobStorageException e) {
// An HTTP 404 is reported to the caller as a checked IOException that carries the service
// message and the original exception as its cause.
if (e.getStatusCode() == 404) {
throw new IOException(e.getServiceMessage(), e);
}
// Any other status code is propagated unchanged.
throw e;
}
}

@Override
Expand All @@ -50,4 +66,40 @@ public OutputStream writeObject(StorageUri uri) {
options.setParallelTransferOptions(transferOptions);
return new BufferedOutputStream(file.getOutputStream(options));
}

/**
 * Deletes the given objects one by one, using one file-system client per container.
 *
 * <p>A failure to delete one object does not stop the remaining deletions. HTTP 404 responses are
 * ignored. The first other failure is wrapped in an {@link IOException} that is thrown after all
 * deletions were attempted; later failures are attached to it as suppressed exceptions.
 *
 * @param uris locations of the objects to delete
 * @throws IOException if at least one deletion failed with a status other than 404
 */
@Override
public void deleteObjects(List<StorageUri> uris) throws IOException {
  // Locations that do not name a container are grouped under the empty string.
  Map<String, List<AdlsLocation>> locationsByContainer =
      uris.stream()
          .map(AdlsLocation::adlsLocation)
          .collect(Collectors.groupingBy(loc -> loc.container().orElse("")));

  IOException failure = null;
  for (List<AdlsLocation> containerLocations : locationsByContainer.values()) {
    // All locations of a group share the container, so the first one is used for the lookup.
    DataLakeFileSystemClient fileSystem =
        clientSupplier.fileSystemClient(containerLocations.get(0));

    // There is no batch delete, so every file is removed with its own call.
    for (AdlsLocation location : containerLocations) {
      String rawPath = location.path();
      String path = rawPath.startsWith("/") ? rawPath.substring(1) : rawPath;
      try {
        fileSystem.deleteFileIfExists(path);
      } catch (BlobStorageException e) {
        // NOTE(review): confirm that the data-lake client reports failures of this call as
        // BlobStorageException; otherwise this handler never runs.
        if (e.getStatusCode() == 404) {
          continue;
        }
        if (failure == null) {
          failure = new IOException(e.getServiceMessage(), e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
  }

  if (failure != null) {
    throw failure;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.catalog.secrets.KeySecret;
import org.projectnessie.storage.uri.StorageUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsObjectIO implements ObjectIO {
private static final Logger LOGGER = LoggerFactory.getLogger(GcsObjectIO.class);

private final GcsStorageSupplier storageSupplier;

public GcsObjectIO(GcsStorageSupplier storageSupplier) {
Expand Down Expand Up @@ -81,4 +87,31 @@ public OutputStream writeObject(StorageUri uri) {
bucketOptions.writeChunkSize().ifPresent(channel::setChunkSize);
return Channels.newOutputStream(channel);
}

/**
 * Deletes the given objects, grouped by bucket so that each bucket is handled with the storage
 * client built from that bucket's options.
 *
 * @param uris locations of the objects to delete
 */
@Override
public void deleteObjects(List<StorageUri> uris) {
  Map<String, List<GcsLocation>> locationsByBucket =
      uris.stream()
          .map(GcsLocation::gcsLocation)
          .collect(Collectors.groupingBy(GcsLocation::bucket));

  for (List<GcsLocation> bucketLocations : locationsByBucket.values()) {
    // All locations of a group share the bucket, so the first one is used to look up options.
    GcsBucketOptions bucketOptions = storageSupplier.bucketOptions(bucketLocations.get(0));
    @SuppressWarnings("resource")
    Storage client = storageSupplier.forLocation(bucketOptions);

    List<BlobId> blobIds = new ArrayList<>(bucketLocations.size());
    for (GcsLocation loc : bucketLocations) {
      blobIds.add(BlobId.of(loc.bucket(), loc.path()));
    }

    // A single blob is deleted with the non-batch call. This is rather a hack to make the
    // `AbstractClients` test pass, because our object storage mock doesn't implement GCS batch
    // requests (yet).
    if (blobIds.size() != 1) {
      client.delete(blobIds);
    } else {
      client.delete(blobIds.get(0));
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.storage.uri.StorageUri;

Expand All @@ -44,6 +45,26 @@ public OutputStream writeObject(StorageUri uri) throws IOException {
}
}

/**
 * Deletes the local files that the given URIs point to; files that do not exist are skipped.
 *
 * <p>All URIs are processed even if some deletions fail. The first {@link IOException} is thrown
 * once every URI has been handled, later ones are attached to it as suppressed exceptions.
 *
 * @param uris locations of the files to delete
 * @throws IOException if at least one file could not be deleted
 */
@Override
public void deleteObjects(List<StorageUri> uris) throws IOException {
  IOException failure = null;

  for (StorageUri uri : uris) {
    Path target = LocalObjectIO.filePath(uri);
    try {
      Files.deleteIfExists(target);
    } catch (IOException e) {
      if (failure != null) {
        failure.addSuppressed(e);
      } else {
        failure = e;
      }
    }
  }

  if (failure != null) {
    throw failure;
  }
}

private static Path filePath(StorageUri uri) {
return Paths.get(uri.requiredPath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public S3Client getClient(StorageUri location) {
checkArgument(isS3scheme(scheme), "Invalid S3 scheme: %s", location);
String bucketName = location.requiredAuthority();

// Supply an empty profile file
return getClient(bucketName);
}

public S3Client getClient(String bucketName) {
S3BucketOptions bucketOptions =
s3options.effectiveOptionsForBucket(Optional.of(bucketName), secretsProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,26 @@
import java.io.OutputStream;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.projectnessie.catalog.files.api.BackendThrottledException;
import org.projectnessie.catalog.files.api.NonRetryableException;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.storage.uri.StorageUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3ObjectIO implements ObjectIO {
private static final Logger LOGGER = LoggerFactory.getLogger(S3ObjectIO.class);

private final S3ClientSupplier s3clientSupplier;
private final Clock clock;
Expand Down Expand Up @@ -94,6 +103,31 @@ public void close() throws IOException {
};
}

/**
 * Deletes the given objects using S3 multi-object delete requests.
 *
 * <p>URIs are grouped by bucket (the URI authority). Because a single S3 {@code DeleteObjects}
 * request accepts at most 1000 keys, the keys of each bucket are sent in batches of up to 1000.
 *
 * <p>NOTE(review): per-key errors reported in the delete responses are not inspected here -
 * confirm whether callers need them surfaced.
 *
 * @param uris locations of the objects to delete
 */
@Override
public void deleteObjects(List<StorageUri> uris) {
  // Upper bound of keys that S3 accepts in one DeleteObjects request.
  final int maxKeysPerRequest = 1000;

  Map<String, List<StorageUri>> bucketToUris =
      uris.stream().collect(Collectors.groupingBy(StorageUri::requiredAuthority));

  for (Map.Entry<String, List<StorageUri>> bucketDeletes : bucketToUris.entrySet()) {
    String bucket = bucketDeletes.getKey();
    S3Client s3client = s3clientSupplier.getClient(bucket);

    List<ObjectIdentifier> objectIdentifiers =
        bucketDeletes.getValue().stream()
            .map(S3ObjectIO::withoutLeadingSlash)
            .map(key -> ObjectIdentifier.builder().key(key).build())
            .collect(Collectors.toList());

    int total = objectIdentifiers.size();
    for (int from = 0; from < total; from += maxKeysPerRequest) {
      int to = Math.min(from + maxKeysPerRequest, total);
      DeleteObjectsRequest request =
          DeleteObjectsRequest.builder()
              .bucket(bucket)
              .delete(Delete.builder().objects(objectIdentifiers.subList(from, to)).build())
              .build();

      s3client.deleteObjects(request);
    }
  }
}

private static String withoutLeadingSlash(StorageUri uri) {
String path = uri.requiredPath();
return path.startsWith("/") ? path.substring(1) : path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.projectnessie.objectstoragemock.HeapStorageBucket.newHeapStorageBucket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
Expand All @@ -38,8 +40,9 @@ public abstract class AbstractClients {
public static final String BUCKET_1 = "bucket1";
public static final String BUCKET_2 = "bucket2";

@SuppressWarnings("resource")
@Test
public void writeRead() throws Exception {
public void writeReadDelete() throws Exception {
try (ObjectStorageMock.MockServer server1 =
ObjectStorageMock.builder()
.putBuckets(BUCKET_1, newHeapStorageBucket().bucket())
Expand All @@ -58,6 +61,13 @@ public void writeRead() throws Exception {
response1 = new String(input.readAllBytes());
}
soft.assertThat(response1).isEqualTo("hello world");

objectIO.deleteObjects(List.of(uri));
// should not throw
objectIO.deleteObjects(List.of(uri));

soft.assertThatThrownBy(() -> objectIO.readObject(uri).readAllBytes())
.isInstanceOf(IOException.class);
}
}

Expand Down
Loading

0 comments on commit 0d36a84

Please sign in to comment.