Skip to content

Commit

Permalink
Merge pull request #3863 from jamezp/RESTEASY-2985-6.2
Browse files Browse the repository at this point in the history
[RESTEASY-2985] Only run the complete listeners if a 204 response is …
  • Loading branch information
jamezp committed Oct 18, 2023
2 parents 53db9cc + 8963587 commit 931db08
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand Down Expand Up @@ -50,6 +51,7 @@ private enum State {
private final List<Consumer<Throwable>> onErrorConsumers = new CopyOnWriteArrayList<>();

private final List<Runnable> onCompleteConsumers = new CopyOnWriteArrayList<>();
private final AtomicBoolean completeListenersInvoked = new AtomicBoolean(false);

private final boolean alwaysReconnect;

Expand Down Expand Up @@ -221,6 +223,12 @@ public boolean close(final long timeout, final TimeUnit unit) {
}
}

private void runCompleteConsumers() {
if (completeListenersInvoked.compareAndSet(false, true)) {
onCompleteConsumers.forEach(Runnable::run);
}
}

private void internalClose() {
if (state.getAndSet(State.CLOSED) == State.CLOSED) {
return;
Expand All @@ -236,7 +244,7 @@ private void internalClose() {
}
}
sseEventSourceScheduler.shutdownNow();
onCompleteConsumers.forEach(Runnable::run);
runCompleteConsumers();
}

private class EventHandler implements Runnable {
Expand Down Expand Up @@ -289,6 +297,11 @@ public void run() {
response = clientResponse;
if (Family.SUCCESSFUL.equals(clientResponse.getStatusInfo().getFamily())) {
onConnection();
if (clientResponse.getStatus() == 204) {
// On a 204, only the onComplete() should be invoked
runCompleteConsumers();
return;
}
eventInput = clientResponse.readEntity(SseEventInputImpl.class);
//if 200<= response code <300 and response contentType is null, fail the connection.
if (eventInput == null) {
Expand Down Expand Up @@ -320,10 +333,9 @@ public void run() {
onUnrecoverableError(e);
return;
}

final Providers providers = (ClientConfiguration) target.getConfiguration();
while (!Thread.currentThread().isInterrupted() && state.get() == State.OPEN) {
if (eventInput != null && eventInput.isClosed()) {
if (eventInput == null || eventInput.isClosed()) {
if (alwaysReconnect) {
reconnect(reconnectDelay);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Optional;
import java.util.function.Supplier;

import jakarta.ws.rs.sse.SseEventSink;

import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;
import org.jboss.resteasy.spi.util.Functions;

Expand Down Expand Up @@ -63,6 +65,18 @@ public class Options<T> {
Threshold.class,
Functions.singleton(() -> Threshold.of(50L, SizeUnit.MEGABYTE)));

/**
* An option which allows which HTTP status code should be sent when the {@link SseEventSink#close()} is invoked.
* In some implementations 200 (OK) is the default. However, RESTEasy prefers 204 (No Content) as no content has
* been sent the response.
* <p>
* The default is 204 - No Content
* </p>
*/
public static final Options<Integer> SSE_CLOSED_RESPONSE_CODE = new Options<>("dev.resteasy.sse.closed.response.code",
Integer.class,
Functions.singleton(() -> 204));

private final String key;
private final Class<T> name;
private final Supplier<T> dftValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
import org.jboss.resteasy.spi.AsyncOutputStream;
import org.jboss.resteasy.spi.HttpRequest;
import org.jboss.resteasy.spi.HttpResponse;
import org.jboss.resteasy.spi.HttpResponseCodes;
import org.jboss.resteasy.spi.ResteasyAsynchronousContext;
import org.jboss.resteasy.spi.ResteasyAsynchronousResponse;
import org.jboss.resteasy.spi.ResteasyProviderFactory;
import org.jboss.resteasy.spi.config.Options;
import org.jboss.resteasy.spi.util.FindAnnotation;

public class SseEventOutputImpl extends GenericType<OutboundSseEvent> implements SseEventSink {
Expand Down Expand Up @@ -306,43 +308,45 @@ private CompletionStage<Void> internalWriteEvent(final OutboundSseEvent event) {

private BuiltResponse createResponse() {
BuiltResponse jaxrsResponse;
final int responseCode;
if (state.get() == CLOSED) {
jaxrsResponse = (BuiltResponse) Response.noContent().build();
responseCode = Options.SSE_CLOSED_RESPONSE_CODE.getValue();
} else //set back to client 200 OK to implies the SseEventOutput is ready
{
ResourceMethodInvoker method = (ResourceMethodInvoker) request.getAttribute(ResourceMethodInvoker.class.getName());
MediaType[] mediaTypes = method.getProduces();
if (mediaTypes != null && getSseEventType(mediaTypes) != null) {
// @Produces("text/event-stream")
SseElementType sseElementType = FindAnnotation.findAnnotation(method.getMethodAnnotations(),
SseElementType.class);
if (sseElementType != null) {
// Get element media type from @SseElementType.
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, sseElementType.value());
MediaType mediaType = new MediaType(MediaType.SERVER_SENT_EVENTS_TYPE.getType(),
MediaType.SERVER_SENT_EVENTS_TYPE.getSubtype(), parameterMap);
jaxrsResponse = (BuiltResponse) Response.ok().type(mediaType).build();
} else {
// No element media type declared.
jaxrsResponse = (BuiltResponse) Response.ok().type(getSseEventType(mediaTypes)).build();
// // use "element-type=text/plain"?
}
responseCode = HttpResponseCodes.SC_OK;
}
ResourceMethodInvoker method = (ResourceMethodInvoker) request.getAttribute(ResourceMethodInvoker.class.getName());
MediaType[] mediaTypes = method.getProduces();
if (mediaTypes != null && getSseEventType(mediaTypes) != null) {
// @Produces("text/event-stream")
SseElementType sseElementType = FindAnnotation.findAnnotation(method.getMethodAnnotations(),
SseElementType.class);
if (sseElementType != null) {
// Get element media type from @SseElementType.
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, sseElementType.value());
MediaType mediaType = new MediaType(MediaType.SERVER_SENT_EVENTS_TYPE.getType(),
MediaType.SERVER_SENT_EVENTS_TYPE.getSubtype(), parameterMap);
jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(mediaType).build();
} else {
Stream stream = FindAnnotation.findAnnotation(method.getMethodAnnotations(), Stream.class);
if (stream != null) {
// Get element media type from @Produces.
jaxrsResponse = (BuiltResponse) Response.ok("").build();
MediaType elementType = ServerResponseWriter.getResponseMediaType(jaxrsResponse, request, response,
providerFactory, method);
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, elementType.toString());
String[] streamType = getStreamType(method);
MediaType mediaType = new MediaType(streamType[0], streamType[1], parameterMap);
jaxrsResponse = (BuiltResponse) Response.ok().type(mediaType).build();
} else {
throw new RuntimeException(Messages.MESSAGES.expectedStreamOrSseMediaType());
}
// No element media type declared.
jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(getSseEventType(mediaTypes)).build();
// // use "element-type=text/plain"?
}
} else {
Stream stream = FindAnnotation.findAnnotation(method.getMethodAnnotations(), Stream.class);
if (stream != null) {
// Get element media type from @Produces.
jaxrsResponse = (BuiltResponse) Response.ok("").build();
MediaType elementType = ServerResponseWriter.getResponseMediaType(jaxrsResponse, request, response,
providerFactory, method);
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, elementType.toString());
String[] streamType = getStreamType(method);
MediaType mediaType = new MediaType(streamType[0], streamType[1], parameterMap);
jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(mediaType).build();
} else {
throw new RuntimeException(Messages.MESSAGES.expectedStreamOrSseMediaType());
}
}
return jaxrsResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.ServiceUnavailableException;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
Expand Down Expand Up @@ -96,7 +97,8 @@ public void testReconnectDelay(@Context SseEventSink sseEventSink, @Context Sse
public void sseLost(@Context SseEventSink sink, @Context Sse sse) {
if (tryCount != 0) {
tryCount--;
sink.close();
// Throw a service unavailable, 503, with a 1 second retry.
throw new ServiceUnavailableException(1L);
} else {
try (SseEventSink s = sink) {
s.send(sse.newEvent("MESSAGE"));
Expand Down

0 comments on commit 931db08

Please sign in to comment.