Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 2.2.0 / 2025-10-27

- Prevent the SSE client from reconnecting after `assembly_finished`, eliminating spurious `assembly_error` callbacks and timeouts.
- Add regression coverage with a targeted unit test and a live SSE integration test executed via the Docker harness.
- Confirm the new tests run in CI alongside the existing Gradle `check` workflow.

### 2.1.0 / 2025-10-15

- Added support for external signature generation via `SignatureProvider` interface ([#19](https://github.com/transloadit/android-sdk/issues/19))
Expand Down
33 changes: 31 additions & 2 deletions src/main/java/com/transloadit/sdk/EventsourceRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
String eventName = messageEvent.getEventName();
String data = messageEvent.getData();

if (assemblyFinished) {
shutdownEventSource();
return;
}

// Check if the event is a message event without
if (eventName.equals("message")) {
switch (data) {
Expand All @@ -110,8 +115,9 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
} catch (RequestException | LocalOperationException e) {
assemblyListener.onError(e);
} finally {
shutdownEventSource();
}
this.eventSource.close();
break;
case "assembly_upload_meta_data_extracted":
assemblyListener.onMetadataExtracted();
Expand Down Expand Up @@ -139,8 +145,12 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
break;

case "assembly_error":
if (assemblyFinished) {
shutdownEventSource();
break;
}
assemblyListener.onError(new RequestException(data));
this.eventSource.close();
shutdownEventSource();
break;

case "assembly_execution_progress":
Expand All @@ -167,10 +177,29 @@ protected void handleStartedEvent(StartedEvent startedEvent) {
}

protected void handleFaultEvent(FaultEvent faultEvent) {
if (assemblyFinished) {
shutdownEventSource();
}
// Debug output, uncomment if needed
// String data = faultEvent.toString();
// System.out.printf("Fault: %s\n", data);
// System.out.println("Starting Over");
}

private void shutdownEventSource() {
if (this.eventSource == null) {
return;
}
try {
this.eventSource.stop();
} catch (Exception ignore) {
// Ignore cleanup exceptions
}
try {
this.eventSource.close();
} catch (Exception ignore) {
// Ignore cleanup exceptions
}
}

}
2 changes: 1 addition & 1 deletion src/main/resources/java-sdk-version/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
versionNumber='2.1.0'
versionNumber='2.2.0'
108 changes: 108 additions & 0 deletions src/test/java/com/transloadit/sdk/EventsourceRunnableTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.transloadit.sdk;

import com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.eventsource.ErrorStrategy;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.RetryDelayStrategy;
import com.transloadit.sdk.response.AssemblyResponse;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class EventsourceRunnableTest {

@Test
void assemblyErrorAfterFinishedDoesNotNotifyListener() throws Exception {
Transloadit transloadit = mock(Transloadit.class);
AssemblyResponse initialResponse = mock(AssemblyResponse.class);
AssemblyResponse finalResponse = mock(AssemblyResponse.class);

when(initialResponse.getSslUrl()).thenReturn("https://example.com/assemblies/123");
when(transloadit.getAssemblyByUrl(anyString())).thenReturn(finalResponse);
when(finalResponse.json()).thenReturn(new JSONObject().put("ok", "ASSEMBLY_COMPLETED"));

ConnectStrategy connectStrategy = ConnectStrategy.http(URI.create("http://localhost/sse"));
RetryDelayStrategy retryStrategy = RetryDelayStrategy.defaultStrategy();
ErrorStrategy errorStrategy = ErrorStrategy.alwaysContinue();

RecordingListener listener = new RecordingListener();

EventsourceRunnable runnable = new EventsourceRunnable(
transloadit,
initialResponse,
listener,
connectStrategy,
retryStrategy,
errorStrategy,
false
);

MessageEvent finishedEvent = new MessageEvent("assembly_finished");
MessageEvent errorEvent = new MessageEvent("assembly_error", "{}", null, null);
runnable.handleMessageEvent(finishedEvent);
runnable.handleMessageEvent(errorEvent);

assertTrue(listener.finishedCalled.get(), "Expected assembly_finished to notify listener");
assertFalse(listener.errorCalled.get(), "Unexpected error callback after completion");
assertNotNull(listener.finishedResponse.get(), "Final response missing");
assertEquals(finalResponse, listener.finishedResponse.get());
}

private static final class RecordingListener implements AssemblyListener {
private final AtomicBoolean finishedCalled = new AtomicBoolean(false);
private final AtomicBoolean errorCalled = new AtomicBoolean(false);
private final AtomicReference<AssemblyResponse> finishedResponse = new AtomicReference<>();

@Override
public void onAssemblyFinished(AssemblyResponse response) {
finishedCalled.set(true);
finishedResponse.set(response);
}

@Override
public void onError(Exception error) {
errorCalled.set(true);
}

@Override
public void onMetadataExtracted() {
}

@Override
public void onAssemblyUploadFinished() {
}

@Override
public void onFileUploadFinished(JSONObject uploadInformation) {
}

@Override
public void onFileUploadPaused(String name) {
}

@Override
public void onFileUploadResumed(String name) {
}

@Override
public void onFileUploadProgress(long uploadedBytes, long totalBytes) {
}

@Override
public void onAssemblyProgress(JSONObject progress) {
}

@Override
public void onAssemblyResultFinished(JSONArray result) {
}
}
}
3 changes: 2 additions & 1 deletion src/test/java/com/transloadit/sdk/RequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ private String extractMultipartField(String body, String fieldName) {
public void get() throws Exception {
request.get("/foo");

String expectedClientHeader = transloadit.getVersionInfo();
mockServerClient.verify(HttpRequest.request()
.withPath("/foo")
.withMethod("GET")
.withHeader("Transloadit-Client", "java-sdk:2.1.0"));
.withHeader("Transloadit-Client", expectedClientHeader));

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package com.transloadit.sdk.integration;

import com.transloadit.sdk.Assembly;
import com.transloadit.sdk.AssemblyListener;
import com.transloadit.sdk.Transloadit;
import com.transloadit.sdk.response.AssemblyResponse;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.*;

class AssemblySseIntegrationTest {

@Test
void sseStreamShouldCloseWithoutErrorsAfterAssemblyFinished() throws Exception {
String key = System.getenv("TRANSLOADIT_KEY");
String secret = System.getenv("TRANSLOADIT_SECRET");
Assumptions.assumeTrue(key != null && !key.trim().isEmpty(), "TRANSLOADIT_KEY env var required");
Assumptions.assumeTrue(secret != null && !secret.trim().isEmpty(), "TRANSLOADIT_SECRET env var required");

Transloadit client = new Transloadit(key, secret);
Assembly assembly = client.newAssembly();

Path tempFile = createTempUpload();
try {
assembly.addFile(tempFile.toFile(), "file");

Map<String, Object> resizeStep = new HashMap<>();
resizeStep.put("use", ":original");
resizeStep.put("width", 64);
resizeStep.put("height", 64);
resizeStep.put("resize_strategy", "fit");
assembly.addStep("resize", "/image/resize", resizeStep);

AtomicReference<AssemblyResponse> finishedResponse = new AtomicReference<>();
CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
CompletableFuture<Exception> errorFuture = new CompletableFuture<>();

AssemblyListener listener = new AssemblyListener() {
@Override
public void onAssemblyFinished(AssemblyResponse response) {
System.out.println("[AssemblySseIntegrationTest] assembly_finished event");
finishedResponse.set(response);
finishedFuture.complete(null);
}

@Override
public void onError(Exception error) {
System.out.println("[AssemblySseIntegrationTest] SSE error: " + error);
errorFuture.complete(error);
finishedFuture.completeExceptionally(error);
}

@Override
public void onMetadataExtracted() {
}

@Override
public void onAssemblyUploadFinished() {
}

@Override
public void onFileUploadFinished(JSONObject uploadInformation) {
}

@Override
public void onFileUploadPaused(String name) {
}

@Override
public void onFileUploadResumed(String name) {
}

@Override
public void onFileUploadProgress(long uploadedBytes, long totalBytes) {
}

@Override
public void onAssemblyProgress(JSONObject progress) {
}

@Override
public void onAssemblyResultFinished(JSONArray result) {
}
};

assembly.setAssemblyListener(listener);

AssemblyResponse initialResponse = assembly.save(true);
assertNotNull(initialResponse.getId(), "Assembly ID should be present");

try {
finishedFuture.get(5, TimeUnit.MINUTES);
} catch (ExecutionException executionException) {
Throwable cause = executionException.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
}
throw executionException;
}

AssemblyResponse completed = finishedResponse.get();
assertNotNull(completed, "Assembly finished response missing");
assertTrue(completed.isFinished(), "Assembly should be finished");
assertEquals("ASSEMBLY_COMPLETED", completed.json().optString("ok"));

try {
Exception unexpected = errorFuture.get(30, TimeUnit.SECONDS);
fail("Unexpected SSE error after completion: " + unexpected);
} catch (TimeoutException ignore) {
// expected: no error surfaced after assembly finished
}
} finally {
try {
Files.deleteIfExists(tempFile);
} catch (IOException ignore) {
}
}
}

private static Path createTempUpload() throws IOException {
Path file = Files.createTempFile("transloadit-sse-test", ".jpg");
URL source = new URL("https://demos.transloadit.com/inputs/chameleon.jpg");
try (InputStream input = source.openStream(); OutputStream output = Files.newOutputStream(file)) {
byte[] buffer = new byte[8192];
int read;
while ((read = input.read(buffer)) != -1) {
output.write(buffer, 0, read);
}
}
return file;
}
}