Skip to content

Commit

Permalink
feat: add support for s3 crt client
Browse files Browse the repository at this point in the history
  • Loading branch information
scrocquesel committed Jan 14, 2024
1 parent 5b28d07 commit 69daef1
Show file tree
Hide file tree
Showing 12 changed files with 429 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,13 @@ private void createClientBuilders(
syntheticBeans.produce(SyntheticBeanBuildItem
.configure(syncClientBuilderClass)
.defaultBean()
.scope(ApplicationScoped.class)
.setRuntimeInit()
.scope(ApplicationScoped.class)
.createWith(otelRecorder.configure(syncClientBuilder))
.addInjectionPoint(ClassType.create(OpenTelemetry.class)).done());
} else {
syntheticBeans.produce(SyntheticBeanBuildItem.configure(syncClientBuilderClass)
.defaultBean()
.setRuntimeInit()
.scope(ApplicationScoped.class)
.runtimeValue(syncClientBuilder)
Expand All @@ -362,12 +363,13 @@ private void createClientBuilders(
syntheticBeans.produce(SyntheticBeanBuildItem
.configure(asyncClientBuilderClass)
.defaultBean()
.scope(ApplicationScoped.class)
.setRuntimeInit()
.scope(ApplicationScoped.class)
.createWith(otelRecorder.configure(asyncClientBuilder))
.addInjectionPoint(ClassType.create(OpenTelemetry.class)).done());
} else {
syntheticBeans.produce(SyntheticBeanBuildItem.configure(asyncClientBuilderClass)
.defaultBean()
.setRuntimeInit()
.scope(ApplicationScoped.class)
.runtimeValue(asyncClientBuilder)
Expand All @@ -379,6 +381,7 @@ private void createClientBuilders(
presignerBuilder = recorder.configurePresigner(presignerBuilder, awsConfigRuntime, sdkConfigRuntime,
configName());
syntheticBeans.produce(SyntheticBeanBuildItem.configure(presignerBuilderClass)
.defaultBean()
.setRuntimeInit()
.scope(ApplicationScoped.class)
.runtimeValue(presignerBuilder)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.quarkus.amazon.common.runtime;

import java.util.Map;

import jakarta.enterprise.context.spi.CreationalContext;

import io.quarkus.arc.BeanDestroyer;
import software.amazon.awssdk.utils.SdkAutoCloseable;

public class SdkAutoCloseableDestroyer implements BeanDestroyer<SdkAutoCloseable> {

@Override
public void destroy(SdkAutoCloseable instance, CreationalContext<SdkAutoCloseable> creationalContext,
Map<String, Object> params) {
instance.close();
}
}
17 changes: 17 additions & 0 deletions docs/modules/ROOT/pages/amazon-s3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,23 @@ And add the following dependency to the application `pom.xml`:
</dependency>
----

If you want to use the https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/crt-based-s3-client.html[AWS CRT-based S3 client], add the `io.quarkus.amazon.s3.runtime.S3Crt` qualifier as follows:
[source,java]
----
@Inject
@S3Crt
S3AsyncClient s3;
----

And add the following dependency to the application `pom.xml`:
[source,xml]
----
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
</dependency>
----

== Configuration Reference

include::./includes/quarkus-amazon-s3.adoc[]
73 changes: 73 additions & 0 deletions docs/modules/ROOT/pages/includes/quarkus-amazon-s3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,79 @@ endif::add-copy-button-to-env-var[]
--|boolean
|`true`


h|[[quarkus-amazon-s3_quarkus.s3.crt-aws-crt-based-s3-client-configurations]]link:#quarkus-amazon-s3_quarkus.s3.crt-aws-crt-based-s3-client-configurations[AWS CRT-based S3 client configurations]

h|Type
h|Default

a| [[quarkus-amazon-s3_quarkus.s3.crt.initial-read-buffer-size-in-bytes]]`link:#quarkus-amazon-s3_quarkus.s3.crt.initial-read-buffer-size-in-bytes[quarkus.s3.crt.initial-read-buffer-size-in-bytes]`


[.description]
--
Configure the starting buffer size the client will use to buffer the parts downloaded from S3.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_S3_CRT_INITIAL_READ_BUFFER_SIZE_IN_BYTES+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_S3_CRT_INITIAL_READ_BUFFER_SIZE_IN_BYTES+++`
endif::add-copy-button-to-env-var[]
--|long
|`Equal to the resolved part size * 10`


a| [[quarkus-amazon-s3_quarkus.s3.crt.max-concurrency]]`link:#quarkus-amazon-s3_quarkus.s3.crt.max-concurrency[quarkus.s3.crt.max-concurrency]`


[.description]
--
Specifies the maximum number of S3 connections that should be established during a transfer.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_S3_CRT_MAX_CONCURRENCY+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_S3_CRT_MAX_CONCURRENCY+++`
endif::add-copy-button-to-env-var[]
--|int
|


a| [[quarkus-amazon-s3_quarkus.s3.crt.minimum-part-size-in-bytes]]`link:#quarkus-amazon-s3_quarkus.s3.crt.minimum-part-size-in-bytes[quarkus.s3.crt.minimum-part-size-in-bytes]`


[.description]
--
Sets the minimum part size for transfer parts.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_S3_CRT_MINIMUM_PART_SIZE_IN_BYTES+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_S3_CRT_MINIMUM_PART_SIZE_IN_BYTES+++`
endif::add-copy-button-to-env-var[]
--|long
|`8MB`


a| [[quarkus-amazon-s3_quarkus.s3.crt.target-throughput-in-gbps]]`link:#quarkus-amazon-s3_quarkus.s3.crt.target-throughput-in-gbps[quarkus.s3.crt.target-throughput-in-gbps]`


[.description]
--
The target throughput for transfer requests.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_S3_CRT_TARGET_THROUGHPUT_IN_GBPS+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_S3_CRT_TARGET_THROUGHPUT_IN_GBPS+++`
endif::add-copy-button-to-env-var[]
--|double
|`10`

|===
ifndef::no-duration-note[]
[NOTE]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
import java.io.ByteArrayOutputStream;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;

import org.jboss.logging.Logger;

import io.quarkus.amazon.s3.runtime.S3Crt;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.sync.RequestBody;
Expand All @@ -28,6 +31,7 @@
public class S3Resource {
private static final String SYNC_BUCKET = "sync-" + UUID.randomUUID().toString();
private static final String ASYNC_BUCKET = "async-" + UUID.randomUUID().toString();
private static final String CRT_ASYNC_BUCKET = "crt-async-" + UUID.randomUUID().toString();
private static final String SAMPLE_S3_OBJECT = "sample S3 object";

private static final Logger LOG = Logger.getLogger(S3Resource.class);
Expand All @@ -38,6 +42,10 @@ public class S3Resource {
@Inject
S3AsyncClient s3AsyncClient;

@Inject
@S3Crt
Instance<S3AsyncClient> s3AsyncClientWithS3CrtQualifierInstance;

@Inject
S3Presigner s3Presigner;

Expand All @@ -60,6 +68,33 @@ public CompletionStage<String> testAsyncS3() {
});
}

@GET
@Path("crt-async")
@Produces(TEXT_PLAIN)
public CompletionStage<String> testCrtAsyncS3() {
LOG.info("Testing Crt Async S3 client with bucket: " + CRT_ASYNC_BUCKET);
String keyValue = UUID.randomUUID().toString();

S3AsyncClient s3AsyncClientWithS3CrtQualifier = s3AsyncClientWithS3CrtQualifierInstance.get();

try {
return S3Utils.createBucketAsync(s3AsyncClientWithS3CrtQualifier, CRT_ASYNC_BUCKET)
.thenCompose(bucket -> s3AsyncClientWithS3CrtQualifier.putObject(
S3Utils.createPutRequest(CRT_ASYNC_BUCKET, keyValue),
AsyncRequestBody.fromString(SAMPLE_S3_OBJECT)))
.thenCompose(resp -> s3AsyncClientWithS3CrtQualifier.getObject(
S3Utils.createGetRequest(CRT_ASYNC_BUCKET, keyValue),
AsyncResponseTransformer.toBytes()))
.thenApply(resp -> resp.asUtf8String())
.exceptionally(th -> {
LOG.error("Error during async S3 operations", th.getCause());
return "ERROR";
});
} catch (IllegalStateException ex) {
return CompletableFuture.completedStage(ex.getMessage());
}
}

@GET
@Path("blocking")
@Produces(TEXT_PLAIN)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.it.amazon;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.is;

import org.junit.jupiter.api.Test;
Expand All @@ -15,9 +16,14 @@ public void testS3Async() {
RestAssured.when().get("/test/s3/async").then().body(is("INTERCEPTED+sample S3 object"));
}

@Test
public void testS3CrtAsync() {
RestAssured.when().get("/test/s3/crt-async").then().body(anyOf(is("sample S3 object"),
is("The Crt S3AsyncClient is required but has not been detected/configured.")));
}

@Test
public void testS3Blocking() {
RestAssured.when().get("/test/s3/blocking").then().body(is("INTERCEPTED+sample S3 object"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.quarkus.amazon.s3.deployment;

import java.util.Optional;
import java.util.function.BooleanSupplier;

import jakarta.enterprise.context.ApplicationScoped;

import org.jboss.jandex.ClassType;
import org.jboss.jandex.DotName;
import org.jboss.jandex.ParameterizedType;
import org.jboss.jandex.Type;

import io.quarkus.amazon.common.runtime.SdkAutoCloseableDestroyer;
import io.quarkus.amazon.s3.runtime.S3Crt;
import io.quarkus.amazon.s3.runtime.S3CrtRecorder;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.BeanRegistrationPhaseBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.arc.processor.InjectionPointInfo;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.runtime.RuntimeValue;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;

public class S3CrtProcessor {

public static final DotName S3CRT = DotName.createSimple(S3Crt.class);

protected String configName() {
return "s3";
}

protected DotName asyncClientName() {
return DotName.createSimple(S3AsyncClient.class.getName());
}

@BuildStep(onlyIf = IsAmazonCrtS3ClientPresent.class)
AdditionalBeanBuildItem qualifiers() {
return AdditionalBeanBuildItem.unremovableOf(S3Crt.class);
}

@BuildStep(onlyIf = IsAmazonCrtS3ClientPresent.class)
@Record(ExecutionTime.RUNTIME_INIT)
void createS3CrtClientBuilders(S3CrtRecorder recorder,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
BeanRegistrationPhaseBuildItem beanRegistrationPhase) {

Optional<DotName> asyncClassName = Optional.empty();

// Discover all clients injections in order to determine if crt async client
// is required
for (InjectionPointInfo injectionPoint : beanRegistrationPhase.getInjectionPoints()) {

if (null == injectionPoint.getRequiredQualifier(S3CRT)) {
continue;
}

Type injectedType = getInjectedType(injectionPoint);

if (asyncClientName().equals(injectedType.name())) {
asyncClassName = Optional.of(asyncClientName());
}
}

if (asyncClassName.isPresent()) {
RuntimeValue<S3CrtAsyncClientBuilder> asyncClientBuilder = recorder.getCrtAsyncClientBuilder(configName());
syntheticBeans.produce(SyntheticBeanBuildItem.configure(S3CrtAsyncClientBuilder.class)
.unremovable()
.setRuntimeInit()
.defaultBean()
.scope(ApplicationScoped.class)
.runtimeValue(asyncClientBuilder)
.done());

syntheticBeans.produce(SyntheticBeanBuildItem.configure(S3AsyncClient.class)
.unremovable()
.setRuntimeInit()
.scope(ApplicationScoped.class)
.addQualifier(S3Crt.class)
.createWith(recorder.getS3CrtAsyncClient())
.destroyer(SdkAutoCloseableDestroyer.class)
.addInjectionPoint(ClassType.create(S3CrtAsyncClientBuilder.class))
.done());
}
}

private Type getInjectedType(InjectionPointInfo injectionPoint) {
Type requiredType = injectionPoint.getRequiredType();
Type injectedType = requiredType;

if (DotNames.INSTANCE.equals(requiredType.name()) && requiredType instanceof ParameterizedType) {
injectedType = requiredType.asParameterizedType().arguments().get(0);
}

return injectedType;
}

public static final String AWS_S3_CRT = "software.amazon.awssdk.crt.s3.S3Client";

public static class IsAmazonCrtS3ClientPresent implements BooleanSupplier {
@Override
public boolean getAsBoolean() {
try {
Class.forName(AWS_S3_CRT);
return true;
} catch (Exception e) {
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,10 @@ public interface S3Config {
*/
@ConfigDocSection
AsyncHttpClientConfig asyncClient();

/**
* AWS CRT-based S3 client configurations
*/
@ConfigDocSection
S3CrtConfig crt();
}

0 comments on commit 69daef1

Please sign in to comment.