From 84904d843860a99039b66ba424e271a37ada5571 Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 16:59:38 +0100 Subject: [PATCH 1/4] Stop SSE reconnect after assembly finishes --- .../transloadit/sdk/EventsourceRunnable.java | 33 +++- .../sdk/EventsourceRunnableTest.java | 108 +++++++++++++ .../AssemblySseIntegrationTest.java | 149 ++++++++++++++++++ 3 files changed, 288 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/transloadit/sdk/EventsourceRunnableTest.java create mode 100644 src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java diff --git a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java index acb066c..c8fad74 100644 --- a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java +++ b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java @@ -101,6 +101,11 @@ protected void handleMessageEvent(MessageEvent messageEvent) { String eventName = messageEvent.getEventName(); String data = messageEvent.getData(); + if (assemblyFinished) { + shutdownEventSource(); + return; + } + // Check if the event is a message event without if (eventName.equals("message")) { switch (data) { @@ -110,8 +115,9 @@ protected void handleMessageEvent(MessageEvent messageEvent) { assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl())); } catch (RequestException | LocalOperationException e) { assemblyListener.onError(e); + } finally { + shutdownEventSource(); } - this.eventSource.close(); break; case "assembly_upload_meta_data_extracted": assemblyListener.onMetadataExtracted(); @@ -139,8 +145,12 @@ protected void handleMessageEvent(MessageEvent messageEvent) { break; case "assembly_error": + if (assemblyFinished) { + shutdownEventSource(); + break; + } assemblyListener.onError(new RequestException(data)); - this.eventSource.close(); + shutdownEventSource(); break; case "assembly_execution_progress": @@ -167,10 +177,29 @@ protected void handleStartedEvent(StartedEvent startedEvent) { } protected void handleFaultEvent(FaultEvent faultEvent) { + if (assemblyFinished) { + shutdownEventSource(); + } // Debug output, uncomment if needed // String data = faultEvent.toString(); // System.out.printf("Fault: %s\n", data); // System.out.println("Starting Over"); } + private void shutdownEventSource() { + if (this.eventSource == null) { + return; + } + try { + this.eventSource.stop(); + } catch (Exception ignore) { + // Ignore cleanup exceptions + } + try { + this.eventSource.close(); + } catch (Exception ignore) { + // Ignore cleanup exceptions + } + } + } diff --git a/src/test/java/com/transloadit/sdk/EventsourceRunnableTest.java b/src/test/java/com/transloadit/sdk/EventsourceRunnableTest.java new file mode 100644 index 0000000..e592c6b --- /dev/null +++ b/src/test/java/com/transloadit/sdk/EventsourceRunnableTest.java @@ -0,0 +1,108 @@ +package com.transloadit.sdk; + +import com.launchdarkly.eventsource.ConnectStrategy; +import com.launchdarkly.eventsource.ErrorStrategy; +import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.RetryDelayStrategy; +import com.transloadit.sdk.response.AssemblyResponse; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class EventsourceRunnableTest { + + @Test + void assemblyErrorAfterFinishedDoesNotNotifyListener() throws Exception { + Transloadit transloadit = mock(Transloadit.class); + AssemblyResponse initialResponse = mock(AssemblyResponse.class); + AssemblyResponse finalResponse = mock(AssemblyResponse.class); + + when(initialResponse.getSslUrl()).thenReturn("https://example.com/assemblies/123"); + when(transloadit.getAssemblyByUrl(anyString())).thenReturn(finalResponse); + when(finalResponse.json()).thenReturn(new JSONObject().put("ok", "ASSEMBLY_COMPLETED")); + + ConnectStrategy connectStrategy = ConnectStrategy.http(URI.create("http://localhost/sse")); + RetryDelayStrategy retryStrategy = RetryDelayStrategy.defaultStrategy(); + ErrorStrategy errorStrategy = ErrorStrategy.alwaysContinue(); + + RecordingListener listener = new RecordingListener(); + + EventsourceRunnable runnable = new EventsourceRunnable( + transloadit, + initialResponse, + listener, + connectStrategy, + retryStrategy, + errorStrategy, + false + ); + + MessageEvent finishedEvent = new MessageEvent("assembly_finished"); + MessageEvent errorEvent = new MessageEvent("assembly_error", "{}", null, null); + runnable.handleMessageEvent(finishedEvent); + runnable.handleMessageEvent(errorEvent); + + assertTrue(listener.finishedCalled.get(), "Expected assembly_finished to notify listener"); + assertFalse(listener.errorCalled.get(), "Unexpected error callback after completion"); + assertNotNull(listener.finishedResponse.get(), "Final response missing"); + assertEquals(finalResponse, listener.finishedResponse.get()); + } + + private static final class RecordingListener implements AssemblyListener { + private final AtomicBoolean finishedCalled = new AtomicBoolean(false); + private final AtomicBoolean errorCalled = new AtomicBoolean(false); + private final AtomicReference finishedResponse = new AtomicReference<>(); + + @Override + public void onAssemblyFinished(AssemblyResponse response) { + finishedCalled.set(true); + finishedResponse.set(response); + } + + @Override + public void onError(Exception error) { + errorCalled.set(true); + } + + @Override + public void onMetadataExtracted() { + } + + @Override + public void onAssemblyUploadFinished() { + } + + @Override + public void onFileUploadFinished(JSONObject uploadInformation) { + } + + @Override + public void onFileUploadPaused(String name) { + } + + @Override + public void onFileUploadResumed(String name) { + } + + @Override + public void onFileUploadProgress(long uploadedBytes, long totalBytes) { + } + + @Override + public void onAssemblyProgress(JSONObject progress) { + } + + @Override + public void onAssemblyResultFinished(JSONArray result) { + } + } +} diff --git a/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java b/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java new file mode 100644 index 0000000..de860ca --- /dev/null +++ b/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java @@ -0,0 +1,149 @@ +package com.transloadit.sdk.integration; + +import com.transloadit.sdk.Assembly; +import com.transloadit.sdk.AssemblyListener; +import com.transloadit.sdk.Transloadit; +import com.transloadit.sdk.response.AssemblyResponse; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.*; + +class AssemblySseIntegrationTest { + + @Test + void sseStreamShouldCloseWithoutErrorsAfterAssemblyFinished() throws Exception { + String key = System.getenv("TRANSLOADIT_KEY"); + String secret = System.getenv("TRANSLOADIT_SECRET"); + Assumptions.assumeTrue(key != null && !key.trim().isEmpty(), "TRANSLOADIT_KEY env var required"); + Assumptions.assumeTrue(secret != null && !secret.trim().isEmpty(), "TRANSLOADIT_SECRET env var required"); + + Transloadit client = new Transloadit(key, secret); + Assembly assembly = client.newAssembly(); + + Path tempFile = createTempUpload(); + try { + assembly.addFile(tempFile.toFile(), "file"); + + Map resizeStep = new HashMap<>(); + resizeStep.put("use", ":original"); + resizeStep.put("width", 64); + resizeStep.put("height", 64); + resizeStep.put("resize_strategy", "fit"); + assembly.addStep("resize", "/image/resize", resizeStep); + + AtomicReference finishedResponse = new AtomicReference<>(); + CompletableFuture finishedFuture = new CompletableFuture<>(); + CompletableFuture errorFuture = new CompletableFuture<>(); + + AssemblyListener listener = new AssemblyListener() { + @Override + public void onAssemblyFinished(AssemblyResponse response) { + System.out.println("[AssemblySseIntegrationTest] assembly_finished event"); + finishedResponse.set(response); + finishedFuture.complete(null); + } + + @Override + public void onError(Exception error) { + System.out.println("[AssemblySseIntegrationTest] SSE error: " + error); + errorFuture.complete(error); + finishedFuture.completeExceptionally(error); + } + + @Override + public void onMetadataExtracted() { + } + + @Override + public void onAssemblyUploadFinished() { + } + + @Override + public void onFileUploadFinished(JSONObject uploadInformation) { + } + + @Override + public void onFileUploadPaused(String name) { + } + + @Override + public void onFileUploadResumed(String name) { + } + + @Override + public void onFileUploadProgress(long uploadedBytes, long totalBytes) { + } + + @Override + public void onAssemblyProgress(JSONObject progress) { + } + + @Override + public void onAssemblyResultFinished(JSONArray result) { + } + }; + + assembly.setAssemblyListener(listener); + + AssemblyResponse initialResponse = assembly.save(true); + assertNotNull(initialResponse.getId(), "Assembly ID should be present"); + + try { + finishedFuture.get(5, TimeUnit.MINUTES); + } catch (ExecutionException executionException) { + Throwable cause = executionException.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw executionException; + } + + AssemblyResponse completed = finishedResponse.get(); + assertNotNull(completed, "Assembly finished response missing"); + assertTrue(completed.isFinished(), "Assembly should be finished"); + assertEquals("ASSEMBLY_COMPLETED", completed.json().optString("ok")); + + try { + Exception unexpected = errorFuture.get(30, TimeUnit.SECONDS); + fail("Unexpected SSE error after completion: " + unexpected); + } catch (TimeoutException ignore) { + // expected: no error surfaced after assembly finished + } + } finally { + try { + Files.deleteIfExists(tempFile); + } catch (IOException ignore) { + } + } + } + + private static Path createTempUpload() throws IOException { + Path file = Files.createTempFile("transloadit-sse-test", ".jpg"); + URL source = new URL("https://demos.transloadit.com/inputs/chameleon.jpg"); + try (InputStream input = source.openStream(); OutputStream output = Files.newOutputStream(file)) { + byte[] buffer = new byte[8192]; + int read; + while ((read = input.read(buffer)) != -1) { + output.write(buffer, 0, read); + } + } + return file; + } +} From 9176afbdb5de3c9608ae78c67c9b62ca8c17c25c Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 17:03:21 +0100 Subject: [PATCH 2/4] bump version --- src/main/resources/java-sdk-version/version.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/java-sdk-version/version.properties b/src/main/resources/java-sdk-version/version.properties index ac60fcd..51be057 100644 --- a/src/main/resources/java-sdk-version/version.properties +++ b/src/main/resources/java-sdk-version/version.properties @@ -1 +1 @@ -versionNumber='2.1.0' +versionNumber='2.2.0' From 1205d6e2d7f403787ae91b5fe2d181faceefde6c Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 17:07:47 +0100 Subject: [PATCH 3/4] Document 2.2.0 release --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cf217c..cedb4e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### 2.2.0 / 2025-10-27 + +- Prevent the SSE client from reconnecting after `assembly_finished`, eliminating spurious `assembly_error` callbacks and timeouts. +- Add regression coverage with a targeted unit test and a live SSE integration test executed via the Docker harness. +- Confirm the new tests run in CI alongside the existing Gradle `check` workflow. + ### 2.1.0 / 2025-10-15 - Added support for external signature generation via `SignatureProvider` interface ([#19](https://github.com/transloadit/android-sdk/issues/19)) From 76d4ddd51b751e17a90e3723100401c358da01d4 Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 17:17:34 +0100 Subject: [PATCH 4/4] Make RequestTest expect dynamic version header --- src/test/java/com/transloadit/sdk/RequestTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/transloadit/sdk/RequestTest.java b/src/test/java/com/transloadit/sdk/RequestTest.java index 5d39d8a..3a50e0b 100644 --- a/src/test/java/com/transloadit/sdk/RequestTest.java +++ b/src/test/java/com/transloadit/sdk/RequestTest.java @@ -130,10 +130,11 @@ private String extractMultipartField(String body, String fieldName) { public void get() throws Exception { request.get("/foo"); + String expectedClientHeader = transloadit.getVersionInfo(); mockServerClient.verify(HttpRequest.request() .withPath("/foo") .withMethod("GET") - .withHeader("Transloadit-Client", "java-sdk:2.1.0")); + .withHeader("Transloadit-Client", expectedClientHeader)); }