Commit 9962596: Add glacier skipping logic

bangtim authored and pettyjamesm committed Apr 23, 2024
1 parent 6e104f2 commit 9962596

Showing 11 changed files with 216 additions and 19 deletions.
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/object-storage/legacy-s3.md
@@ -111,6 +111,9 @@ Trino uses its own S3 filesystem for the URI prefixes
* - `hive.s3.sts.region`
  - Optional override for the STS region when IAM role-based
    authentication via STS is used.
* - `hive.s3.storage-class-filter`
  - Filter based on the storage class of the S3 object. Defaults to `READ_ALL`;
    see the example below.

:::
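
For example, a Hive catalog that should skip objects archived in Glacier or Deep Archive could set the property in its catalog properties file. The file name below is illustrative; the accepted values are `READ_ALL`, `READ_NON_GLACIER`, and `READ_NON_GLACIER_AND_RESTORED`, defined by the `S3StorageClassFilter` enum added in this commit:

```
# etc/catalog/hive.properties (illustrative)
hive.s3.storage-class-filter=READ_NON_GLACIER
```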

(hive-s3-credentials)=
@@ -13,15 +13,20 @@
*/
package io.trino.filesystem.s3;

import com.google.common.collect.ImmutableSet;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.model.ObjectStorageClass;
import software.amazon.awssdk.services.s3.model.RestoreStatus;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
@@ -32,6 +37,8 @@ final class S3FileIterator
    private final S3Location location;
    private final Iterator<S3Object> iterator;
    private final Location baseLocation;
    private static final String S3_GLACIER_TAG = "s3:glacier";
    private static final String S3_GLACIER_AND_RESTORED_TAG = "s3:glacierRestored";

    public S3FileIterator(S3Location location, Iterator<S3Object> iterator)
    {
@@ -61,11 +68,21 @@ public FileEntry next()

            verify(object.key().startsWith(location.key()), "S3 listed key [%s] does not start with prefix [%s]", object.key(), location.key());

            Set<String> tags = ImmutableSet.of();
            if (object.storageClass() == ObjectStorageClass.GLACIER || object.storageClass() == ObjectStorageClass.DEEP_ARCHIVE) {
                tags = new HashSet<>();
                tags.add(S3_GLACIER_TAG);
                if (Optional.ofNullable(object.restoreStatus()).map(RestoreStatus::restoreExpiryDate).isPresent()) {
                    tags.add(S3_GLACIER_AND_RESTORED_TAG);
                }
                tags = ImmutableSet.copyOf(tags);
            }
            return new FileEntry(
                    baseLocation.appendPath(object.key()),
                    object.size(),
                    object.lastModified(),
-                   Optional.empty());
                    Optional.empty(),
                    tags);
        }
        catch (SdkException e) {
            throw new IOException("Failed to list location: " + location, e);
@@ -15,12 +15,22 @@

import io.airlift.units.DataSize;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.FileEntry;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ObjectStorageClass;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.IOException;
import java.util.List;

import static com.google.common.collect.Iterables.getOnlyElement;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;

public class TestS3FileSystemAwsS3
        extends AbstractTestS3FileSystem
@@ -68,4 +78,32 @@ private static String environmentVariable(String name)
    {
        return requireNonNull(System.getenv(name), "Environment variable not set: " + name);
    }

    @Test
    void testS3FileIteratorFileEntryTags()
            throws IOException
    {
        try (S3Client s3Client = createS3Client()) {
            String key = "test/tagsGlacier";
            ObjectStorageClass storageClass = ObjectStorageClass.GLACIER;
            PutObjectRequest putObjectRequestBuilder = PutObjectRequest.builder()
                    .bucket(bucket())
                    .key(key)
                    .storageClass(storageClass.toString())
                    .build();
            s3Client.putObject(
                    putObjectRequestBuilder,
                    RequestBody.empty());

            try {
                List<FileEntry> listing = toList(getFileSystem().listFiles(getRootLocation().appendPath("test")));
                FileEntry fileEntry = getOnlyElement(listing);

                assertThat(fileEntry.tags().contains("s3:glacier")).isTrue();
            }
            finally {
                s3Client.deleteObject(delete -> delete.bucket(bucket()).key(key));
            }
        }
    }
}
@@ -14,10 +14,12 @@
package io.trino.filesystem;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -26,14 +28,20 @@
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;

-public record FileEntry(Location location, long length, Instant lastModified, Optional<List<Block>> blocks)
public record FileEntry(Location location, long length, Instant lastModified, Optional<List<Block>> blocks, Set<String> tags)
{
    public FileEntry
    {
        checkArgument(length >= 0, "length is negative");
        requireNonNull(location, "location is null");
        requireNonNull(blocks, "blocks is null");
        blocks = blocks.map(locations -> validatedBlocks(locations, length));
        tags = ImmutableSet.copyOf(requireNonNull(tags, "tags is null"));
    }

    public FileEntry(Location location, long length, Instant lastModified, Optional<List<Block>> blocks)
    {
        this(location, length, lastModified, blocks, ImmutableSet.of());
    }

    public record Block(List<String> hosts, long offset, long length)
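A minimal usage sketch (not part of this commit) of the extended record: callers that do not care about tags keep the old four-argument form, while the S3 listing passes the glacier tags through the new five-argument canonical constructor. The paths and sizes below are made up.

```java
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.Location;

import java.time.Instant;
import java.util.Optional;
import java.util.Set;

class FileEntryTagsExample
{
    public static void main(String[] args)
    {
        // Existing callers keep the four-argument form; tags default to an empty set.
        FileEntry plain = new FileEntry(
                Location.of("s3://example-bucket/data/file.orc"), 123, Instant.now(), Optional.empty());

        // The S3 listing can attach storage-class tags through the canonical constructor.
        FileEntry archived = new FileEntry(
                Location.of("s3://example-bucket/data/archived.orc"), 456, Instant.now(), Optional.empty(), Set.of("s3:glacier"));

        System.out.println(plain.tags());     // []
        System.out.println(archived.tags());  // [s3:glacier]
    }
}
```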
@@ -176,6 +176,8 @@ public class HiveConfig

    private boolean partitionProjectionEnabled;

    private S3StorageClassFilter s3StorageClassFilter = S3StorageClassFilter.READ_ALL;

    public boolean isSingleStatementWritesOnly()
    {
        return singleStatementWritesOnly;
@@ -1252,4 +1254,17 @@ public HiveConfig setPartitionProjectionEnabled(boolean enabledAthenaPartitionPr
        this.partitionProjectionEnabled = enabledAthenaPartitionProjection;
        return this;
    }

    public S3StorageClassFilter getS3StorageClassFilter()
    {
        return s3StorageClassFilter;
    }

    @Config("hive.s3.storage-class-filter")
    @ConfigDescription("Filter based on storage class of S3 object")
    public HiveConfig setS3StorageClassFilter(S3StorageClassFilter s3StorageClassFilter)
    {
        this.s3StorageClassFilter = s3StorageClassFilter;
        return this;
    }
}
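A brief sketch, not from the commit, of how the new setting flows from `HiveConfig` into a file-entry predicate; this mirrors what `CachingDirectoryLister` does below.

```java
import io.trino.filesystem.FileEntry;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.S3StorageClassFilter;

import java.util.function.Predicate;

class StorageClassFilterWiringExample
{
    public static void main(String[] args)
    {
        // Equivalent to setting hive.s3.storage-class-filter=READ_NON_GLACIER in the catalog properties.
        HiveConfig config = new HiveConfig()
                .setS3StorageClassFilter(S3StorageClassFilter.READ_NON_GLACIER);

        // The enum converts to a Predicate<FileEntry> that directory listings can apply.
        Predicate<FileEntry> filter = config.getS3StorageClassFilter().toFileEntryPredicate();
        System.out.println(config.getS3StorageClassFilter() + " -> " + filter);
    }
}
```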
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.filesystem.FileEntry;

import java.util.function.Predicate;

import static com.google.common.base.Predicates.alwaysTrue;

public enum S3StorageClassFilter {
    READ_ALL,
    READ_NON_GLACIER,
    READ_NON_GLACIER_AND_RESTORED;

    private static final String S3_GLACIER_TAG = "s3:glacier";
    private static final String S3_GLACIER_AND_RESTORED_TAG = "s3:glacierRestored";

    /**
     * Checks that the S3 object does not have a storage class of GLACIER or DEEP_ARCHIVE.
     *
     * @return true if the FileEntry does not carry the glacier tag
     */
    private static boolean isNotGlacierObject(FileEntry fileEntry)
    {
        return !fileEntry.tags().contains(S3_GLACIER_TAG);
    }

    /**
     * Only restored objects have the restoreExpiryDate set.
     * Not-yet-restored objects and in-progress restores are ignored.
     *
     * @return true if the FileEntry is not a glacier object, or is a glacier object whose restore has completed
     */
    private static boolean isCompletedRestoredObject(FileEntry fileEntry)
    {
        return isNotGlacierObject(fileEntry) || fileEntry.tags().contains(S3_GLACIER_AND_RESTORED_TAG);
    }

    public Predicate<FileEntry> toFileEntryPredicate()
    {
        return switch (this) {
            case READ_ALL -> alwaysTrue();
            case READ_NON_GLACIER -> S3StorageClassFilter::isNotGlacierObject;
            case READ_NON_GLACIER_AND_RESTORED -> S3StorageClassFilter::isCompletedRestoredObject;
        };
    }
}
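To make the three modes concrete, here is a hedged sketch (not part of the commit) of what each predicate admits, using the tags that S3FileIterator attaches above; the locations and sizes are made up.

```java
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.Location;
import io.trino.plugin.hive.S3StorageClassFilter;

import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

class GlacierFilterExample
{
    public static void main(String[] args)
    {
        FileEntry standard = entry("s3://example-bucket/standard.orc", Set.of());
        FileEntry glacier = entry("s3://example-bucket/glacier.orc", Set.of("s3:glacier"));
        FileEntry restored = entry("s3://example-bucket/restored.orc", Set.of("s3:glacier", "s3:glacierRestored"));

        Predicate<FileEntry> nonGlacier = S3StorageClassFilter.READ_NON_GLACIER.toFileEntryPredicate();
        Predicate<FileEntry> nonGlacierAndRestored = S3StorageClassFilter.READ_NON_GLACIER_AND_RESTORED.toFileEntryPredicate();

        // READ_NON_GLACIER keeps only objects without the glacier tag.
        System.out.println(nonGlacier.test(standard));   // true
        System.out.println(nonGlacier.test(glacier));    // false
        System.out.println(nonGlacier.test(restored));   // false

        // READ_NON_GLACIER_AND_RESTORED additionally admits glacier objects whose restore has completed.
        System.out.println(nonGlacierAndRestored.test(glacier));   // false
        System.out.println(nonGlacierAndRestored.test(restored));  // true
    }

    private static FileEntry entry(String location, Set<String> tags)
    {
        return new FileEntry(Location.of(location), 0, Instant.now(), Optional.empty(), tags);
    }
}
```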
@@ -20,6 +20,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.cache.EvictableCacheBuilder;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hive.HiveConfig;
@@ -35,6 +36,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -52,14 +54,16 @@ public class CachingDirectoryLister
    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
    private final Cache<Location, ValueHolder> cache;
    private final List<SchemaTablePrefix> tablePrefixes;
    private final Predicate<FileEntry> filterPredicate;

    @Inject
    public CachingDirectoryLister(HiveConfig hiveClientConfig)
    {
-       this(hiveClientConfig.getFileStatusCacheExpireAfterWrite(), hiveClientConfig.getFileStatusCacheMaxRetainedSize(), hiveClientConfig.getFileStatusCacheTables());
        this(hiveClientConfig.getFileStatusCacheExpireAfterWrite(), hiveClientConfig.getFileStatusCacheMaxRetainedSize(),
                hiveClientConfig.getFileStatusCacheTables(), hiveClientConfig.getS3StorageClassFilter().toFileEntryPredicate());
    }

-   public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List<String> tables)
    public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List<String> tables, Predicate<FileEntry> filterPredicate)
    {
        this.cache = EvictableCacheBuilder.newBuilder()
                .maximumWeight(maxSize.toBytes())
@@ -71,6 +75,7 @@ public CachingDirectoryLister(Duration expireAfterWrite, DataSize maxSize, List<
        this.tablePrefixes = tables.stream()
                .map(CachingDirectoryLister::parseTableName)
                .collect(toImmutableList());
        this.filterPredicate = filterPredicate;
    }

    private static SchemaTablePrefix parseTableName(String tableName)
@@ -93,7 +98,7 @@ public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs,
            throws IOException
    {
        if (!isCacheEnabledFor(table.getSchemaTableName())) {
-           return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
            return new TrinoFileStatusRemoteIterator(fs.listFiles(location), filterPredicate);
        }

        return listInternal(fs, location);
@@ -107,13 +112,13 @@ private RemoteIterator<TrinoFileStatus> listInternal(TrinoFileSystem fs, Locatio
            return new SimpleRemoteIterator(cachedValueHolder.getFiles().get().iterator());
        }

-       return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, location), location);
        return cachingRemoteIterator(cachedValueHolder, createListingRemoteIterator(fs, location, filterPredicate), location);
    }

-   private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs, Location location)
    private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs, Location location, Predicate<FileEntry> filterPredicate)
            throws IOException
    {
-       return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
        return new TrinoFileStatusRemoteIterator(fs.listFiles(location), filterPredicate);
    }

    @Override
@@ -13,33 +13,62 @@
*/
package io.trino.plugin.hive.fs;

import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import jakarta.annotation.Nullable;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

import static java.util.Objects.requireNonNull;

public class TrinoFileStatusRemoteIterator
        implements RemoteIterator<TrinoFileStatus>
{
    private final FileIterator iterator;
    private final Predicate<FileEntry> filterPredicate;
    @Nullable
    private TrinoFileStatus nextElement;

-   public TrinoFileStatusRemoteIterator(FileIterator iterator)
    public TrinoFileStatusRemoteIterator(FileIterator iterator, Predicate<FileEntry> filterPredicate)
            throws IOException
    {
        this.iterator = requireNonNull(iterator, "iterator is null");
        this.filterPredicate = requireNonNull(filterPredicate, "filterPredicate is null");
        this.nextElement = findNextElement();
    }

    @Override
    public boolean hasNext()
            throws IOException
    {
-       return iterator.hasNext();
        return nextElement != null;
    }

    @Override
    public TrinoFileStatus next()
            throws IOException
    {
-       return new TrinoFileStatus(iterator.next());
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        TrinoFileStatus thisElement = nextElement;
        this.nextElement = findNextElement();
        return thisElement;
    }

    private TrinoFileStatus findNextElement()
            throws IOException
    {
        while (iterator.hasNext()) {
            FileEntry candidate = iterator.next();

            if (filterPredicate.test(candidate)) {
                return new TrinoFileStatus(candidate);
            }
        }
        return null;
    }
}
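Finally, a hedged sketch (not from the commit) of how a caller might wrap a raw listing with the new filtering iterator; `fileSystem` and `location` stand for whatever the caller already holds.

```java
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hive.S3StorageClassFilter;
import io.trino.plugin.hive.fs.RemoteIterator;
import io.trino.plugin.hive.fs.TrinoFileStatus;
import io.trino.plugin.hive.fs.TrinoFileStatusRemoteIterator;

import java.io.IOException;

class GlacierSkippingListingExample
{
    static RemoteIterator<TrinoFileStatus> listSkippingGlacier(TrinoFileSystem fileSystem, Location location)
            throws IOException
    {
        // Wrap the raw listing; Glacier and Deep Archive entries are dropped while iterating.
        return new TrinoFileStatusRemoteIterator(
                fileSystem.listFiles(location),
                S3StorageClassFilter.READ_NON_GLACIER.toFileEntryPredicate());
    }
}
```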
