Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/main/java/com/transloadit/sdk/EventsourceRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Iterator;

class EventsourceRunnable implements Runnable {
private static final long FINISH_DRAIN_TIMEOUT_MS = 1500L;

protected boolean assemblyFinished;
protected AssemblyListener assemblyListener;

Expand All @@ -31,6 +33,7 @@ class EventsourceRunnable implements Runnable {
protected Transloadit transloadit;
protected boolean stopRequested;
protected boolean assemblyFinishedNotified;
protected long assemblyFinishedAtMillis;

/**
* Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a
Expand Down Expand Up @@ -69,6 +72,7 @@ public void run() {
this.assemblyFinished = false;
this.stopRequested = false;
this.assemblyFinishedNotified = false;
this.assemblyFinishedAtMillis = 0L;
try {
eventSource.start();
} catch (StreamException e) {
Expand Down Expand Up @@ -96,9 +100,19 @@ public void run() {

if (!processedEvent) {
ReadyState state = eventSource.getState();
if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
long now = System.currentTimeMillis();
if (assemblyFinished) {
if (assemblyFinishedAtMillis == 0L) {
assemblyFinishedAtMillis = now;
}
boolean graceExpired = now - assemblyFinishedAtMillis >= FINISH_DRAIN_TIMEOUT_MS;
if (graceExpired || state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
stopRequested = true;
}
} else if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
stopRequested = true;
}

if (!stopRequested) {
try {
Thread.sleep(25);
Expand Down Expand Up @@ -131,7 +145,7 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
switch (data) {
case "assembly_finished":
assemblyFinished = true;
stopRequested = true;
assemblyFinishedAtMillis = System.currentTimeMillis();
if (!assemblyFinishedNotified) {
assemblyFinishedNotified = true;
try {
Expand All @@ -140,9 +154,6 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
assemblyListener.onError(e);
}
}
if (eventSource != null) {
eventSource.stop();
}
break;
case "assembly_upload_meta_data_extracted":
assemblyListener.onMetadataExtracted();
Expand Down
59 changes: 53 additions & 6 deletions src/test/java/com/transloadit/sdk/AssemblyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void setUp() {
assembly = newAssemblyWithoutID();

mockServerClient.reset();
emittedEvents.replaceAll((key, value) -> 0);
}

/**
Expand Down Expand Up @@ -271,19 +272,65 @@ public void saveWithTusListenSSE() throws Exception {

// Check if SSE events triggered the correct events and make sure they were triggered often enough:
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_ERROR"));
Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_META_DATA_EXTRACTED"));
Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_INSTRUCTION_UPLOAD_FINISHED"));
Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_FINISHED"));
Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_PROGRESS"));
Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_RESULT_FINISHED")); // as we only have one event
Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_FINISHED"));
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_META_DATA_EXTRACTED") >= 1);
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_INSTRUCTION_UPLOAD_FINISHED") >= 1);
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FILE_UPLOAD_FINISHED") >= 2);
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_PROGRESS") >= 2);
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_RESULT_FINISHED") >= 2);
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FINISHED") >= 1);

// We are not doing here actual file uploads, so the next three should not appear:
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_PROGRESS"));
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_RESUMED"));
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_PAUSED"));
}

@Test
public void sseDeliversResultEvenIfFinishedArrivesFirst() throws Exception {
String originalSse = getJson("sse_response_body.txt");
String withoutFinish = originalSse.replace("data: assembly_finished\n", "");
int firstResultIndex = withoutFinish.indexOf("event: assembly_result_finished");
int secondResultIndex = withoutFinish.indexOf("event: assembly_result_finished", firstResultIndex + 1);
String finishEvent = "data: assembly_finished\n\n";
String sseBody;
if (secondResultIndex >= 0) {
StringBuilder builder = new StringBuilder(withoutFinish);
builder.insert(secondResultIndex, finishEvent);
sseBody = builder.toString();
} else {
throw new IllegalStateException("Fixture does not contain two assembly_result_finished events");
}
MockTusAssembly assembly = getMockTusAssembly();

mockServerClient.when(request()
.withPath("/assemblies")
.withMethod("POST")
.withBody(regex("[\\w\\W]*tus_num_expected_upload_files\"\\r\\nContent-Length: 1"
+ "\\r\\n\\r\\n1[\\w\\W]*")))
.respond(HttpResponse.response().withBody(getJson("resumable_assembly.json")));

mockServerClient.when(request()
.withPath("/ws20013").withMethod("GET").withHeader("Accept", "text/event-stream"))
.respond(HttpResponse.response().withBody(sseBody));

mockServerClient.when(request()
.withPath("/assemblies/02ce6150ea2811e6a35a8d1e061a5b71").withMethod("GET"))
.respond(HttpResponse.response().withBody(getJson("resumable_assembly_complete.json")));

AssemblyResponse response = assembly.save(true);

Assertions.assertEquals("ASSEMBLY_UPLOADING", response.json().get("ok"));
Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FINISHED"));

Thread.sleep(1000);

Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_ERROR"));
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_RESULT_FINISHED") >= 2,
"Expected at least two result events (including post-finish), got " + emittedEvents.get("ASSEMBLY_RESULT_FINISHED"));
Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FINISHED") >= 1,
"Expected assembly_finished to fire at least once");
}

private @NotNull MockTusAssembly getMockTusAssembly() {
MockTusAssembly assembly = new MockTusAssembly(transloadit);
assembly.addFile(new File("LICENSE"), "file_name");
Expand Down