Skip to content

Commit

Permalink
Add support for S3 storageClass to publishDir
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Dec 8, 2022
1 parent 7eecb26 commit 066f920
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/process.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,7 @@ saveAs A closure which, given the name of the file being published, ret
enabled Enable or disable the publish rule depending on the boolean value specified (default: ``true``).
failOnError When ``true`` abort the execution if some file can't be published to the specified target directory or bucket for any cause (default: ``false``)
contentType Allow specifying the media content type of the published file a.k.a. `MIME type <https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_Types>`_. If the boolean value ``true`` is specified the content type is inferred from the file extension (EXPERIMENTAL. Currently only supported by files stored on AWS S3. Default: ``false``, requires `22.10.0`` or later).
storageClass Allow specifying the *storage class* to be used for the published file (EXPERIMENTAL. Currently only supported by files stored on AWS S3. Requires version ``22.12.0-edge`` or later).
tags Allow the association of arbitrary tags with the published file e.g. ``tags: [FOO: 'Hello world']`` (EXPERIMENTAL. Currently only supported by files stored on AWS S3. Requires version ``21.12.0-edge`` or later).
=============== =================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class PublishDir {
*/
private contentType

/**
* The storage class to be used for the target file.
* Currently only supported by AWS S3.
*/
private String storageClass

private PathMatcher matcher

private FileSystem sourceFileSystem
Expand Down Expand Up @@ -197,6 +203,9 @@ class PublishDir {
else if( params.contentType )
result.contentType = params.contentType as String

if( params.storageClass )
result.storageClass = params.storageClass as String

return result
}

Expand Down Expand Up @@ -312,6 +321,10 @@ class PublishDir {
: this.contentType.toString()
destination.setContentType(type)
}
// storage class
if( storageClass && destination instanceof TagAwareFile ) {
destination.setStorageClass(storageClass)
}

if( inProcess ) {
safeProcessFile(source, destination)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,26 @@ class PublishDirTest extends Specification {

}

def 'should create publish dir with extended params' () {
given:
PublishDir publish

when:
publish = PublishDir.create(tags: ['foo','bar'])
then:
publish.@tags == ['foo','bar']

when:
publish = PublishDir.create(contentType: 'text/json')
then:
publish.@contentType == 'text/json'

when:
publish = PublishDir.create(storageClass: 'xyz')
then:
publish.@storageClass == 'xyz'
}

def 'should create symlinks for output files' () {

given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ interface TagAwareFile {

void setContentType(String type)

void setStorageClass(String storageClass)
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.Tag;
import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.MultipleFileUpload;
Expand Down Expand Up @@ -173,7 +174,7 @@ public PutObjectResult putObject(String bucket, String key, File file) {
return client.putObject(req);
}

private PutObjectRequest preparePutObjectRequest(PutObjectRequest req, ObjectMetadata metadata, List<Tag> tags, String contentType) {
private PutObjectRequest preparePutObjectRequest(PutObjectRequest req, ObjectMetadata metadata, List<Tag> tags, String contentType, String storageClass) {
req.withMetadata(metadata);
if( cannedAcl != null ) {
req.withCannedAcl(cannedAcl);
Expand All @@ -190,6 +191,9 @@ private PutObjectRequest preparePutObjectRequest(PutObjectRequest req, ObjectMet
if( contentType!=null ) {
metadata.setContentType(contentType);
}
if( storageClass!=null ) {
req.setStorageClass(storageClass);
}
return req;
}

Expand Down Expand Up @@ -228,7 +232,7 @@ public void deleteObject(String bucket, String key) {
/**
* @see com.amazonaws.services.s3.AmazonS3Client#copyObject(CopyObjectRequest)
*/
public void copyObject(CopyObjectRequest req, List<Tag> tags, String contentType) {
public void copyObject(CopyObjectRequest req, List<Tag> tags, String contentType, String storageClass) {
if( tags !=null && tags.size()>0 ) {
req.setNewObjectTagging(new ObjectTagging(tags));
}
Expand All @@ -248,6 +252,9 @@ public void copyObject(CopyObjectRequest req, List<Tag> tags, String contentType
meta.setContentType(contentType);
req.setNewObjectMetadata(meta);
}
if( storageClass!=null ) {
req.setStorageClass(storageClass);
}
if( log.isTraceEnabled() ) {
log.trace("S3 CopyObject request {}", req);
}
Expand Down Expand Up @@ -362,7 +369,7 @@ public ObjectListing listNextBatchOfObjects(ObjectListing objectListing) {
}


public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, List<Tag> tags, String contentType ) {
public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, List<Tag> tags, String contentType, String storageClass ) {

final String sourceBucketName = s3Source.getBucket();
final String sourceObjectKey = s3Source.getKey();
Expand Down Expand Up @@ -392,6 +399,11 @@ public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSiz
meta.setContentType(contentType);
initiateRequest.withObjectMetadata(meta);
}

if( storageClass!=null ) {
initiateRequest.setStorageClass(StorageClass.fromValue(storageClass));
}

InitiateMultipartUploadResult initResult = client.initiateMultipartUpload(initiateRequest);


Expand Down Expand Up @@ -585,7 +597,7 @@ public FileVisitResult visitFile(Path current, BasicFileAttributes attr) {
public void uploadFile(File source, S3Path target) {
PutObjectRequest req = new PutObjectRequest(target.getBucket(), target.getKey(), source);
ObjectMetadata metadata = new ObjectMetadata();
preparePutObjectRequest(req, metadata, target.getTagsList(), target.getContentType());
preparePutObjectRequest(req, metadata, target.getTagsList(), target.getContentType(), target.getStorageClass());
// initiate transfer
Upload upload = transferManager() .upload(req);
// await for completion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,12 @@ else if ( exits )
private S3OutputStream createUploaderOutputStream( S3Path fileToUpload ) {
AmazonS3Client s3 = fileToUpload.getFileSystem().getClient();

final String storageClass = fileToUpload.getStorageClass()!=null ? fileToUpload.getStorageClass() : props.getProperty("upload_storage_class");
final S3MultipartOptions opts = props != null ? new S3MultipartOptions(props) : new S3MultipartOptions();
final S3ObjectId objectId = fileToUpload.toS3ObjectId();
S3OutputStream stream = new S3OutputStream(s3.getClient(), objectId, opts)
.setCannedAcl(s3.getCannedAcl())
.setStorageClass(props.getProperty("upload_storage_class"))
.setStorageClass(storageClass)
.setStorageEncryption(props.getProperty("storage_encryption"))
.setKmsKeyId(props.getProperty("storage_kms_key_id"))
.setContentType(fileToUpload.getContentType())
Expand Down Expand Up @@ -615,15 +616,16 @@ public void copy(Path source, Path target, CopyOption... options)
final long length = sourceObjMetadata.getContentLength();
final List<Tag> tags = ((S3Path) target).getTagsList();
final String contentType = ((S3Path) target).getContentType();
final String storageClass = ((S3Path) target).getStorageClass();

if( length <= maxSize ) {
CopyObjectRequest copyObjRequest = new CopyObjectRequest(s3Source.getBucket(), s3Source.getKey(),s3Target.getBucket(), s3Target.getKey());
log.trace("Copy file via copy object - source: source={}, target={}, tags={}", s3Source, s3Target, tags);
client.copyObject(copyObjRequest, tags, contentType);
log.trace("Copy file via copy object - source: source={}, target={}, tags={}, storageClass={}", s3Source, s3Target, tags, storageClass);
client.copyObject(copyObjRequest, tags, contentType, storageClass);
}
else {
log.trace("Copy file via multipart upload - source: source={}, target={}, tags={}", s3Source, s3Target, tags);
client.multipartCopyObject(s3Source, s3Target, length, opts, tags, contentType);
log.trace("Copy file via multipart upload - source: source={}, target={}, tags={}, storageClass={}", s3Source, s3Target, tags, storageClass);
client.multipartCopyObject(s3Source, s3Target, length, opts, tags, contentType, storageClass);
}
}

Expand Down
11 changes: 11 additions & 0 deletions plugins/nf-amazon/src/main/com/upplication/s3fs/S3Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class S3Path implements Path, TagAwareFile {

private String contentType;

private String storageClass;

/**
* path must be a string of the form "/{bucket}", "/{bucket}/{key}" or just
* "{key}".
Expand Down Expand Up @@ -555,6 +557,11 @@ public void setContentType(String type) {
this.contentType = type;
}

@Override
public void setStorageClass(String storageClass) {
this.storageClass = storageClass;
}

public List<Tag> getTagsList() {
// nothing found, just return
if( tags==null )
Expand All @@ -571,6 +578,10 @@ public String getContentType() {
return contentType;
}

public String getStorageClass() {
return storageClass;
}

// ~ helpers methods

private static Function<String, String> strip(final String ... strs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,59 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
_ | 11 * 1024 * 1024
}

@Unroll
def 'should set file storage class' () {
given:
def TEXT = randomText(FILE_SIZE)
def folder = Files.createTempDirectory('test')
def file = Files.write(folder.resolve('foo.data'), TEXT.bytes)
and:
def bucket1 = createBucket()
def bucket2 = createBucket()

// upload a file to a remote bucket
when:
def target1 = s3path("s3://$bucket1/foo.data")
and:
target1.setStorageClass('REDUCED_REDUNDANCY')
def client = target1.getFileSystem().getClient()
and:
FileHelper.copyPath(file, target1)
// the file exist
then:
Files.exists(target1)
and:
client
.getObjectMetadata(target1.getBucket(), target1.getKey())
.getStorageClass() == 'REDUCED_REDUNDANCY'

// copy a file across buckets
when:
def target2 = s3path("s3://$bucket2/foo.data")
and:
target2.setStorageClass('STANDARD_IA')
and:
FileHelper.copyPath(target1, target2)
// the file exist
then:
Files.exists(target2)
client
.getObjectMetadata(target2.getBucket(), target2.getKey())
.getStorageClass() == 'STANDARD_IA'

cleanup:
deleteBucket(bucket1)
deleteBucket(bucket2)
folder?.deleteDir()

// check the limits in the file `amazon.properties`
// in the test resources
where:
_ | FILE_SIZE
_ | 50 * 1024
_ | 11 * 1024 * 1024
}

def 'should overwrite a file' () {
given:
def bucket1 = createBucket()
Expand Down

0 comments on commit 066f920

Please sign in to comment.