Skip to content

Commit

Permalink
Add support for AWS Glacier restore
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Dec 11, 2022
1 parent e188bbf commit b611076
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 4 deletions.
2 changes: 2 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ anonymous Allow the access of public S3 buckets without the ne
s3Acl Allow the setting of a predefined bucket permissions also known as *canned ACL*. Permitted values are ``Private``, ``PublicRead``, ``PublicReadWrite``, ``AuthenticatedRead``, ``LogDeliveryWrite``, ``BucketOwnerRead``, ``BucketOwnerFullControl`` and ``AwsExecRead``. See `Amazon docs <https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl>`_ for details.
connectionTimeout The amount of time to wait (in milliseconds) when initially establishing a connection before giving up and timing out.
endpoint The AWS S3 API entry point e.g. `s3-us-west-1.amazonaws.com`.
glacierAutoRetrieval Enable the automatic retrieval of S3 objects stored with the Glacier storage class (EXPERIMENTAL. default: ``false``, requires version ``22.12.0-edge`` or later).
glacierExpirationDays The time, in days, between when an object is restored to the bucket and when it expires (EXPERIMENTAL. default: ``7``, requires version ``22.12.0-edge`` or later).
maxConnections The maximum number of allowed open HTTP connections.
maxErrorRetry The maximum number of retry attempts for failed retryable requests.
protocol The protocol (i.e. HTTP or HTTPS) to use when connecting to AWS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
Expand All @@ -85,6 +86,7 @@
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.RestoreObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
Expand Down Expand Up @@ -132,7 +134,11 @@ public class AmazonS3Client {

private Integer uploadMaxThreads = 10;

public AmazonS3Client(AmazonS3 client){
private boolean glacierAutoRetrieval;

private int glacierExpirationDays = 7;

/**
 * Create the client facade around a pre-built AWS SDK client.
 *
 * @param client the underlying {@link AmazonS3} instance to delegate to
 */
public AmazonS3Client(AmazonS3 client) {
    this.client = client;
}

Expand All @@ -141,9 +147,10 @@ public AmazonS3Client(ClientConfiguration config, AWSCredentials creds, String r
.standard()
.withCredentials(new AWSStaticCredentialsProvider(creds))
.withClientConfiguration(config)
.withRegion( region )
.withRegion(region)
.build();
}

/**
* @see com.amazonaws.services.s3.AmazonS3Client#listBuckets()
*/
Expand Down Expand Up @@ -332,6 +339,33 @@ public CannedAccessControlList getCannedAcl() {
return cannedAcl;
}

/**
 * Enable or disable the automatic restore of Glacier-archived objects
 * when a download hits an invalid-storage-class error.
 *
 * @param value {@code true} to auto-restore from Glacier on download
 */
public void setGlacierAutoRetrieval(boolean value) {
    glacierAutoRetrieval = value;
    log.debug("Setting S3 glacierAutoRetrieval={}", value);
}

/**
 * String overload used when the value comes from a configuration property.
 * {@code Boolean.parseBoolean} silently maps any string other than "true"
 * to {@code false}, so a typo like "yes" or "ture" would quietly disable
 * the feature; validate explicitly and warn, mirroring the behaviour of
 * {@link #setGlacierExpirationDays(String)}.
 *
 * @param value the property value; a {@code null} keeps the current setting
 */
public void setGlacierAutoRetrieval(String value) {
    if( value==null )
        return;
    if( "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value) )
        setGlacierAutoRetrieval(Boolean.parseBoolean(value));
    else
        log.warn("Not a valid AWS S3 glacierAutoRetrieval: `{}` -- Using default", value);
}

/**
 * Set the lifetime, in days, of the temporary copy created by a Glacier
 * restore. AWS requires the restore expiration to be at least one day,
 * so non-positive values are rejected and the current default is kept
 * rather than failing later at restore time.
 *
 * @param days number of days the restored copy remains available (must be >= 1)
 */
public void setGlacierExpirationDays(int days) {
    if( days < 1 ) {
        log.warn("Not a valid AWS S3 glacierExpirationDays: `{}` -- Using default", days);
        return;
    }
    this.glacierExpirationDays = days;
    log.debug("Setting S3 glacierExpirationDays={}", glacierExpirationDays);
}

/**
 * String overload used when the value comes from a configuration property.
 * A {@code null} value keeps the current setting; a non-numeric value is
 * reported and the default is retained.
 *
 * @param days the property value holding the expiration days
 */
public void setGlacierExpirationDays(String days) {
    if( days==null )
        return;
    try {
        final int value = Integer.parseInt(days);
        setGlacierExpirationDays(value);
    }
    catch( NumberFormatException e ) {
        log.warn("Not a valid AWS S3 glacierExpirationDays: `{}` -- Using default", days);
    }
}

/**
 * @return the underlying AWS SDK {@link AmazonS3} client
 */
public AmazonS3 getClient() {
    return this.client;
}
Expand Down Expand Up @@ -535,6 +569,60 @@ public void downloadFile(S3Path source, File target) {
log.debug("S3 download file: s3://{}/{} interrupted",source.getBucket(), source.getKey());
Thread.currentThread().interrupt();
}
catch (AmazonS3Exception e) {
handleAmazonException(source, target, e);
}
}

/**
 * Handle a failed S3 download. When the failure is caused by the object
 * being archived in a Glacier storage class and auto-retrieval is enabled,
 * trigger a restore and retry the download; otherwise rethrow the error.
 *
 * @param source the S3 path that failed to download
 * @param target the local file target of the download
 * @param e the exception raised by the S3 client
 */
private void handleAmazonException(S3Path source, File target, AmazonS3Exception e) {
    // Accessing a Glacier-archived object fails with error code `InvalidObjectState`
    // and the message "The operation is not valid for the object's storage class".
    // Match on the error code first (the stable API contract) and guard the
    // message check against a null message to avoid an NPE here.
    final boolean isGlacierError = "InvalidObjectState".equals(e.getErrorCode())
            && e.getMessage() != null
            && e.getMessage().contains("storage class");

    if( isGlacierError && glacierAutoRetrieval ) {
        log.info("S3 download s3://{}/{} failed due to invalid storage class -- Retrieving from Glacier", source.getBucket(), source.getKey());
        restoreFromGlacier(source.getBucket(), source.getKey());
        // retry the download once the restored copy is available
        downloadFile(source, target);
    }
    else {
        throw e;
    }
}

/**
 * Initiate the restore of a Glacier-archived object and block until the
 * restored copy is available, polling the object metadata every 30 seconds.
 * The restored copy expires after {@code glacierExpirationDays} days.
 *
 * @param bucketName the bucket holding the object
 * @param key the object key
 */
protected void restoreFromGlacier(String bucketName, String key) {
    final int sleepMillis = 30_000;
    final long _5_mins = 5 * 60 * 1_000;

    try {
        client.restoreObjectV2(new RestoreObjectRequest(bucketName, key, glacierExpirationDays));
    }
    catch (AmazonS3Exception e) {
        // a previous attempt may have already requested the restore; S3 reports
        // this with error code `RestoreAlreadyInProgress`. Check the code first
        // (stable contract) and fall back to a null-safe message check.
        final boolean alreadyInProgress = "RestoreAlreadyInProgress".equals(e.getErrorCode())
                || ( e.getMessage() != null && e.getMessage().contains("RestoreAlreadyInProgress") );
        if( alreadyInProgress ) {
            log.debug("S3 Glacier restore already initiated for object s3://{}/{}", bucketName, key);
        }
        else {
            throw e;
        }
    }

    try {
        long begin = System.currentTimeMillis();
        // poll first, then sleep: avoids a pointless 30s wait when the restore
        // has already completed. `getOngoingRestore` returns a Boolean that may
        // be null when no restore header is present, so avoid direct unboxing.
        while( Boolean.TRUE.equals(client.getObjectMetadata(bucketName, key).getOngoingRestore()) ) {
            final long now = System.currentTimeMillis();
            if( now-begin>_5_mins ) {
                // remind the user every 5 minutes that the restore is still running
                log.info("S3 Glacier restore ongoing for object s3://{}/{}", bucketName, key);
                begin = now;
            }
            Thread.sleep(sleepMillis);
        }
    }
    catch (InterruptedException e) {
        log.debug("S3 Glacier restore s3://{}/{} interrupted", bucketName, key);
        // preserve the interrupt status for callers up the stack
        Thread.currentThread().interrupt();
    }
}

public void downloadDirectory(S3Path source, File targetFile) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ else if (accessKey == null && secretKey == null) {
client.setKmsKeyId(props.getProperty("storage_kms_key_id"));
client.setUploadChunkSize(props.getProperty("upload_chunk_size"));
client.setUploadMaxThreads(props.getProperty("upload_max_threads"));
client.setGlacierAutoRetrieval(props.getProperty("glacier_auto_retrieval"));
client.setGlacierExpirationDays(props.getProperty("glacier_expiration_days"));

if (uri.getHost() != null) {
client.setEndpoint(uri.getHost());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.AmazonS3Exception
import com.amazonaws.services.s3.model.Tag
import groovy.util.logging.Slf4j
import nextflow.Global
Expand All @@ -30,7 +31,6 @@ import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Timeout
import spock.lang.Unroll

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Expand Down Expand Up @@ -1396,5 +1396,49 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
cleanup:
deleteBucket(bucket1)
}


@Ignore // takes too long to test via CI server
def 'should restore from glacier' () {
    given:
    def TEXT = randomText(10_000)
    def folder = Files.createTempDirectory('test')
    def sourceFile = Files.write(folder.resolve('foo.data'), TEXT.bytes)
    def downloadFile = folder.resolve('copy.data')
    and:
    def bucket1 = createBucket()

    // upload a file to a remote bucket using the GLACIER storage class
    when:
    def target = s3path("s3://$bucket1/foo.data")
    and:
    target.setStorageClass('GLACIER')
    def client = target.getFileSystem().getClient()
    and:
    FileHelper.copyPath(sourceFile, target)
    // the file exist
    then:
    Files.exists(target)
    and:
    client
        .getObjectMetadata(target.getBucket(), target.getKey())
        .getStorageClass() == 'GLACIER'

    // downloading a Glacier object without auto-retrieval must fail
    when:
    FileHelper.copyPath(target, downloadFile)
    then:
    thrown(AmazonS3Exception)

    // with auto-retrieval enabled the object is restored and downloaded
    when:
    client.setGlacierAutoRetrieval(true)
    and:
    FileHelper.copyPath(target, downloadFile)
    then:
    Files.exists(downloadFile)

    cleanup:
    client?.setGlacierAutoRetrieval(false)
    // `delete()` cannot remove a non-empty directory -- use `deleteDir()`
    folder?.deleteDir()
    deleteBucket(bucket1)
}

}

0 comments on commit b611076

Please sign in to comment.