diff --git a/resteasy-client/src/main/java/org/jboss/resteasy/plugins/providers/sse/client/SseEventSourceImpl.java b/resteasy-client/src/main/java/org/jboss/resteasy/plugins/providers/sse/client/SseEventSourceImpl.java index e558b12d901..13ff965e98d 100644 --- a/resteasy-client/src/main/java/org/jboss/resteasy/plugins/providers/sse/client/SseEventSourceImpl.java +++ b/resteasy-client/src/main/java/org/jboss/resteasy/plugins/providers/sse/client/SseEventSourceImpl.java @@ -7,6 +7,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -50,6 +51,7 @@ private enum State { private final List> onErrorConsumers = new CopyOnWriteArrayList<>(); private final List onCompleteConsumers = new CopyOnWriteArrayList<>(); + private final AtomicBoolean completeListenersInvoked = new AtomicBoolean(false); private final boolean alwaysReconnect; @@ -221,6 +223,12 @@ public boolean close(final long timeout, final TimeUnit unit) { } } + private void runCompleteConsumers() { + if (completeListenersInvoked.compareAndSet(false, true)) { + onCompleteConsumers.forEach(Runnable::run); + } + } + private void internalClose() { if (state.getAndSet(State.CLOSED) == State.CLOSED) { return; @@ -236,7 +244,7 @@ private void internalClose() { } } sseEventSourceScheduler.shutdownNow(); - onCompleteConsumers.forEach(Runnable::run); + runCompleteConsumers(); } private class EventHandler implements Runnable { @@ -289,6 +297,11 @@ public void run() { response = clientResponse; if (Family.SUCCESSFUL.equals(clientResponse.getStatusInfo().getFamily())) { onConnection(); + if (clientResponse.getStatus() == 204) { + // On a 204, only the onComplete() should be invoked + runCompleteConsumers(); + return; + } eventInput = clientResponse.readEntity(SseEventInputImpl.class); //if 200<= response code <300 and response contentType is null, fail the connection. if (eventInput == null) { @@ -320,10 +333,9 @@ public void run() { onUnrecoverableError(e); return; } - final Providers providers = (ClientConfiguration) target.getConfiguration(); while (!Thread.currentThread().isInterrupted() && state.get() == State.OPEN) { - if (eventInput != null && eventInput.isClosed()) { + if (eventInput == null || eventInput.isClosed()) { if (alwaysReconnect) { reconnect(reconnectDelay); } else { diff --git a/resteasy-core-spi/src/main/java/org/jboss/resteasy/spi/config/Options.java b/resteasy-core-spi/src/main/java/org/jboss/resteasy/spi/config/Options.java index eb3cd038516..a95fa438faf 100644 --- a/resteasy-core-spi/src/main/java/org/jboss/resteasy/spi/config/Options.java +++ b/resteasy-core-spi/src/main/java/org/jboss/resteasy/spi/config/Options.java @@ -23,6 +23,8 @@ import java.util.Optional; import java.util.function.Supplier; +import jakarta.ws.rs.sse.SseEventSink; + import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages; import org.jboss.resteasy.spi.util.Functions; @@ -63,6 +65,18 @@ public class Options { Threshold.class, Functions.singleton(() -> Threshold.of(50L, SizeUnit.MEGABYTE))); + /** + * An option which allows which HTTP status code should be sent when the {@link SseEventSink#close()} is invoked. + * In some implementations 200 (OK) is the default. However, RESTEasy prefers 204 (No Content) as no content has + * been sent the response. + *

+ * The default is 204 - No Content + *

+ */ + public static final Options SSE_CLOSED_RESPONSE_CODE = new Options<>("dev.resteasy.sse.closed.response.code", + Integer.class, + Functions.singleton(() -> 204)); + private final String key; private final Class name; private final Supplier dftValue; diff --git a/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java b/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java index 3377fe75cb3..bc78a589cbb 100644 --- a/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java +++ b/resteasy-core/src/main/java/org/jboss/resteasy/plugins/providers/sse/SseEventOutputImpl.java @@ -40,9 +40,11 @@ import org.jboss.resteasy.spi.AsyncOutputStream; import org.jboss.resteasy.spi.HttpRequest; import org.jboss.resteasy.spi.HttpResponse; +import org.jboss.resteasy.spi.HttpResponseCodes; import org.jboss.resteasy.spi.ResteasyAsynchronousContext; import org.jboss.resteasy.spi.ResteasyAsynchronousResponse; import org.jboss.resteasy.spi.ResteasyProviderFactory; +import org.jboss.resteasy.spi.config.Options; import org.jboss.resteasy.spi.util.FindAnnotation; public class SseEventOutputImpl extends GenericType implements SseEventSink { @@ -306,43 +308,45 @@ private CompletionStage internalWriteEvent(final OutboundSseEvent event) { private BuiltResponse createResponse() { BuiltResponse jaxrsResponse; + final int responseCode; if (state.get() == CLOSED) { - jaxrsResponse = (BuiltResponse) Response.noContent().build(); + responseCode = Options.SSE_CLOSED_RESPONSE_CODE.getValue(); } else //set back to client 200 OK to implies the SseEventOutput is ready { - ResourceMethodInvoker method = (ResourceMethodInvoker) request.getAttribute(ResourceMethodInvoker.class.getName()); - MediaType[] mediaTypes = method.getProduces(); - if (mediaTypes != null && getSseEventType(mediaTypes) != null) { - // @Produces("text/event-stream") - SseElementType sseElementType = FindAnnotation.findAnnotation(method.getMethodAnnotations(), - SseElementType.class); - if (sseElementType != null) { - // Get element media type from @SseElementType. - Map parameterMap = new HashMap<>(); - parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, sseElementType.value()); - MediaType mediaType = new MediaType(MediaType.SERVER_SENT_EVENTS_TYPE.getType(), - MediaType.SERVER_SENT_EVENTS_TYPE.getSubtype(), parameterMap); - jaxrsResponse = (BuiltResponse) Response.ok().type(mediaType).build(); - } else { - // No element media type declared. - jaxrsResponse = (BuiltResponse) Response.ok().type(getSseEventType(mediaTypes)).build(); - // // use "element-type=text/plain"? - } + responseCode = HttpResponseCodes.SC_OK; + } + ResourceMethodInvoker method = (ResourceMethodInvoker) request.getAttribute(ResourceMethodInvoker.class.getName()); + MediaType[] mediaTypes = method.getProduces(); + if (mediaTypes != null && getSseEventType(mediaTypes) != null) { + // @Produces("text/event-stream") + SseElementType sseElementType = FindAnnotation.findAnnotation(method.getMethodAnnotations(), + SseElementType.class); + if (sseElementType != null) { + // Get element media type from @SseElementType. + Map parameterMap = new HashMap<>(); + parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, sseElementType.value()); + MediaType mediaType = new MediaType(MediaType.SERVER_SENT_EVENTS_TYPE.getType(), + MediaType.SERVER_SENT_EVENTS_TYPE.getSubtype(), parameterMap); + jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(mediaType).build(); } else { - Stream stream = FindAnnotation.findAnnotation(method.getMethodAnnotations(), Stream.class); - if (stream != null) { - // Get element media type from @Produces. - jaxrsResponse = (BuiltResponse) Response.ok("").build(); - MediaType elementType = ServerResponseWriter.getResponseMediaType(jaxrsResponse, request, response, - providerFactory, method); - Map parameterMap = new HashMap<>(); - parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, elementType.toString()); - String[] streamType = getStreamType(method); - MediaType mediaType = new MediaType(streamType[0], streamType[1], parameterMap); - jaxrsResponse = (BuiltResponse) Response.ok().type(mediaType).build(); - } else { - throw new RuntimeException(Messages.MESSAGES.expectedStreamOrSseMediaType()); - } + // No element media type declared. + jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(getSseEventType(mediaTypes)).build(); + // // use "element-type=text/plain"? + } + } else { + Stream stream = FindAnnotation.findAnnotation(method.getMethodAnnotations(), Stream.class); + if (stream != null) { + // Get element media type from @Produces. + jaxrsResponse = (BuiltResponse) Response.ok("").build(); + MediaType elementType = ServerResponseWriter.getResponseMediaType(jaxrsResponse, request, response, + providerFactory, method); + Map parameterMap = new HashMap<>(); + parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, elementType.toString()); + String[] streamType = getStreamType(method); + MediaType mediaType = new MediaType(streamType[0], streamType[1], parameterMap); + jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(mediaType).build(); + } else { + throw new RuntimeException(Messages.MESSAGES.expectedStreamOrSseMediaType()); } } return jaxrsResponse; diff --git a/testsuite/integration-tests/src/test/java/org/jboss/resteasy/test/providers/sse/resource/SseReconnectResource.java b/testsuite/integration-tests/src/test/java/org/jboss/resteasy/test/providers/sse/resource/SseReconnectResource.java index 4df1a3890b0..ff69b5b5c83 100644 --- a/testsuite/integration-tests/src/test/java/org/jboss/resteasy/test/providers/sse/resource/SseReconnectResource.java +++ b/testsuite/integration-tests/src/test/java/org/jboss/resteasy/test/providers/sse/resource/SseReconnectResource.java @@ -8,6 +8,7 @@ import jakarta.ws.rs.HeaderParam; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; +import jakarta.ws.rs.ServiceUnavailableException; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.HttpHeaders; @@ -96,7 +97,8 @@ public void testReconnectDelay(@Context SseEventSink sseEventSink, @Context Sse public void sseLost(@Context SseEventSink sink, @Context Sse sse) { if (tryCount != 0) { tryCount--; - sink.close(); + // Throw a service unavailable, 503, with a 1 second retry. + throw new ServiceUnavailableException(1L); } else { try (SseEventSink s = sink) { s.send(sse.newEvent("MESSAGE"));