Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RESTEASY-2985] Only run the complete listeners if a 204 response is … #3863

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand Down Expand Up @@ -50,6 +51,7 @@ private enum State {
private final List<Consumer<Throwable>> onErrorConsumers = new CopyOnWriteArrayList<>();

private final List<Runnable> onCompleteConsumers = new CopyOnWriteArrayList<>();
private final AtomicBoolean completeListenersInvoked = new AtomicBoolean(false);

private final boolean alwaysReconnect;

Expand Down Expand Up @@ -221,6 +223,12 @@ public boolean close(final long timeout, final TimeUnit unit) {
}
}

private void runCompleteConsumers() {
if (completeListenersInvoked.compareAndSet(false, true)) {
onCompleteConsumers.forEach(Runnable::run);
}
}

private void internalClose() {
if (state.getAndSet(State.CLOSED) == State.CLOSED) {
return;
Expand All @@ -236,7 +244,7 @@ private void internalClose() {
}
}
sseEventSourceScheduler.shutdownNow();
onCompleteConsumers.forEach(Runnable::run);
runCompleteConsumers();
}

private class EventHandler implements Runnable {
Expand Down Expand Up @@ -289,6 +297,11 @@ public void run() {
response = clientResponse;
if (Family.SUCCESSFUL.equals(clientResponse.getStatusInfo().getFamily())) {
onConnection();
if (clientResponse.getStatus() == 204) {
// On a 204, only the onComplete() should be invoked
runCompleteConsumers();
return;
}
eventInput = clientResponse.readEntity(SseEventInputImpl.class);
//if 200<= response code <300 and response contentType is null, fail the connection.
if (eventInput == null) {
Expand Down Expand Up @@ -320,10 +333,9 @@ public void run() {
onUnrecoverableError(e);
return;
}

final Providers providers = (ClientConfiguration) target.getConfiguration();
while (!Thread.currentThread().isInterrupted() && state.get() == State.OPEN) {
if (eventInput != null && eventInput.isClosed()) {
if (eventInput == null || eventInput.isClosed()) {
if (alwaysReconnect) {
reconnect(reconnectDelay);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Optional;
import java.util.function.Supplier;

import jakarta.ws.rs.sse.SseEventSink;

import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;
import org.jboss.resteasy.spi.util.Functions;

Expand Down Expand Up @@ -63,6 +65,18 @@ public class Options<T> {
Threshold.class,
Functions.singleton(() -> Threshold.of(50L, SizeUnit.MEGABYTE)));

/**
* An option which allows which HTTP status code should be sent when the {@link SseEventSink#close()} is invoked.
* In some implementations 200 (OK) is the default. However, RESTEasy prefers 204 (No Content) as no content has
* been sent the response.
* <p>
* The default is 204 - No Content
* </p>
*/
public static final Options<Integer> SSE_CLOSED_RESPONSE_CODE = new Options<>("dev.resteasy.sse.closed.response.code",
Integer.class,
Functions.singleton(() -> 204));

private final String key;
private final Class<T> name;
private final Supplier<T> dftValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
import org.jboss.resteasy.spi.AsyncOutputStream;
import org.jboss.resteasy.spi.HttpRequest;
import org.jboss.resteasy.spi.HttpResponse;
import org.jboss.resteasy.spi.HttpResponseCodes;
import org.jboss.resteasy.spi.ResteasyAsynchronousContext;
import org.jboss.resteasy.spi.ResteasyAsynchronousResponse;
import org.jboss.resteasy.spi.ResteasyProviderFactory;
import org.jboss.resteasy.spi.config.Options;
import org.jboss.resteasy.spi.util.FindAnnotation;

public class SseEventOutputImpl extends GenericType<OutboundSseEvent> implements SseEventSink {
Expand Down Expand Up @@ -306,43 +308,45 @@ private CompletionStage<Void> internalWriteEvent(final OutboundSseEvent event) {

private BuiltResponse createResponse() {
BuiltResponse jaxrsResponse;
final int responseCode;
if (state.get() == CLOSED) {
jaxrsResponse = (BuiltResponse) Response.noContent().build();
responseCode = Options.SSE_CLOSED_RESPONSE_CODE.getValue();
} else //set back to client 200 OK to implies the SseEventOutput is ready
{
ResourceMethodInvoker method = (ResourceMethodInvoker) request.getAttribute(ResourceMethodInvoker.class.getName());
MediaType[] mediaTypes = method.getProduces();
if (mediaTypes != null && getSseEventType(mediaTypes) != null) {
// @Produces("text/event-stream")
SseElementType sseElementType = FindAnnotation.findAnnotation(method.getMethodAnnotations(),
SseElementType.class);
if (sseElementType != null) {
// Get element media type from @SseElementType.
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, sseElementType.value());
MediaType mediaType = new MediaType(MediaType.SERVER_SENT_EVENTS_TYPE.getType(),
MediaType.SERVER_SENT_EVENTS_TYPE.getSubtype(), parameterMap);
jaxrsResponse = (BuiltResponse) Response.ok().type(mediaType).build();
} else {
// No element media type declared.
jaxrsResponse = (BuiltResponse) Response.ok().type(getSseEventType(mediaTypes)).build();
// // use "element-type=text/plain"?
}
responseCode = HttpResponseCodes.SC_OK;
}
ResourceMethodInvoker method = (ResourceMethodInvoker) request.getAttribute(ResourceMethodInvoker.class.getName());
MediaType[] mediaTypes = method.getProduces();
if (mediaTypes != null && getSseEventType(mediaTypes) != null) {
// @Produces("text/event-stream")
SseElementType sseElementType = FindAnnotation.findAnnotation(method.getMethodAnnotations(),
SseElementType.class);
if (sseElementType != null) {
// Get element media type from @SseElementType.
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, sseElementType.value());
MediaType mediaType = new MediaType(MediaType.SERVER_SENT_EVENTS_TYPE.getType(),
MediaType.SERVER_SENT_EVENTS_TYPE.getSubtype(), parameterMap);
jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(mediaType).build();
} else {
Stream stream = FindAnnotation.findAnnotation(method.getMethodAnnotations(), Stream.class);
if (stream != null) {
// Get element media type from @Produces.
jaxrsResponse = (BuiltResponse) Response.ok("").build();
MediaType elementType = ServerResponseWriter.getResponseMediaType(jaxrsResponse, request, response,
providerFactory, method);
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, elementType.toString());
String[] streamType = getStreamType(method);
MediaType mediaType = new MediaType(streamType[0], streamType[1], parameterMap);
jaxrsResponse = (BuiltResponse) Response.ok().type(mediaType).build();
} else {
throw new RuntimeException(Messages.MESSAGES.expectedStreamOrSseMediaType());
}
// No element media type declared.
jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(getSseEventType(mediaTypes)).build();
// // use "element-type=text/plain"?
}
} else {
Stream stream = FindAnnotation.findAnnotation(method.getMethodAnnotations(), Stream.class);
if (stream != null) {
// Get element media type from @Produces.
jaxrsResponse = (BuiltResponse) Response.ok("").build();
MediaType elementType = ServerResponseWriter.getResponseMediaType(jaxrsResponse, request, response,
providerFactory, method);
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put(SseConstants.SSE_ELEMENT_MEDIA_TYPE, elementType.toString());
String[] streamType = getStreamType(method);
MediaType mediaType = new MediaType(streamType[0], streamType[1], parameterMap);
jaxrsResponse = (BuiltResponse) Response.status(responseCode).type(mediaType).build();
} else {
throw new RuntimeException(Messages.MESSAGES.expectedStreamOrSseMediaType());
}
}
return jaxrsResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.ServiceUnavailableException;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
Expand Down Expand Up @@ -96,7 +97,8 @@ public void testReconnectDelay(@Context SseEventSink sseEventSink, @Context Sse
public void sseLost(@Context SseEventSink sink, @Context Sse sse) {
if (tryCount != 0) {
tryCount--;
sink.close();
// Throw a service unavailable, 503, with a 1 second retry.
throw new ServiceUnavailableException(1L);
} else {
try (SseEventSink s = sink) {
s.send(sse.newEvent("MESSAGE"));
Expand Down