Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### 2.2.1 / 2025-10-27

- Ensure the SSE client drains pending events before shutdown while still tolerating transient network faults prior to completion.
- Prevent duplicate `assembly_finished` callbacks by only stopping once and leaving reconnect handling to the client until completion.

### 2.2.0 / 2025-10-27

- Prevent the SSE client from reconnecting after `assembly_finished`, eliminating spurious `assembly_error` callbacks and timeouts.
Expand Down
5 changes: 3 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ High-level checklist for maintainers:

1. Bump the version in `src/main/resources/java-sdk-version/version.properties` and update `CHANGELOG.md`.
2. Merge the release branch into `main`.
3. Create a git tag for `main` that matches the new version
4. Publish a GitHub release (include the changelog). This triggers the release workflow.
3. Create a git tag for `main` that matches the new versions
4. Publish a GitHub release (include the changelog). This triggers the release workflow. (via the GitHub UI, `gh release creates v1.0.1 --title "v1.0.1" --notes-file <(cat CHANGELOG.md section)`)
5. Wait for Sonatype to sync the artifact (this can take a few hours).

The required signing keys and credentials are stored as GitHub secrets. If you need access or spot an issue with the release automation, please reach out to the Transloadit team via the issue tracker or support channels.

54 changes: 40 additions & 14 deletions src/main/java/com/transloadit/sdk/EventsourceRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.FaultEvent;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.ReadyState;
import com.launchdarkly.eventsource.RetryDelayStrategy;
import com.launchdarkly.eventsource.StartedEvent;
import com.launchdarkly.eventsource.StreamEvent;
Expand All @@ -28,6 +29,8 @@ class EventsourceRunnable implements Runnable {
protected EventSource eventSource;

protected Transloadit transloadit;
protected boolean stopRequested;
protected boolean assemblyFinishedNotified;

/**
* Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a
Expand Down Expand Up @@ -64,16 +67,21 @@ class EventsourceRunnable implements Runnable {
@Override
public void run() {
this.assemblyFinished = false;
this.stopRequested = false;
this.assemblyFinishedNotified = false;
try {
eventSource.start();
} catch (StreamException e) {
assemblyListener.onError(e);
stopRequested = true;
}

while (!assemblyFinished) {
while (!stopRequested) {
boolean processedEvent = false;
Iterable<StreamEvent> events = eventSource.anyEvents();
Iterator<StreamEvent> eventIterator = events.iterator();
if (eventIterator.hasNext()) {
while (eventIterator.hasNext()) {
processedEvent = true;
StreamEvent streamEvent = eventIterator.next();
if (streamEvent instanceof MessageEvent) {
handleMessageEvent((MessageEvent) streamEvent);
Expand All @@ -85,7 +93,24 @@ public void run() {
handleFaultEvent((FaultEvent) streamEvent);
}
}

if (!processedEvent) {
ReadyState state = eventSource.getState();
if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
stopRequested = true;
}
if (!stopRequested) {
try {
Thread.sleep(25);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
stopRequested = true;
}
}
}
}

shutdownEventSource();
}

/**
Expand All @@ -101,22 +126,22 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
String eventName = messageEvent.getEventName();
String data = messageEvent.getData();

if (assemblyFinished) {
shutdownEventSource();
return;
}

// Check if the event is a message event without
if (eventName.equals("message")) {
switch (data) {
case "assembly_finished":
assemblyFinished = true;
try {
assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
} catch (RequestException | LocalOperationException e) {
assemblyListener.onError(e);
} finally {
shutdownEventSource();
stopRequested = true;
if (!assemblyFinishedNotified) {
assemblyFinishedNotified = true;
try {
assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
} catch (RequestException | LocalOperationException e) {
assemblyListener.onError(e);
}
}
if (eventSource != null) {
eventSource.stop();
}
break;
case "assembly_upload_meta_data_extracted":
Expand Down Expand Up @@ -146,10 +171,10 @@ protected void handleMessageEvent(MessageEvent messageEvent) {

case "assembly_error":
if (assemblyFinished) {
shutdownEventSource();
break;
}
assemblyListener.onError(new RequestException(data));
stopRequested = true;
shutdownEventSource();
break;

Expand Down Expand Up @@ -178,6 +203,7 @@ protected void handleStartedEvent(StartedEvent startedEvent) {

protected void handleFaultEvent(FaultEvent faultEvent) {
if (assemblyFinished) {
stopRequested = true;
shutdownEventSource();
}
// Debug output, uncomment if needed
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/java-sdk-version/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
versionNumber='2.2.0'
versionNumber='2.2.1'
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -51,6 +52,8 @@ void sseStreamShouldCloseWithoutErrorsAfterAssemblyFinished() throws Exception {
AtomicReference<AssemblyResponse> finishedResponse = new AtomicReference<>();
CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
CompletableFuture<Exception> errorFuture = new CompletableFuture<>();
CountDownLatch resultLatch = new CountDownLatch(1);
AtomicReference<JSONArray> resultPayload = new AtomicReference<>();

AssemblyListener listener = new AssemblyListener() {
@Override
Expand Down Expand Up @@ -97,6 +100,14 @@ public void onAssemblyProgress(JSONObject progress) {

@Override
public void onAssemblyResultFinished(JSONArray result) {
System.out.println("[AssemblySseIntegrationTest] assembly_result_finished payload=" + result);
if (result != null && result.length() >= 2) {
String stepName = result.optString(0, null);
if ("resize".equals(stepName)) {
resultPayload.compareAndSet(null, cloneJsonArray(result));
resultLatch.countDown();
}
}
}
};

Expand All @@ -120,6 +131,12 @@ public void onAssemblyResultFinished(JSONArray result) {
assertTrue(completed.isFinished(), "Assembly should be finished");
assertEquals("ASSEMBLY_COMPLETED", completed.json().optString("ok"));

boolean resultSeen = resultLatch.await(2, TimeUnit.MINUTES);
assertTrue(resultSeen, "Timed out waiting for assembly_result_finished event");
JSONArray resizePayload = resultPayload.get();
assertNotNull(resizePayload, "Resize SSE payload missing");
assertEquals("resize", resizePayload.optString(0));

try {
Exception unexpected = errorFuture.get(30, TimeUnit.SECONDS);
fail("Unexpected SSE error after completion: " + unexpected);
Expand All @@ -134,6 +151,10 @@ public void onAssemblyResultFinished(JSONArray result) {
}
}

private static JSONArray cloneJsonArray(JSONArray array) {
return array == null ? null : new JSONArray(array.toString());
}

private static Path createTempUpload() throws IOException {
Path file = Files.createTempFile("transloadit-sse-test", ".jpg");
URL source = new URL("https://demos.transloadit.com/inputs/chameleon.jpg");
Expand Down