From a491d994de0e9eb537de6c9a58ad268971a0fe4f Mon Sep 17 00:00:00 2001 From: thomasva Date: Mon, 18 Mar 2024 12:19:59 +0100 Subject: [PATCH] Add injection of custom executor service to S3Base supplyAsync calls --- .DS_Store | Bin 0 -> 8196 bytes .../main/java/io/minio/MinioAsyncClient.java | 29 +++++++++++------ api/src/main/java/io/minio/S3Base.java | 30 +++++++++++++++--- 3 files changed, 46 insertions(+), 13 deletions(-) create mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..59aac7e10d7ff8dff4813f85871f6505fd9e2125 GIT binary patch literal 8196 zcmeHMy^9k;6o0dqi{bp>0v6XOTc{wM;(2h5AZv(?AQp$kbzVL%ki89O0#*{N1TD1i z;~KTK^B*`AL~D!F#!9Xh?8L*u-+W~EO?Efw)K)!xh`#nCS0quy+G*-=HfU8pWjz8c@xOPUU-P1n01Jw0`&J zC&giGoG?n50KJUvPcNeuLW8-i>`6!Ez{|>=u z7W#C+rAg}+7dpOXdpQqp@5uJD=E3@N4%rARKB-qYC8b0N4x6~^QHQe-;mgNhk-~5; zD>uJ=F`8uHl3ALB4MbF@+g5b%d>_W*&s~0 copyObject(CopyObjectArgs args) args.validateSse(this.baseUrl); return CompletableFuture.supplyAsync( - () -> args.source().offset() != null && args.source().length() != null) + () -> args.source().offset() != null && args.source().length() != null, executorService) .thenCompose( condition -> { if (condition) { @@ -667,7 +671,7 @@ public CompletableFuture composeObject(ComposeObjectArgs ar Multimap headers = newMultimap(args.extraHeaders()); headers.putAll(args.genHeaders()); return headers; - }) + }, executorService) .thenCompose( headers -> { try { @@ -705,7 +709,7 @@ public CompletableFuture composeObject(ComposeObjectArgs ar CompletableFuture.supplyAsync( () -> { return new Part[partCount[0]]; - }); + }, executorService); for (ComposeSource src : sources) { long size = 0; try { @@ -801,8 +805,8 @@ public CompletableFuture composeObject(ComposeObjectArgs ar throw new CompletionException(e); } }); - offset += length; - size -= length; + offset = startBytes; + size -= (endBytes - startBytes); } } @@ -3155,7 +3159,7 @@ public CompletableFuture uploadSnowballObjects( } } return baos; - }) + }, executorService) .thenCompose( baos -> { Multimap headers = newMultimap(args.extraHeaders()); @@ -3223,6 +3227,7 @@ public static final class Builder { private String region; private Provider provider; private OkHttpClient httpClient; + private ExecutorService executorService = ForkJoinPool.commonPool(); private void setAwsInfo(String host, boolean https) { this.awsS3Prefix = null; @@ -3329,6 +3334,11 @@ public Builder httpClient(OkHttpClient httpClient) { return this; } + public Builder executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + public MinioAsyncClient build() { HttpUtils.validateNotNull(this.baseUrl, "endpoint"); @@ -3357,7 +3367,8 @@ public MinioAsyncClient build() { region, provider, httpClient, - closeHttpClient); + closeHttpClient, + executorService()); } } } diff --git a/api/src/main/java/io/minio/S3Base.java b/api/src/main/java/io/minio/S3Base.java index 18115370e..e6616b68d 100644 --- a/api/src/main/java/io/minio/S3Base.java +++ b/api/src/main/java/io/minio/S3Base.java @@ -84,6 +84,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -137,6 +138,7 @@ public abstract class S3Base implements AutoCloseable { protected Provider provider; protected OkHttpClient httpClient; protected boolean closeHttpClient; + protected final ExecutorService executorService; /** @deprecated This method is no longer supported. */ @Deprecated @@ -161,6 +163,20 @@ protected S3Base( false); } + protected S3Base( + HttpUrl baseUrl, + String awsS3Prefix, + String awsDomainSuffix, + boolean awsDualstack, + boolean useVirtualStyle, + String region, + Provider provider, + OkHttpClient httpClient, + boolean closeHttpClient) { + this(baseUrl, awsS3Prefix, awsDomainSuffix, awsDualstack, useVirtualStyle, region, provider, httpClient, closeHttpClient); + this.executorService = ForkJoinPool.commonPool(); + } + protected S3Base( HttpUrl baseUrl, String awsS3Prefix, @@ -170,7 +186,8 @@ protected S3Base( String region, Provider provider, OkHttpClient httpClient, - boolean closeHttpClient) { + boolean closeHttpClient, + ExecutorService executorService) { this.baseUrl = baseUrl; this.awsS3Prefix = awsS3Prefix; this.awsDomainSuffix = awsDomainSuffix; @@ -180,6 +197,7 @@ protected S3Base( this.provider = provider; this.httpClient = httpClient; this.closeHttpClient = closeHttpClient; + this.executorService = executorService; } /** @deprecated This method is no longer supported. */ @@ -221,6 +239,7 @@ protected S3Base(S3Base client) { this.provider = client.provider; this.httpClient = client.httpClient; this.closeHttpClient = client.closeHttpClient; + this.executorService = client.executorService; } /** Check whether argument is valid or not. */ @@ -1163,7 +1182,8 @@ protected CompletableFuture calculatePartCountAsync(List long[] objectSize = {0}; int index = 0; - CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 0); + CompletableFuture completableFuture = + CompletableFuture.supplyAsync(() -> 0, executorService); for (ComposeSource src : sources) { index++; final int i = index; @@ -2882,7 +2902,8 @@ private CompletableFuture putMultipartObjectAsync( } } return response; - }); + }, + executorService); } /** @@ -2928,7 +2949,8 @@ protected CompletableFuture putObjectAsync( } catch (NoSuchAlgorithmException | IOException e) { throw new CompletionException(e); } - }) + }, + executorService) .thenCompose( partSource -> { try {