Skip to content

Commit

Permalink
Update use of deprecated AWS SDK methods
Browse files Browse the repository at this point in the history
  • Loading branch information
nezihyigitbasi committed Dec 21, 2017
1 parent 90a198e commit dc4dbd6
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 28 deletions.
Expand Up @@ -22,17 +22,17 @@
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Builder;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CryptoConfiguration;
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
Expand All @@ -45,7 +45,7 @@
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.transfer.Transfer;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.facebook.presto.hadoop.HadoopFileStatus;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -87,6 +87,7 @@
import java.util.Map;
import java.util.Optional;

import static com.amazonaws.regions.Regions.US_EAST_1;
import static com.amazonaws.services.s3.Headers.SERVER_SIDE_ENCRYPTION;
import static com.amazonaws.services.s3.Headers.UNENCRYPTED_CONTENT_LENGTH;
import static com.facebook.presto.hive.RetryDriver.retry;
Expand Down Expand Up @@ -122,6 +123,7 @@
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.toArray;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.max;
Expand All @@ -148,8 +150,6 @@ public class PrestoS3FileSystem
private static final String PATH_SEPARATOR = "/";
private static final Duration BACKOFF_MIN_SLEEP = new Duration(1, SECONDS);

private final TransferManagerConfiguration transferConfig = new TransferManagerConfiguration();

private URI uri;
private Path workingDirectory;
private AmazonS3 s3;
Expand All @@ -163,6 +163,8 @@ public class PrestoS3FileSystem
private PrestoS3SseType sseType;
private String sseKmsKeyId;
private boolean isPathStyleAccess;
private long multiPartUploadMinFileSize;
private long multiPartUploadMinPartSize;

@Override
public void initialize(URI uri, Configuration conf)
Expand All @@ -186,11 +188,13 @@ public void initialize(URI uri, Configuration conf)
Duration connectTimeout = Duration.valueOf(conf.get(S3_CONNECT_TIMEOUT, defaults.getS3ConnectTimeout().toString()));
Duration socketTimeout = Duration.valueOf(conf.get(S3_SOCKET_TIMEOUT, defaults.getS3SocketTimeout().toString()));
int maxConnections = conf.getInt(S3_MAX_CONNECTIONS, defaults.getS3MaxConnections());
long minFileSize = conf.getLong(S3_MULTIPART_MIN_FILE_SIZE, defaults.getS3MultipartMinFileSize().toBytes());
long minPartSize = conf.getLong(S3_MULTIPART_MIN_PART_SIZE, defaults.getS3MultipartMinPartSize().toBytes());
this.multiPartUploadMinFileSize = conf.getLong(S3_MULTIPART_MIN_FILE_SIZE, defaults.getS3MultipartMinFileSize().toBytes());
this.multiPartUploadMinPartSize = conf.getLong(S3_MULTIPART_MIN_PART_SIZE, defaults.getS3MultipartMinPartSize().toBytes());
this.isPathStyleAccess = conf.getBoolean(S3_PATH_STYLE_ACCESS, defaults.isS3PathStyleAccess());
this.useInstanceCredentials = conf.getBoolean(S3_USE_INSTANCE_CREDENTIALS, defaults.isS3UseInstanceCredentials());
this.pinS3ClientToCurrentRegion = conf.getBoolean(S3_PIN_CLIENT_TO_CURRENT_REGION, defaults.isPinS3ClientToCurrentRegion());
verify((pinS3ClientToCurrentRegion && conf.get(S3_ENDPOINT) == null) || !pinS3ClientToCurrentRegion,
"Invalid configuration: either endpoint can be set or S3 client can be pinned to the current region");
this.sseEnabled = conf.getBoolean(S3_SSE_ENABLED, defaults.isS3SseEnabled());
this.sseType = PrestoS3SseType.valueOf(conf.get(S3_SSE_TYPE, defaults.getS3SseType().name()));
this.sseKmsKeyId = conf.get(S3_SSE_KMS_KEY_ID, defaults.getS3SseKmsKeyId());
Expand All @@ -206,9 +210,6 @@ public void initialize(URI uri, Configuration conf)
.withUserAgentSuffix(S3_USER_AGENT_SUFFIX);

this.s3 = createAmazonS3Client(uri, conf, configuration);

transferConfig.setMultipartUploadThreshold(minFileSize);
transferConfig.setMinimumUploadPartSize(minPartSize);
}

@Override
Expand Down Expand Up @@ -359,7 +360,7 @@ public FSDataOutputStream create(Path path, FsPermission permission, boolean ove

String key = keyFromPath(qualifiedPath(path));
return new FSDataOutputStream(
new PrestoS3OutputStream(s3, transferConfig, getBucketName(uri), key, tempFile, sseEnabled, sseType, sseKmsKeyId),
new PrestoS3OutputStream(s3, getBucketName(uri), key, tempFile, sseEnabled, sseType, sseKmsKeyId, multiPartUploadMinFileSize, multiPartUploadMinPartSize),
statistics);
}

Expand Down Expand Up @@ -620,41 +621,57 @@ private static String keyFromPath(Path path)
return key;
}

private AmazonS3Client createAmazonS3Client(URI uri, Configuration hadoopConfig, ClientConfiguration clientConfig)
private AmazonS3 createAmazonS3Client(URI uri, Configuration hadoopConfig, ClientConfiguration clientConfig)
{
AWSCredentialsProvider credentials = getAwsCredentialsProvider(uri, hadoopConfig);
Optional<EncryptionMaterialsProvider> emp = createEncryptionMaterialsProvider(hadoopConfig);
AmazonS3Client client;
Optional<EncryptionMaterialsProvider> encryptionMaterialsProvider = createEncryptionMaterialsProvider(hadoopConfig);
AmazonS3Builder<? extends AmazonS3Builder, ? extends AmazonS3> clientBuilder;

String signerType = hadoopConfig.get(S3_SIGNER_TYPE);
if (signerType != null) {
clientConfig.withSignerOverride(signerType);
}
if (emp.isPresent()) {
client = new AmazonS3EncryptionClient(credentials, emp.get(), clientConfig, new CryptoConfiguration(), METRIC_COLLECTOR);

if (encryptionMaterialsProvider.isPresent()) {
clientBuilder = AmazonS3EncryptionClient.encryptionBuilder()
.withCredentials(credentials)
.withEncryptionMaterials(encryptionMaterialsProvider.get())
.withClientConfiguration(clientConfig)
.withMetricsCollector(METRIC_COLLECTOR);
}
else {
client = new AmazonS3Client(credentials, clientConfig, METRIC_COLLECTOR);
clientBuilder = AmazonS3Client.builder()
.withCredentials(credentials)
.withClientConfiguration(clientConfig)
.withMetricsCollector(METRIC_COLLECTOR);
}

if (isPathStyleAccess) {
S3ClientOptions clientOptions = S3ClientOptions.builder().setPathStyleAccess(true).build();
client.setS3ClientOptions(clientOptions);
}
boolean regionOrEndpointSet = false;

// use local region when running inside of EC2
if (pinS3ClientToCurrentRegion) {
Region region = Regions.getCurrentRegion();
if (region != null) {
client.setRegion(region);
clientBuilder = clientBuilder.withRegion(region.getName());
regionOrEndpointSet = true;
}
}

String endpoint = hadoopConfig.get(S3_ENDPOINT);
if (endpoint != null) {
client.setEndpoint(endpoint);
clientBuilder = clientBuilder.withEndpointConfiguration(new EndpointConfiguration(endpoint, null));
regionOrEndpointSet = true;
}

if (isPathStyleAccess) {
clientBuilder = clientBuilder.enablePathStyleAccess();
}

if (!regionOrEndpointSet) {
clientBuilder = clientBuilder.withRegion(US_EAST_1);
}

return client;
return clientBuilder.build();
}

private static Optional<EncryptionMaterialsProvider> createEncryptionMaterialsProvider(Configuration hadoopConfig)
Expand Down Expand Up @@ -693,7 +710,7 @@ private AWSCredentialsProvider getAwsCredentialsProvider(URI uri, Configuration
}

if (useInstanceCredentials) {
return new InstanceProfileCredentialsProvider();
return InstanceProfileCredentialsProvider.getInstance();
}

String providerClass = conf.get(S3_CREDENTIALS_PROVIDER);
Expand Down Expand Up @@ -959,13 +976,24 @@ private static class PrestoS3OutputStream

private boolean closed;

public PrestoS3OutputStream(AmazonS3 s3, TransferManagerConfiguration config, String host, String key, File tempFile, boolean sseEnabled, PrestoS3SseType sseType, String sseKmsKeyId)
public PrestoS3OutputStream(
AmazonS3 s3,
String host,
String key,
File tempFile,
boolean sseEnabled,
PrestoS3SseType sseType,
String sseKmsKeyId,
long multiPartUploadMinFileSize,
long multiPartUploadMinPartSize)
throws IOException
{
super(new BufferedOutputStream(new FileOutputStream(requireNonNull(tempFile, "tempFile is null"))));

transferManager = new TransferManager(requireNonNull(s3, "s3 is null"));
transferManager.setConfiguration(requireNonNull(config, "config is null"));
transferManager = TransferManagerBuilder.standard()
.withS3Client(requireNonNull(s3, "s3 is null"))
.withMinimumUploadPartSize(multiPartUploadMinPartSize)
.withMultipartUploadThreshold(multiPartUploadMinFileSize).build();

this.host = requireNonNull(host, "host is null");
this.key = requireNonNull(key, "key is null");
Expand Down
Expand Up @@ -27,6 +27,7 @@
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider;
import com.facebook.presto.hive.s3.PrestoS3FileSystem.UnrecoverableS3OperationException;
import com.google.common.base.Throwables;
import com.google.common.base.VerifyException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand All @@ -52,6 +53,7 @@
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_CLIENT_RETRIES;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_MAX_RETRY_TIME;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_PATH_STYLE_ACCESS;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_PIN_CLIENT_TO_CURRENT_REGION;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_SECRET_KEY;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_SIGNER_TYPE;
import static com.facebook.presto.hive.s3.S3ConfigurationUpdater.S3_STAGING_DIRECTORY;
Expand Down Expand Up @@ -105,6 +107,18 @@ public void testCompatibleStaticCredentials()
}
}

/**
 * Initializing the file system with both an explicit S3 endpoint and the
 * pin-to-current-region flag must be rejected: the two settings are mutually
 * exclusive, and {@code initialize} is expected to fail with a
 * {@link VerifyException} carrying the configuration error message.
 */
@Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Invalid configuration: either endpoint can be set or S3 client can be pinned to the current region")
public void testEndpointWithPinToCurrentRegionConfiguration()
        throws Exception
{
    // Deliberately conflicting settings: a custom endpoint plus region pinning.
    Configuration conflictingConf = new Configuration();
    conflictingConf.set(S3_PIN_CLIENT_TO_CURRENT_REGION, "true");
    conflictingConf.set(S3_ENDPOINT, "test.example.endpoint.com");

    URI bucketUri = new URI("s3a://test-bucket/");
    try (PrestoS3FileSystem fileSystem = new PrestoS3FileSystem()) {
        // Expected to throw before any S3 client is built.
        fileSystem.initialize(bucketUri, conflictingConf);
    }
}

@Test
public void testInstanceCredentialsEnabled()
throws Exception
Expand Down

0 comments on commit dc4dbd6

Please sign in to comment.