Add support for ACL config in native S3 file system #21176

Merged · 2 commits · Mar 26, 2024
docs/src/main/sphinx/object-storage/file-system-s3.md (7 additions, 0 deletions)
@@ -28,6 +28,13 @@ support:
- Required region name for S3.
* - `s3.path-style-access`
- Use path-style access for all requests to S3
+* - `s3.canned-acl`
+  - [Canned ACL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl)
+    to use when uploading files to S3. Defaults to `NONE`, which has the same
+    effect as `PRIVATE`. If the files are to be uploaded to an S3 bucket owned
+    by a different AWS user, the canned ACL may be set to one of the following:
+    `PRIVATE`, `PUBLIC_READ`, `PUBLIC_READ_WRITE`, `AUTHENTICATED_READ`,
+    `BUCKET_OWNER_READ`, or `BUCKET_OWNER_FULL_CONTROL`.
* - `s3.sse.type`
- Set the type of S3 server-side encryption (SSE) to use. Defaults to `NONE`
for no encryption. Other valid values are `S3` for encryption by S3 managed
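The new property is catalog-level configuration. A minimal sketch of its use, assuming a catalog that already runs on the native S3 file system (the file name, region, and bucket setup are illustrative):

```properties
# etc/catalog/example.properties (hypothetical catalog)
fs.native-s3.enabled=true
s3.region=us-east-1
# Grant the bucket owner full control over objects written by Trino,
# for example when the target bucket belongs to another AWS account.
s3.canned-acl=BUCKET_OWNER_FULL_CONTROL
```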
io/trino/filesystem/s3/S3Context.java
@@ -13,6 +13,7 @@
*/
package io.trino.filesystem.s3;

+import io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl;
import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
@@ -23,7 +24,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

-record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId, Optional<AwsCredentialsProvider> credentialsProviderOverride)
+record S3Context(int partSize, boolean requesterPays, S3SseType sseType, String sseKmsKeyId, Optional<AwsCredentialsProvider> credentialsProviderOverride, ObjectCannedAcl cannedAcl)
{
private static final int MIN_PART_SIZE = 5 * 1024 * 1024; // S3 requirement

@@ -47,7 +48,8 @@ public S3Context withCredentialsProviderOverride(AwsCredentialsProvider credentialsProviderOverride)
requesterPays,
sseType,
sseKmsKeyId,
-Optional.of(credentialsProviderOverride));
+Optional.of(credentialsProviderOverride),
+cannedAcl);
}

public void applyCredentialProviderOverride(AwsRequestOverrideConfiguration.Builder builder)
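The record gains a `cannedAcl` component, and `withCredentialsProviderOverride` now carries it through when deriving a new context. For orientation, a hypothetical construction of the extended record, placed in the same package since `S3Context` is package-private (all values are illustrative; in the PR they come from `S3FileSystemConfig`):

```java
import io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl;
import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType;

import java.util.Optional;

class S3ContextSketch
{
    static S3Context example()
    {
        return new S3Context(
                16 * 1024 * 1024,      // partSize: 16 MB, above the 5 MB S3 minimum
                false,                 // requesterPays
                S3SseType.NONE,        // sseType: no server-side encryption
                null,                  // sseKmsKeyId: only used with SSE-KMS
                Optional.empty(),      // no credentials provider override
                ObjectCannedAcl.NONE); // cannedAcl: NONE sends no ACL header
    }
}
```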
io/trino/filesystem/s3/S3FileSystemConfig.java
@@ -23,6 +23,7 @@
import io.airlift.units.MinDataSize;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;

import java.util.Optional;

@@ -35,6 +36,30 @@ public enum S3SseType
NONE, S3, KMS
}

+    public enum ObjectCannedAcl
+    {
+        NONE,
+        PRIVATE,
+        PUBLIC_READ,
+        PUBLIC_READ_WRITE,
+        AUTHENTICATED_READ,
+        BUCKET_OWNER_READ,
+        BUCKET_OWNER_FULL_CONTROL;
+
+        public static ObjectCannedACL getCannedAcl(S3FileSystemConfig.ObjectCannedAcl cannedAcl)
+        {
+            return switch (cannedAcl) {
+                case NONE -> null;
+                case PRIVATE -> ObjectCannedACL.PRIVATE;
+                case PUBLIC_READ -> ObjectCannedACL.PUBLIC_READ;
+                case PUBLIC_READ_WRITE -> ObjectCannedACL.PUBLIC_READ_WRITE;
+                case AUTHENTICATED_READ -> ObjectCannedACL.AUTHENTICATED_READ;
+                case BUCKET_OWNER_READ -> ObjectCannedACL.BUCKET_OWNER_READ;
+                case BUCKET_OWNER_FULL_CONTROL -> ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL;
+            };
+        }
+    }

private String awsAccessKey;
private String awsSecretKey;
private String endpoint;
@@ -57,6 +82,7 @@ public enum S3SseType
private boolean tcpKeepAlive;
private HostAndPort httpProxy;
private boolean httpProxySecure;
+private ObjectCannedAcl objectCannedAcl = ObjectCannedAcl.NONE;

public String getAwsAccessKey()
{
@@ -184,6 +210,20 @@ public S3FileSystemConfig setStsRegion(String stsRegion)
return this;
}

+    @NotNull
+    public ObjectCannedAcl getCannedAcl()
+    {
+        return objectCannedAcl;
+    }
+
+    @Config("s3.canned-acl")
+    @ConfigDescription("Canned ACL (predefined grants) to manage access to objects")
+    public S3FileSystemConfig setCannedAcl(ObjectCannedAcl objectCannedAcl)
+    {
+        this.objectCannedAcl = objectCannedAcl;
+        return this;
+    }

@NotNull
public S3SseType getSseType()
{
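Note the asymmetry the mapping encodes: the config enum mirrors the SDK's `ObjectCannedACL` values but adds `NONE`, which `getCannedAcl` turns into `null`. The AWS SDK v2 request builders treat a null ACL as unset, so no `x-amz-acl` header is sent and the bucket's default permissions apply. A small illustrative sketch (class name and assertions are for exposition only):

```java
import io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;

class CannedAclMappingSketch
{
    public static void main(String[] args)
    {
        // NONE means "send no ACL header": the mapping returns null.
        assert ObjectCannedAcl.getCannedAcl(ObjectCannedAcl.NONE) == null;
        // Every other value maps one-to-one onto the SDK enum.
        assert ObjectCannedAcl.getCannedAcl(ObjectCannedAcl.BUCKET_OWNER_READ) == ObjectCannedACL.BUCKET_OWNER_READ;
    }
}
```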
io/trino/filesystem/s3/S3FileSystemFactory.java
@@ -112,7 +112,8 @@ public S3FileSystemFactory(OpenTelemetry openTelemetry, S3FileSystemConfig confi
config.isRequesterPays(),
config.getSseType(),
config.getSseKmsKeyId(),
-Optional.empty());
+Optional.empty(),
+config.getCannedAcl());
}

@PreDestroy
io/trino/filesystem/s3/S3OutputStream.java
@@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.RequestPayer;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
@@ -39,6 +40,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

+import static io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl.getCannedAcl;
import static java.lang.Math.clamp;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -60,6 +62,7 @@ final class S3OutputStream
private final RequestPayer requestPayer;
private final S3SseType sseType;
private final String sseKmsKeyId;
+private final ObjectCannedACL cannedAcl;

private int currentPartNumber;
private byte[] buffer = new byte[0];
@@ -86,6 +89,7 @@ public S3OutputStream(AggregatedMemoryContext memoryContext, S3Client client, S3
this.requestPayer = context.requestPayer();
this.sseType = context.sseType();
this.sseKmsKeyId = context.sseKmsKeyId();
+this.cannedAcl = getCannedAcl(context.cannedAcl());
}

@SuppressWarnings("NumericCastThatLosesPrecision")
@@ -195,6 +199,7 @@ private void flushBuffer(boolean finished)
if (finished && !multipartUploadStarted) {
PutObjectRequest request = PutObjectRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
+.acl(cannedAcl)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
@@ -272,6 +277,7 @@ private CompletedPart uploadPage(byte[] data, int length)
if (uploadId.isEmpty()) {
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder()
.overrideConfiguration(context::applyCredentialProviderOverride)
+.acl(cannedAcl)
.requestPayer(requestPayer)
.bucket(location.bucket())
.key(location.key())
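Both upload paths attach the ACL when the object is created: the single `PutObjectRequest` used when the stream finishes before a multipart upload starts, and the `CreateMultipartUploadRequest` that opens a multipart upload (the completed object takes the ACL given at creation, so the individual part uploads need no ACL of their own). In isolation, the builder call looks like this (class name, bucket, and key are hypothetical):

```java
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

class CannedAclRequestSketch
{
    static PutObjectRequest example()
    {
        return PutObjectRequest.builder()
                .bucket("example-bucket")               // hypothetical bucket
                .key("warehouse/orders/part-0.parquet") // hypothetical key
                .acl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL)
                .build();
    }
}
```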
io/trino/filesystem/s3/TestS3FileSystemConfig.java
@@ -17,6 +17,7 @@
import com.google.common.net.HostAndPort;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
+import io.trino.filesystem.s3.S3FileSystemConfig.ObjectCannedAcl;
import io.trino.filesystem.s3.S3FileSystemConfig.S3SseType;
import org.junit.jupiter.api.Test;

@@ -44,6 +45,7 @@ public void testDefaults()
.setExternalId(null)
.setStsEndpoint(null)
.setStsRegion(null)
+.setCannedAcl(ObjectCannedAcl.NONE)
.setSseType(S3SseType.NONE)
.setSseKmsKeyId(null)
.setStreamingPartSize(DataSize.of(16, MEGABYTE))
@@ -72,6 +74,7 @@ public void testExplicitPropertyMappings()
.put("s3.external-id", "myid")
.put("s3.sts.endpoint", "sts.example.com")
.put("s3.sts.region", "us-west-2")
.put("s3.canned-acl", "BUCKET_OWNER_FULL_CONTROL")
.put("s3.sse.type", "KMS")
.put("s3.sse.kms-key-id", "mykey")
.put("s3.streaming.part-size", "42MB")
@@ -97,6 +100,7 @@ public void testExplicitPropertyMappings()
.setExternalId("myid")
.setStsEndpoint("sts.example.com")
.setStsRegion("us-west-2")
+.setCannedAcl(ObjectCannedAcl.BUCKET_OWNER_FULL_CONTROL)
.setStreamingPartSize(DataSize.of(42, MEGABYTE))
.setSseType(S3SseType.KMS)
.setSseKmsKeyId("mykey")
Product test catalog properties (Delta Lake catalog, Databricks Unity metastore)
@@ -3,6 +3,8 @@ hive.metastore.uri=https://${ENV:DATABRICKS_HOST}:443/api/2.0/unity-hms-proxy/me
hive.metastore.http.client.bearer-token=${ENV:DATABRICKS_TOKEN}
hive.metastore.http.client.additional-headers=X-Databricks-Catalog-Name:${ENV:DATABRICKS_UNITY_CATALOG_NAME}
hive.metastore.http.client.authentication.type=BEARER
+fs.hadoop.enabled=false
+fs.native-s3.enabled=true
# We need to give access to bucket owner (the AWS account integrated with Databricks), otherwise files won't be readable from Databricks
-hive.s3.upload-acl-type=BUCKET_OWNER_FULL_CONTROL
+s3.canned-acl=BUCKET_OWNER_FULL_CONTROL
delta.enable-non-concurrent-writes=true
Product test catalog properties (second catalog in the same Databricks Unity environment)
@@ -3,5 +3,7 @@ hive.metastore.uri=https://${ENV:DATABRICKS_HOST}:443/api/2.0/unity-hms-proxy/me
hive.metastore.http.client.bearer-token=${ENV:DATABRICKS_TOKEN}
hive.metastore.http.client.additional-headers=X-Databricks-Catalog-Name:${ENV:DATABRICKS_UNITY_CATALOG_NAME}
hive.metastore.http.client.authentication.type=BEARER
+fs.hadoop.enabled=false
+fs.native-s3.enabled=true
# We need to give access to bucket owner (the AWS account integrated with Databricks), otherwise files won't be readable from Databricks
-hive.s3.upload-acl-type=BUCKET_OWNER_FULL_CONTROL
+s3.canned-acl=BUCKET_OWNER_FULL_CONTROL
Product test catalog properties (Delta Lake catalog, Glue metastore)
@@ -1,8 +1,9 @@
connector.name=delta_lake
hive.metastore=glue
hive.metastore.glue.region=${ENV:AWS_REGION}
-fs.hadoop.enabled=true
+fs.hadoop.enabled=false
+fs.native-s3.enabled=true
# We need to give access to bucket owner (the AWS account integrated with Databricks), otherwise files won't be readable from Databricks
-hive.s3.upload-acl-type=BUCKET_OWNER_FULL_CONTROL
+s3.canned-acl=BUCKET_OWNER_FULL_CONTROL
delta.enable-non-concurrent-writes=true
delta.hive-catalog-name=hive
Product test catalog properties (Hive catalog, Glue metastore)
@@ -1,9 +1,10 @@
connector.name=hive
hive.metastore=glue
hive.metastore.glue.region=${ENV:AWS_REGION}
-fs.hadoop.enabled=true
+fs.hadoop.enabled=false
+fs.native-s3.enabled=true
# We need to give access to bucket owner (the AWS account integrated with Databricks), otherwise files won't be readable from Databricks
-hive.s3.upload-acl-type=BUCKET_OWNER_FULL_CONTROL
+s3.canned-acl=BUCKET_OWNER_FULL_CONTROL
hive.non-managed-table-writes-enabled=true
# Required by some product tests
hive.hive-views.enabled=true
TestDeltaLakeJmx.java
@@ -34,11 +34,8 @@ public class TestDeltaLakeJmx
public void testJmxTablesExposedByDeltaLakeConnectorBackedByGlueMetastore()
{
assertThat(onTrino().executeQuery("SHOW TABLES IN jmx.current LIKE '%name=delta%'")).containsOnly(
row("io.trino.hdfs:name=delta,type=trinofilesystemcachestats"),
row("io.trino.hdfs:name=delta,type=trinohdfsfilesystemstats"),
row("io.trino.plugin.hive.metastore.cache:name=delta,type=cachinghivemetastore"),
row("io.trino.plugin.hive.metastore.glue:name=delta,type=gluehivemetastore"),
row("io.trino.hdfs.s3:name=delta,type=trinos3filesystem"),
row("io.trino.plugin.hive:catalog=delta,name=delta,type=fileformatdatasourcestats"),
row("trino.plugin.deltalake.transactionlog:catalog=delta,name=delta,type=transactionlogaccess"));
}