diff --git a/CHANGELOG.md b/CHANGELOG.md index cedb4e6..12b2e27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +### 2.2.1 / 2025-10-27 + +- Ensure the SSE client drains pending events before shutdown while still tolerating transient network faults prior to completion. +- Prevent duplicate `assembly_finished` callbacks by only stopping once and leaving reconnect handling to the client until completion. + ### 2.2.0 / 2025-10-27 - Prevent the SSE client from reconnecting after `assembly_finished`, eliminating spurious `assembly_error` callbacks and timeouts. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 46b051a..a35f3e1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -40,8 +40,9 @@ High-level checklist for maintainers: 1. Bump the version in `src/main/resources/java-sdk-version/version.properties` and update `CHANGELOG.md`. 2. Merge the release branch into `main`. -3. Create a git tag for `main` that matches the new version -4. Publish a GitHub release (include the changelog). This triggers the release workflow. +3. Create a git tag for `main` that matches the new versions +4. Publish a GitHub release (include the changelog). This triggers the release workflow. (via the GitHub UI, `gh release creates v1.0.1 --title "v1.0.1" --notes-file <(cat CHANGELOG.md section)`) 5. Wait for Sonatype to sync the artifact (this can take a few hours). The required signing keys and credentials are stored as GitHub secrets. If you need access or spot an issue with the release automation, please reach out to the Transloadit team via the issue tracker or support channels. + diff --git a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java index c8fad74..426a638 100644 --- a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java +++ b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java @@ -6,6 +6,7 @@ import com.launchdarkly.eventsource.EventSource; import com.launchdarkly.eventsource.FaultEvent; import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.ReadyState; import com.launchdarkly.eventsource.RetryDelayStrategy; import com.launchdarkly.eventsource.StartedEvent; import com.launchdarkly.eventsource.StreamEvent; @@ -28,6 +29,8 @@ class EventsourceRunnable implements Runnable { protected EventSource eventSource; protected Transloadit transloadit; + protected boolean stopRequested; + protected boolean assemblyFinishedNotified; /** * Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a @@ -64,16 +67,21 @@ class EventsourceRunnable implements Runnable { @Override public void run() { this.assemblyFinished = false; + this.stopRequested = false; + this.assemblyFinishedNotified = false; try { eventSource.start(); } catch (StreamException e) { assemblyListener.onError(e); + stopRequested = true; } - while (!assemblyFinished) { + while (!stopRequested) { + boolean processedEvent = false; Iterable events = eventSource.anyEvents(); Iterator eventIterator = events.iterator(); - if (eventIterator.hasNext()) { + while (eventIterator.hasNext()) { + processedEvent = true; StreamEvent streamEvent = eventIterator.next(); if (streamEvent instanceof MessageEvent) { handleMessageEvent((MessageEvent) streamEvent); @@ -85,7 +93,24 @@ public void run() { handleFaultEvent((FaultEvent) streamEvent); } } + + if (!processedEvent) { + ReadyState state = eventSource.getState(); + if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) { + stopRequested = true; + } + if (!stopRequested) { + try { + Thread.sleep(25); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + stopRequested = true; + } + } + } } + + shutdownEventSource(); } /** @@ -101,22 +126,22 @@ protected void handleMessageEvent(MessageEvent messageEvent) { String eventName = messageEvent.getEventName(); String data = messageEvent.getData(); - if (assemblyFinished) { - shutdownEventSource(); - return; - } - // Check if the event is a message event without if (eventName.equals("message")) { switch (data) { case "assembly_finished": assemblyFinished = true; - try { - assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl())); - } catch (RequestException | LocalOperationException e) { - assemblyListener.onError(e); - } finally { - shutdownEventSource(); + stopRequested = true; + if (!assemblyFinishedNotified) { + assemblyFinishedNotified = true; + try { + assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl())); + } catch (RequestException | LocalOperationException e) { + assemblyListener.onError(e); + } + } + if (eventSource != null) { + eventSource.stop(); } break; case "assembly_upload_meta_data_extracted": @@ -146,10 +171,10 @@ protected void handleMessageEvent(MessageEvent messageEvent) { case "assembly_error": if (assemblyFinished) { - shutdownEventSource(); break; } assemblyListener.onError(new RequestException(data)); + stopRequested = true; shutdownEventSource(); break; @@ -178,6 +203,7 @@ protected void handleStartedEvent(StartedEvent startedEvent) { protected void handleFaultEvent(FaultEvent faultEvent) { if (assemblyFinished) { + stopRequested = true; shutdownEventSource(); } // Debug output, uncomment if needed diff --git a/src/main/resources/java-sdk-version/version.properties b/src/main/resources/java-sdk-version/version.properties index 51be057..618061c 100644 --- a/src/main/resources/java-sdk-version/version.properties +++ b/src/main/resources/java-sdk-version/version.properties @@ -1 +1 @@ -versionNumber='2.2.0' +versionNumber='2.2.1' diff --git a/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java b/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java index de860ca..7ef8b39 100644 --- a/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java +++ b/src/test/java/com/transloadit/sdk/integration/AssemblySseIntegrationTest.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -51,6 +52,8 @@ void sseStreamShouldCloseWithoutErrorsAfterAssemblyFinished() throws Exception { AtomicReference finishedResponse = new AtomicReference<>(); CompletableFuture finishedFuture = new CompletableFuture<>(); CompletableFuture errorFuture = new CompletableFuture<>(); + CountDownLatch resultLatch = new CountDownLatch(1); + AtomicReference resultPayload = new AtomicReference<>(); AssemblyListener listener = new AssemblyListener() { @Override @@ -97,6 +100,14 @@ public void onAssemblyProgress(JSONObject progress) { @Override public void onAssemblyResultFinished(JSONArray result) { + System.out.println("[AssemblySseIntegrationTest] assembly_result_finished payload=" + result); + if (result != null && result.length() >= 2) { + String stepName = result.optString(0, null); + if ("resize".equals(stepName)) { + resultPayload.compareAndSet(null, cloneJsonArray(result)); + resultLatch.countDown(); + } + } } }; @@ -120,6 +131,12 @@ public void onAssemblyResultFinished(JSONArray result) { assertTrue(completed.isFinished(), "Assembly should be finished"); assertEquals("ASSEMBLY_COMPLETED", completed.json().optString("ok")); + boolean resultSeen = resultLatch.await(2, TimeUnit.MINUTES); + assertTrue(resultSeen, "Timed out waiting for assembly_result_finished event"); + JSONArray resizePayload = resultPayload.get(); + assertNotNull(resizePayload, "Resize SSE payload missing"); + assertEquals("resize", resizePayload.optString(0)); + try { Exception unexpected = errorFuture.get(30, TimeUnit.SECONDS); fail("Unexpected SSE error after completion: " + unexpected); @@ -134,6 +151,10 @@ public void onAssemblyResultFinished(JSONArray result) { } } + private static JSONArray cloneJsonArray(JSONArray array) { + return array == null ? null : new JSONArray(array.toString()); + } + private static Path createTempUpload() throws IOException { Path file = Files.createTempFile("transloadit-sse-test", ".jpg"); URL source = new URL("https://demos.transloadit.com/inputs/chameleon.jpg");