Skip to content

Commit

Permalink
Manage Max In-Flux Data During Uploads (#60)
Browse files Browse the repository at this point in the history
* Bump com.github.tomakehurst:wiremock-jre8 from 2.35.0 to 2.35.1

Bumps [com.github.tomakehurst:wiremock-jre8](https://github.com/wiremock/wiremock) from 2.35.0 to 2.35.1.
- [Release notes](https://github.com/wiremock/wiremock/releases)
- [Commits](wiremock/wiremock@2.35.0...2.35.1)

---
updated-dependencies:
- dependency-name: com.github.tomakehurst:wiremock-jre8
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump ch.qos.logback:logback-classic from 1.3.5 to 1.3.12

Bumps [ch.qos.logback:logback-classic](https://github.com/qos-ch/logback) from 1.3.5 to 1.3.12.
- [Commits](qos-ch/logback@v_1.3.5...v_1.3.12)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>

* Added configuration to manage max allowed in-flux data during uploads - protecting the heap

* Spotless adjustments

* Don't forget to increment current count of in-flux bytes

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
knighto82 and dependabot[bot] committed Apr 9, 2024
1 parent 797567e commit ef7fcdc
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.5</version> <!-- logback >1.3.x does not support JDK8 -->
<version>1.3.12</version> <!-- logback >1.3.x does not support JDK8 -->
<scope>test</scope>
</dependency>

Expand All @@ -96,7 +96,7 @@
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>2.35.0</version>
<version>2.35.1</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public class FusionConfiguration {
@Builder.Default
int uploadPartSize = 8;

/**
* Max in-flux data, expressed in megabytes, to be read at a given time. Defaults to 500 (500MB).
* The upload code converts this value to bytes by multiplying it by 1024 * 1024, so a client
* requiring roughly 1GB would set this value to 1000.
*/
@Builder.Default
long maxInFluxDataSize = 500;

/**
* Size of Thread-Pool to be used for uploading chunks of a multipart file
* Defaults to number of available processors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,11 +33,16 @@
import java.util.concurrent.Executors;
import lombok.Builder;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Builder
@Getter
public class FusionAPIUploadOperations implements APIUploadOperations {

private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final String UPLOAD_FAILED_EXCEPTION_MSG =
"Exception encountered while attempting to upload part, please try again";

Expand Down Expand Up @@ -76,6 +82,12 @@ public class FusionAPIUploadOperations implements APIUploadOperations {
*/
int uploadThreadPoolSize;

/**
* Max size of in-flux data, in megabytes, that can be read at a given time.
* Converted to bytes (value * 1024 * 1024) when uploading parts of a multipart transfer.
* See {@link FusionConfiguration} for default values.
*/
long maxInFluxDataSize;

/**
* Call the API upload endpoint to load a distribution
*
Expand Down Expand Up @@ -206,28 +218,39 @@ protected MultipartTransferContext callAPIToInitiateMultiPartUpload(UploadReques
protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext mtx, UploadRequest ur) {

int chunkSize = uploadPartSize * (1024 * 1024);
long maxInFluxBytes = maxInFluxDataSize * (1024L * 1024L);

byte[] buffer = new byte[chunkSize];
int partCnt = 1;
int totalBytes = 0;
int inFluxBytes = 0;

ExecutorService executor = Executors.newFixedThreadPool(uploadThreadPoolSize);
try {
List<CompletableFuture<Void>> futures = new ArrayList<>();

int bytesRead;
while ((bytesRead = ur.getData().read(buffer)) != -1) {

logger.debug(
"Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead);

final int currentPartCnt = partCnt;
final int currentBytesRead = bytesRead;
byte[] taskBuffer = Arrays.copyOf(buffer, bytesRead);

if (inFluxBytes > maxInFluxBytes) {
inFluxBytes = easeDataPressure(futures);
}

futures.add(CompletableFuture.runAsync(
() -> mtx.partUploaded(
callAPIToUploadPart(mtx, ur, taskBuffer, currentBytesRead, currentPartCnt)),
executor));

partCnt++;
totalBytes += bytesRead;
inFluxBytes += bytesRead;
}

for (CompletableFuture<Void> future : futures) {
Expand All @@ -243,6 +266,18 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
return mtx.transferred(chunkSize, totalBytes, partCnt);
}

/**
 * Waits for every part upload currently tracked in {@code futures} to finish, then empties the
 * list so that the caller can start counting in-flux bytes afresh.
 *
 * @param futures outstanding part-upload tasks; cleared once all of them have completed
 * @return the new in-flux byte count, which is always {@code 0}
 * @throws InterruptedException if the calling thread is interrupted while waiting on a task
 * @throws ExecutionException if any of the awaited tasks completed exceptionally
 */
private int easeDataPressure(List<CompletableFuture<Void>> futures)
        throws InterruptedException, ExecutionException {

    logger.debug("Reached max in-flux bytes - easing pressure");

    // Drain in submission order; the first failing task aborts the wait and propagates.
    final Iterator<CompletableFuture<Void>> pending = futures.iterator();
    while (pending.hasNext()) {
        final CompletableFuture<Void> task = pending.next();
        task.get();
    }

    logger.debug("Max in-flux bytes handled - pressure eased");

    // Only reached when every task succeeded: forget them and reset the counter.
    futures.clear();
    return 0;
}

protected UploadedPartContext callAPIToUploadPart(
MultipartTransferContext mtx, UploadRequest ur, byte[] part, int read, int partNo) {

Expand Down Expand Up @@ -348,6 +383,7 @@ public static class FusionAPIUploadOperationsBuilder {
int singlePartUploadSizeLimit;
int uploadPartSize;
int uploadThreadPoolSize;
long maxInFluxDataSize;

public FusionAPIUploadOperationsBuilder configuration(FusionConfiguration configuration) {
this.configuration = configuration;
Expand All @@ -371,6 +407,12 @@ private FusionAPIUploadOperationsBuilder uploadThreadPoolSize(int uploadThreadPo
this.uploadThreadPoolSize = uploadThreadPoolSize;
return this;
}

/**
* Sets the max in-flux data size held by this builder and returns the builder for chaining.
* NOTE(review): this setter is private, and the custom builder's build() assigns the same field
* from configuration.getMaxInFluxDataSize() - presumably so the value can only be supplied
* through FusionConfiguration; confirm.
*/
@SuppressWarnings("PIT")
private FusionAPIUploadOperationsBuilder maxInFluxDataSize(long maxInFluxDataSize) {
this.maxInFluxDataSize = maxInFluxDataSize;
return this;
}
}

private static class CustomFusionAPIUploadOperationsBuilder extends FusionAPIUploadOperationsBuilder {
Expand All @@ -379,6 +421,7 @@ public FusionAPIUploadOperations build() {
this.singlePartUploadSizeLimit = configuration.getSinglePartUploadSizeLimit();
this.uploadPartSize = configuration.getUploadPartSize();
this.uploadThreadPoolSize = configuration.getUploadThreadPoolSize();
this.maxInFluxDataSize = configuration.getMaxInFluxDataSize();

if (Objects.isNull(digestProducer)) {
this.digestProducer = AlgoSpecificDigestProducer.builder()
Expand Down

0 comments on commit ef7fcdc

Please sign in to comment.