From dafcb0649cd3cb49aee152c6db02a2fdf823f7f1 Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 20:02:42 +0100 Subject: [PATCH 1/4] Assert SSE result events in integration test --- .../AssemblySseIntegrationTest.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java b/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java index de860ca..7ef8b39 100644 --- a/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java +++ b/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -51,6 +52,8 @@ void sseStreamShouldCloseWithoutErrorsAfterAssemblyFinished() throws Exception { AtomicReference finishedResponse = new AtomicReference<>(); CompletableFuture finishedFuture = new CompletableFuture<>(); CompletableFuture errorFuture = new CompletableFuture<>(); + CountDownLatch resultLatch = new CountDownLatch(1); + AtomicReference resultPayload = new AtomicReference<>(); AssemblyListener listener = new AssemblyListener() { @Override @@ -97,6 +100,14 @@ public void onAssemblyProgress(JSONObject progress) { @Override public void onAssemblyResultFinished(JSONArray result) { + System.out.println("[AssemblySseIntegrationTest] assembly_result_finished payload=" + result); + if (result != null && result.length() >= 2) { + String stepName = result.optString(0, null); + if ("resize".equals(stepName)) { + resultPayload.compareAndSet(null, cloneJsonArray(result)); + resultLatch.countDown(); + } + } } }; @@ -120,6 +131,12 @@ public void onAssemblyResultFinished(JSONArray result) { assertTrue(completed.isFinished(), "Assembly should be finished"); assertEquals("ASSEMBLY_COMPLETED", completed.json().optString("ok")); + boolean resultSeen = resultLatch.await(2, TimeUnit.MINUTES); + assertTrue(resultSeen, "Timed out waiting for assembly_result_finished event"); + JSONArray resizePayload = resultPayload.get(); + assertNotNull(resizePayload, "Resize SSE payload missing"); + assertEquals("resize", resizePayload.optString(0)); + try { Exception unexpected = errorFuture.get(30, TimeUnit.SECONDS); fail("Unexpected SSE error after completion: " + unexpected); @@ -134,6 +151,10 @@ public void onAssemblyResultFinished(JSONArray result) { } } + private static JSONArray cloneJsonArray(JSONArray array) { + return array == null ? null : new JSONArray(array.toString()); + } + private static Path createTempUpload() throws IOException { Path file = Files.createTempFile("transloadit-sse-test", ".jpg"); URL source = new URL("https://demos.transloadit.com/inputs/chameleon.jpg"); From 68079109ee57cf7b4450050ae46c341e1d823c83 Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 21:37:13 +0100 Subject: [PATCH 2/4] Gracefully drain SSE stream before shutdown --- CONTRIBUTING.md | 5 +- .../transloadit/sdk/EventsourceRunnable.java | 58 +++++++++++++------ 2 files changed, 44 insertions(+), 19 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 46b051a..a35f3e1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -40,8 +40,9 @@ High-level checklist for maintainers: 1. Bump the version in `src/main/resources/java-sdk-version/version.properties` and update `CHANGELOG.md`. 2. Merge the release branch into `main`. -3. Create a git tag for `main` that matches the new version -4. Publish a GitHub release (include the changelog). This triggers the release workflow. +3. Create a git tag for `main` that matches the new versions +4. Publish a GitHub release (include the changelog). This triggers the release workflow. (via the GitHub UI, `gh release creates v1.0.1 --title "v1.0.1" --notes-file <(cat CHANGELOG.md section)`) 5. Wait for Sonatype to sync the artifact (this can take a few hours). The required signing keys and credentials are stored as GitHub secrets. If you need access or spot an issue with the release automation, please reach out to the Transloadit team via the issue tracker or support channels. + diff --git a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java index c8fad74..cb9fcd7 100644 --- a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java +++ b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java @@ -6,6 +6,7 @@ import com.launchdarkly.eventsource.EventSource; import com.launchdarkly.eventsource.FaultEvent; import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.ReadyState; import com.launchdarkly.eventsource.RetryDelayStrategy; import com.launchdarkly.eventsource.StartedEvent; import com.launchdarkly.eventsource.StreamEvent; @@ -28,6 +29,8 @@ class EventsourceRunnable implements Runnable { protected EventSource eventSource; protected Transloadit transloadit; + protected boolean stopRequested; + protected boolean assemblyFinishedNotified; /** * Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a @@ -64,16 +67,21 @@ class EventsourceRunnable implements Runnable { @Override public void run() { this.assemblyFinished = false; + this.stopRequested = false; + this.assemblyFinishedNotified = false; try { eventSource.start(); } catch (StreamException e) { assemblyListener.onError(e); + stopRequested = true; } - while (!assemblyFinished) { + while (!stopRequested) { + boolean processedEvent = false; Iterable events = eventSource.anyEvents(); Iterator eventIterator = events.iterator(); - if (eventIterator.hasNext()) { + while (eventIterator.hasNext()) { + processedEvent = true; StreamEvent streamEvent = eventIterator.next(); if (streamEvent instanceof MessageEvent) { handleMessageEvent((MessageEvent) streamEvent); @@ -85,7 +93,24 @@ public void run() { handleFaultEvent((FaultEvent) streamEvent); } } + + if (!processedEvent) { + ReadyState state = eventSource.getState(); + if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) { + stopRequested = true; + } + if (!stopRequested) { + try { + Thread.sleep(25); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + stopRequested = true; + } + } + } } + + shutdownEventSource(); } /** @@ -101,22 +126,22 @@ protected void handleMessageEvent(MessageEvent messageEvent) { String eventName = messageEvent.getEventName(); String data = messageEvent.getData(); - if (assemblyFinished) { - shutdownEventSource(); - return; - } - // Check if the event is a message event without if (eventName.equals("message")) { switch (data) { case "assembly_finished": assemblyFinished = true; - try { - assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl())); - } catch (RequestException | LocalOperationException e) { - assemblyListener.onError(e); - } finally { - shutdownEventSource(); + stopRequested = true; + if (!assemblyFinishedNotified) { + assemblyFinishedNotified = true; + try { + assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl())); + } catch (RequestException | LocalOperationException e) { + assemblyListener.onError(e); + } + } + if (eventSource != null) { + eventSource.stop(); } break; case "assembly_upload_meta_data_extracted": @@ -146,10 +171,10 @@ protected void handleMessageEvent(MessageEvent messageEvent) { case "assembly_error": if (assemblyFinished) { - shutdownEventSource(); break; } assemblyListener.onError(new RequestException(data)); + stopRequested = true; shutdownEventSource(); break; @@ -177,9 +202,8 @@ protected void handleStartedEvent(StartedEvent startedEvent) { } protected void handleFaultEvent(FaultEvent faultEvent) { - if (assemblyFinished) { - shutdownEventSource(); - } + stopRequested = true; + shutdownEventSource(); // Debug output, uncomment if needed // String data = faultEvent.toString(); // System.out.printf("Fault: %s\n", data); From 0d1f698f46eda385a1019bc8ef1c6388a88064db Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 21:45:09 +0100 Subject: [PATCH 3/4] Only stop on faults once assembly finished --- src/main/java/com/transloadit/sdk/EventsourceRunnable.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java index cb9fcd7..426a638 100644 --- a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java +++ b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java @@ -202,8 +202,10 @@ protected void handleStartedEvent(StartedEvent startedEvent) { } protected void handleFaultEvent(FaultEvent faultEvent) { - stopRequested = true; - shutdownEventSource(); + if (assemblyFinished) { + stopRequested = true; + shutdownEventSource(); + } // Debug output, uncomment if needed // String data = faultEvent.toString(); // System.out.printf("Fault: %s\n", data); From 9e050a73d3ded9de45a0d32abf84edb45014823d Mon Sep 17 00:00:00 2001 From: Kevin van Zonneveld Date: Mon, 27 Oct 2025 21:52:52 +0100 Subject: [PATCH 4/4] Bump version to 2.2.1 --- CHANGELOG.md | 5 +++++ src/main/resources/java-sdk-version/version.properties | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cedb4e6..12b2e27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +### 2.2.1 / 2025-10-27 + +- Ensure the SSE client drains pending events before shutdown while still tolerating transient network faults prior to completion. +- Prevent duplicate `assembly_finished` callbacks by only stopping once and leaving reconnect handling to the client until completion. + ### 2.2.0 / 2025-10-27 - Prevent the SSE client from reconnecting after `assembly_finished`, eliminating spurious `assembly_error` callbacks and timeouts. diff --git a/src/main/resources/java-sdk-version/version.properties b/src/main/resources/java-sdk-version/version.properties index 51be057..618061c 100644 --- a/src/main/resources/java-sdk-version/version.properties +++ b/src/main/resources/java-sdk-version/version.properties @@ -1 +1 @@ -versionNumber='2.2.0' +versionNumber='2.2.1'