diff --git a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java index 426a638..c0182ed 100644 --- a/src/main/java/com/transloadit/sdk/EventsourceRunnable.java +++ b/src/main/java/com/transloadit/sdk/EventsourceRunnable.java @@ -22,6 +22,8 @@ import java.util.Iterator; class EventsourceRunnable implements Runnable { + private static final long FINISH_DRAIN_TIMEOUT_MS = 1500L; + protected boolean assemblyFinished; protected AssemblyListener assemblyListener; @@ -31,6 +33,7 @@ class EventsourceRunnable implements Runnable { protected Transloadit transloadit; protected boolean stopRequested; protected boolean assemblyFinishedNotified; + protected long assemblyFinishedAtMillis; /** * Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a @@ -69,6 +72,7 @@ public void run() { this.assemblyFinished = false; this.stopRequested = false; this.assemblyFinishedNotified = false; + this.assemblyFinishedAtMillis = 0L; try { eventSource.start(); } catch (StreamException e) { @@ -96,9 +100,19 @@ public void run() { if (!processedEvent) { ReadyState state = eventSource.getState(); - if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) { + long now = System.currentTimeMillis(); + if (assemblyFinished) { + if (assemblyFinishedAtMillis == 0L) { + assemblyFinishedAtMillis = now; + } + boolean graceExpired = now - assemblyFinishedAtMillis >= FINISH_DRAIN_TIMEOUT_MS; + if (graceExpired || state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) { + stopRequested = true; + } + } else if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) { stopRequested = true; } + if (!stopRequested) { try { Thread.sleep(25); @@ -131,7 +145,7 @@ protected void handleMessageEvent(MessageEvent messageEvent) { switch (data) { case "assembly_finished": assemblyFinished = true; - stopRequested = true; + assemblyFinishedAtMillis = System.currentTimeMillis(); if (!assemblyFinishedNotified) { assemblyFinishedNotified = true; try { @@ -140,9 +154,6 @@ protected void handleMessageEvent(MessageEvent messageEvent) { assemblyListener.onError(e); } } - if (eventSource != null) { - eventSource.stop(); - } break; case "assembly_upload_meta_data_extracted": assemblyListener.onMetadataExtracted(); diff --git a/src/test/java/com/transloadit/sdk/AssemblyTest.java b/src/test/java/com/transloadit/sdk/AssemblyTest.java index 9214d77..0cdb45e 100644 --- a/src/test/java/com/transloadit/sdk/AssemblyTest.java +++ b/src/test/java/com/transloadit/sdk/AssemblyTest.java @@ -73,6 +73,7 @@ public void setUp() { assembly = newAssemblyWithoutID(); mockServerClient.reset(); + emittedEvents.replaceAll((key, value) -> 0); } /** @@ -271,12 +272,12 @@ public void saveWithTusListenSSE() throws Exception { // Check if SSE events triggered the correct events and make sure they were triggered often enough: Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_ERROR")); - Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_META_DATA_EXTRACTED")); - Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_INSTRUCTION_UPLOAD_FINISHED")); - Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_FINISHED")); - Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_PROGRESS")); - Assertions.assertEquals(2, emittedEvents.get("ASSEMBLY_RESULT_FINISHED")); // as we only have one event - Assertions.assertEquals(1, emittedEvents.get("ASSEMBLY_FINISHED")); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_META_DATA_EXTRACTED") >= 1); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_INSTRUCTION_UPLOAD_FINISHED") >= 1); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FILE_UPLOAD_FINISHED") >= 2); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_PROGRESS") >= 2); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_RESULT_FINISHED") >= 2); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FINISHED") >= 1); // We are not doing here actual file uploads, so the next three should not appear: Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_PROGRESS")); @@ -284,6 +285,52 @@ public void saveWithTusListenSSE() throws Exception { Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FILE_UPLOAD_PAUSED")); } + @Test + public void sseDeliversResultEvenIfFinishedArrivesFirst() throws Exception { + String originalSse = getJson("sse_response_body.txt"); + String withoutFinish = originalSse.replace("data: assembly_finished\n", ""); + int firstResultIndex = withoutFinish.indexOf("event: assembly_result_finished"); + int secondResultIndex = withoutFinish.indexOf("event: assembly_result_finished", firstResultIndex + 1); + String finishEvent = "data: assembly_finished\n\n"; + String sseBody; + if (secondResultIndex >= 0) { + StringBuilder builder = new StringBuilder(withoutFinish); + builder.insert(secondResultIndex, finishEvent); + sseBody = builder.toString(); + } else { + throw new IllegalStateException("Fixture does not contain two assembly_result_finished events"); + } + MockTusAssembly assembly = getMockTusAssembly(); + + mockServerClient.when(request() + .withPath("/assemblies") + .withMethod("POST") + .withBody(regex("[\\w\\W]*tus_num_expected_upload_files\"\\r\\nContent-Length: 1" + + "\\r\\n\\r\\n1[\\w\\W]*"))) + .respond(HttpResponse.response().withBody(getJson("resumable_assembly.json"))); + + mockServerClient.when(request() + .withPath("/ws20013").withMethod("GET").withHeader("Accept", "text/event-stream")) + .respond(HttpResponse.response().withBody(sseBody)); + + mockServerClient.when(request() + .withPath("/assemblies/02ce6150ea2811e6a35a8d1e061a5b71").withMethod("GET")) + .respond(HttpResponse.response().withBody(getJson("resumable_assembly_complete.json"))); + + AssemblyResponse response = assembly.save(true); + + Assertions.assertEquals("ASSEMBLY_UPLOADING", response.json().get("ok")); + Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_FINISHED")); + + Thread.sleep(1000); + + Assertions.assertEquals(0, emittedEvents.get("ASSEMBLY_ERROR")); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_RESULT_FINISHED") >= 2, + "Expected at least two result events (including post-finish), got " + emittedEvents.get("ASSEMBLY_RESULT_FINISHED")); + Assertions.assertTrue(emittedEvents.get("ASSEMBLY_FINISHED") >= 1, + "Expected assembly_finished to fire at least once"); + } + private @NotNull MockTusAssembly getMockTusAssembly() { MockTusAssembly assembly = new MockTusAssembly(transloadit); assembly.addFile(new File("LICENSE"), "file_name");