Skip to content

Commit

Permalink
Committing operations: cleanup metadata after failures
Browse files Browse the repository at this point in the history
  • Loading branch information
snazy committed Jun 22, 2024
1 parent ef91f86 commit c88804f
Show file tree
Hide file tree
Showing 13 changed files with 465 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.projectnessie.storage.uri.StorageUri;

public interface ObjectIO {
Expand All @@ -26,4 +27,6 @@ public interface ObjectIO {
InputStream readObject(StorageUri uri) throws IOException;

OutputStream writeObject(StorageUri uri) throws IOException;

/**
 * Deletes the objects identified by the given URIs.
 *
 * @param uris locations of the objects to delete
 * @throws IOException declared so that implementations can report deletions that failed
 */
void deleteObjects(List<StorageUri> uris) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
package org.projectnessie.catalog.files;

import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.projectnessie.catalog.files.adls.AdlsClientSupplier;
import org.projectnessie.catalog.files.adls.AdlsObjectIO;
import org.projectnessie.catalog.files.api.ObjectIO;
Expand All @@ -34,9 +39,17 @@ public ResolvingObjectIO(
S3ClientSupplier s3ClientSupplier,
AdlsClientSupplier adlsClientSupplier,
GcsStorageSupplier gcsStorageSupplier) {
this.s3ObjectIO = new S3ObjectIO(s3ClientSupplier, Clock.systemUTC());
this.gcsObjectIO = new GcsObjectIO(gcsStorageSupplier);
this.adlsObjectIO = new AdlsObjectIO(adlsClientSupplier);
this(
new S3ObjectIO(s3ClientSupplier, Clock.systemUTC()),
new GcsObjectIO(gcsStorageSupplier),
new AdlsObjectIO(adlsClientSupplier));
}

/**
 * Creates an instance from already constructed, per-storage-system {@link ObjectIO}
 * implementations.
 *
 * @param s3ObjectIO delegate stored for S3
 * @param gcsObjectIO delegate stored for GCS
 * @param adlsObjectIO delegate stored for ADLS
 */
public ResolvingObjectIO(
    S3ObjectIO s3ObjectIO, GcsObjectIO gcsObjectIO, AdlsObjectIO adlsObjectIO) {
  // Plain field assignments; their order is irrelevant.
  this.adlsObjectIO = adlsObjectIO;
  this.gcsObjectIO = gcsObjectIO;
  this.s3ObjectIO = s3ObjectIO;
}

@Override
Expand All @@ -59,4 +72,25 @@ protected ObjectIO resolve(StorageUri uri) {
throw new IllegalArgumentException("Unknown or unsupported scheme: " + scheme);
}
}

/**
 * Deletes the given objects by handing each URI to the {@link ObjectIO} that {@code resolve}
 * returns for it.
 *
 * <p>URIs are grouped per delegate instance, and every delegate is invoked once with its group.
 * An {@link IOException} from one delegate does not prevent the remaining delegates from being
 * called: the first such exception is thrown at the end, with later ones attached as suppressed
 * exceptions.
 *
 * @param uris locations of the objects to delete
 * @throws IOException if at least one delegate failed with an {@link IOException}
 */
@Override
public void deleteObjects(List<StorageUri> uris) throws IOException {
  // Keyed by identity: the grouping is per delegate *instance*.
  Map<ObjectIO, List<StorageUri>> urisByDelegate = new IdentityHashMap<>();
  for (StorageUri uri : uris) {
    ObjectIO delegate = resolve(uri);
    urisByDelegate.computeIfAbsent(delegate, ignored -> new ArrayList<>()).add(uri);
  }

  IOException failure = null;
  for (Map.Entry<ObjectIO, List<StorageUri>> group : urisByDelegate.entrySet()) {
    ObjectIO delegate = group.getKey();
    List<StorageUri> delegateUris = group.getValue();
    try {
      delegate.deleteObjects(delegateUris);
    } catch (IOException e) {
      if (failure != null) {
        failure.addSuppressed(e);
      } else {
        failure = e;
      }
    }
  }

  if (failure != null) {
    throw failure;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.projectnessie.catalog.files.adls.AdlsLocation.adlsLocation;

import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
Expand All @@ -26,10 +27,16 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.storage.uri.StorageUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdlsObjectIO implements ObjectIO {
private static final Logger LOGGER = LoggerFactory.getLogger(AdlsObjectIO.class);

private final AdlsClientSupplier clientSupplier;

Expand All @@ -50,11 +57,18 @@ public void ping(StorageUri uri) throws IOException {
}

/**
 * Opens an input stream for the file at the given URI.
 *
 * <p>The file client is obtained from the client supplier, and the configured read block size,
 * if present, is applied to the stream options. A {@link BlobStorageException} with HTTP status
 * 404 is rethrown as an {@link IOException} carrying the service message; any other
 * {@link BlobStorageException} is rethrown unchanged.
 *
 * @param uri location of the object to read
 * @return the input stream of the opened file
 * @throws IOException if the service answered with status 404
 */
@Override
public InputStream readObject(StorageUri uri) {
public InputStream readObject(StorageUri uri) throws IOException {
DataLakeFileClient file = clientSupplier.fileClientForLocation(uri);
DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions();
clientSupplier.adlsOptions().readBlockSize().ifPresent(options::setBlockSize);
return file.openInputStream(options).getInputStream();
try {
return file.openInputStream(options).getInputStream();
} catch (BlobStorageException e) {
// Status 404 is surfaced as a checked IOException, keeping the original exception as cause.
if (e.getStatusCode() == 404) {
throw new IOException(e.getServiceMessage(), e);
}
throw e;
}
}

@Override
Expand All @@ -66,4 +80,40 @@ public OutputStream writeObject(StorageUri uri) {
options.setParallelTransferOptions(transferOptions);
return new BufferedOutputStream(file.getOutputStream(options));
}

/**
 * Deletes the files at the given URIs, one delete call per file.
 *
 * <p>Locations are grouped by container (a location without a container is grouped under the
 * empty string), and the file-system client for each group is looked up using the group's first
 * location. A {@link BlobStorageException} with status 404 is ignored. Any other
 * {@link BlobStorageException} is remembered while the remaining files are still processed: the
 * first one is wrapped in an {@link IOException} and thrown at the end, later ones are attached
 * to it as suppressed exceptions.
 *
 * @param uris locations of the files to delete
 * @throws IOException if at least one delete failed with a status other than 404
 */
@Override
public void deleteObjects(List<StorageUri> uris) throws IOException {
  // Note: the default container is mapped to an empty string
  Map<String, List<AdlsLocation>> locationsByContainer =
      uris.stream()
          .map(AdlsLocation::adlsLocation)
          .collect(Collectors.groupingBy(loc -> loc.container().orElse("")));

  IOException failure = null;
  for (List<AdlsLocation> group : locationsByContainer.values()) {
    DataLakeFileSystemClient fileSystem = clientSupplier.fileSystemClient(group.get(0));

    // No batch-delete here, so every file is deleted individually.
    for (AdlsLocation loc : group) {
      String rawPath = loc.path();
      String relativePath = rawPath.startsWith("/") ? rawPath.substring(1) : rawPath;
      try {
        fileSystem.deleteFileIfExists(relativePath);
      } catch (BlobStorageException e) {
        if (e.getStatusCode() == 404) {
          // Status 404 is not treated as a failure.
          continue;
        }
        if (failure == null) {
          failure = new IOException(e.getServiceMessage(), e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
  }

  if (failure != null) {
    throw failure;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.catalog.secrets.KeySecret;
import org.projectnessie.storage.uri.StorageUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GcsObjectIO implements ObjectIO {
private static final Logger LOGGER = LoggerFactory.getLogger(GcsObjectIO.class);

private final GcsStorageSupplier storageSupplier;

public GcsObjectIO(GcsStorageSupplier storageSupplier) {
Expand Down Expand Up @@ -95,4 +101,31 @@ public OutputStream writeObject(StorageUri uri) {
bucketOptions.writeChunkSize().ifPresent(channel::setChunkSize);
return Channels.newOutputStream(channel);
}

/**
 * Deletes the objects at the given URIs, grouped by bucket.
 *
 * <p>For every bucket, the bucket options are looked up using the group's first location and
 * the storage client is obtained for those options. A group with exactly one object is deleted
 * with the single-object call, all other groups with the list-based call.
 *
 * @param uris locations of the objects to delete
 */
@Override
public void deleteObjects(List<StorageUri> uris) {
  Map<String, List<GcsLocation>> locationsByBucket =
      uris.stream()
          .map(GcsLocation::gcsLocation)
          .collect(Collectors.groupingBy(GcsLocation::bucket));

  for (List<GcsLocation> group : locationsByBucket.values()) {
    GcsBucketOptions options = storageSupplier.bucketOptions(group.get(0));
    @SuppressWarnings("resource")
    Storage storage = storageSupplier.forLocation(options);

    List<BlobId> ids = new ArrayList<>(group.size());
    for (GcsLocation loc : group) {
      ids.add(BlobId.of(loc.bucket(), loc.path()));
    }

    // This is rather a hack to make the `AbstractClients` test pass, because our object storage
    // mock doesn't implement GCS batch requests (yet).
    if (ids.size() != 1) {
      storage.delete(ids);
    } else {
      storage.delete(ids.get(0));
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.storage.uri.StorageUri;

Expand Down Expand Up @@ -53,6 +54,26 @@ public OutputStream writeObject(StorageUri uri) throws IOException {
}
}

/**
 * Deletes the local files that the given URIs point to; files that do not exist are skipped.
 *
 * <p>All URIs are processed even if deleting some of them fails with an {@link IOException}:
 * the first such exception is thrown after the loop, with later ones attached as suppressed
 * exceptions.
 *
 * @param uris locations of the files to delete
 * @throws IOException if at least one file could not be deleted
 */
@Override
public void deleteObjects(List<StorageUri> uris) throws IOException {
  IOException failure = null;

  for (StorageUri uri : uris) {
    Path target = filePath(uri);
    try {
      Files.deleteIfExists(target);
    } catch (IOException e) {
      if (failure != null) {
        failure.addSuppressed(e);
      } else {
        failure = e;
      }
    }
  }

  if (failure != null) {
    throw failure;
  }
}

/** Converts the required path of the given URI into a {@link Path}. */
private static Path filePath(StorageUri uri) {
  String rawPath = uri.requiredPath();
  return Paths.get(rawPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public S3Client getClient(StorageUri location) {
checkArgument(isS3scheme(scheme), "Invalid S3 scheme: %s", location);
String bucketName = location.requiredAuthority();

// Supply an empty profile file
return getClient(bucketName);
}

public S3Client getClient(String bucketName) {
S3BucketOptions bucketOptions =
s3options.effectiveOptionsForBucket(Optional.of(bucketName), secretsProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,26 @@
import java.io.OutputStream;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.projectnessie.catalog.files.api.BackendThrottledException;
import org.projectnessie.catalog.files.api.NonRetryableException;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.storage.uri.StorageUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3ObjectIO implements ObjectIO {
private static final Logger LOGGER = LoggerFactory.getLogger(S3ObjectIO.class);

private final S3ClientSupplier s3clientSupplier;
private final Clock clock;
Expand Down Expand Up @@ -104,6 +113,31 @@ public void close() throws IOException {
};
}

/**
 * Deletes the objects at the given URIs using S3 multi-object delete requests.
 *
 * <p>URIs are grouped by bucket (the URI authority) and the S3 client is looked up once per
 * bucket. Because S3 accepts at most 1000 keys in a single {@code DeleteObjects} request, the
 * keys of each bucket are sent in batches of at most 1000 instead of one unbounded request. The
 * per-key results contained in the responses are not inspected.
 *
 * @param uris locations of the objects to delete
 */
@Override
public void deleteObjects(List<StorageUri> uris) {
  // Upper bound for the number of keys in one S3 DeleteObjects request.
  final int maxKeysPerRequest = 1000;

  Map<String, List<StorageUri>> bucketToUris =
      uris.stream().collect(Collectors.groupingBy(StorageUri::requiredAuthority));

  for (Map.Entry<String, List<StorageUri>> bucketDeletes : bucketToUris.entrySet()) {
    String bucket = bucketDeletes.getKey();
    S3Client s3client = s3clientSupplier.getClient(bucket);

    List<ObjectIdentifier> objectIdentifiers =
        bucketDeletes.getValue().stream()
            .map(S3ObjectIO::withoutLeadingSlash)
            .map(key -> ObjectIdentifier.builder().key(key).build())
            .collect(Collectors.toList());

    // Issue one request per batch so that large delete lists are not rejected by S3.
    int total = objectIdentifiers.size();
    for (int from = 0; from < total; from += maxKeysPerRequest) {
      int to = Math.min(from + maxKeysPerRequest, total);
      List<ObjectIdentifier> batch = objectIdentifiers.subList(from, to);

      DeleteObjectsRequest deleteObjectsRequest =
          DeleteObjectsRequest.builder()
              .bucket(bucket)
              .delete(Delete.builder().objects(batch).build())
              .build();

      s3client.deleteObjects(deleteObjectsRequest);
    }
  }
}

private static String withoutLeadingSlash(StorageUri uri) {
String path = uri.requiredPath();
return path.startsWith("/") ? path.substring(1) : path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.projectnessie.objectstoragemock.HeapStorageBucket.newHeapStorageBucket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
Expand All @@ -38,8 +40,9 @@ public abstract class AbstractClients {
public static final String BUCKET_1 = "bucket1";
public static final String BUCKET_2 = "bucket2";

@SuppressWarnings("resource")
@Test
public void writeRead() throws Exception {
public void writeReadDelete() throws Exception {
try (ObjectStorageMock.MockServer server1 =
ObjectStorageMock.builder()
.putBuckets(BUCKET_1, newHeapStorageBucket().bucket())
Expand All @@ -58,6 +61,13 @@ public void writeRead() throws Exception {
response1 = new String(input.readAllBytes());
}
soft.assertThat(response1).isEqualTo("hello world");

objectIO.deleteObjects(List.of(uri));
// should not throw
objectIO.deleteObjects(List.of(uri));

soft.assertThatThrownBy(() -> objectIO.readObject(uri).readAllBytes())
.isInstanceOf(IOException.class);
}
}

Expand Down
Loading

0 comments on commit c88804f

Please sign in to comment.