Skip to content

Commit

Permalink
Write headers just before commit
Browse files Browse the repository at this point in the history
Fixes #13926
  • Loading branch information
stuartwdouglas committed Dec 17, 2020
1 parent 5ef0548 commit 76cd569
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 42 deletions.
Expand Up @@ -40,13 +40,15 @@
import io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveSecurityContext;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.ResponseCommitListener;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.ext.web.RoutingContext;

public class ServletRequestContext extends ResteasyReactiveRequestContext
implements ServerHttpRequest, ServerHttpResponse {
implements ServerHttpRequest, ServerHttpResponse, ResponseCommitListener {

private static final LazyValue<Event<SecurityIdentity>> SECURITY_IDENTITY_EVENT = new LazyValue<>(
ServletRequestContext::createEvent);
Expand All @@ -57,15 +59,17 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext
ServletWriteListener writeListener;
byte[] asyncWriteData;
Consumer<Throwable> asyncWriteHandler;
protected Consumer<ResteasyReactiveRequestContext> preCommitTask;

public ServletRequestContext(Deployment deployment, ProvidersImpl providers,
HttpServletRequest request, HttpServletResponse response,
ThreadSetupAction requestContext, ServerRestHandler[] handlerChain, ServerRestHandler[] abortHandlerChain,
RoutingContext context) {
RoutingContext context, HttpServerExchange exchange) {
super(deployment, providers, requestContext, handlerChain, abortHandlerChain);
this.request = request;
this.response = response;
this.context = context;
exchange.addResponseCommitListener(this);
}

protected boolean isRequestScopeManagementRequired() {
Expand Down Expand Up @@ -453,6 +457,18 @@ public OutputStream createResponseOutputStream() {
}
}

@Override
public void setPreCommitListener(Consumer<ResteasyReactiveRequestContext> task) {
preCommitTask = task;
}

@Override
public void beforeCommit(HttpServerExchange exchange) {
if (preCommitTask != null) {
preCommitTask.accept(this);
}
}

class ServletWriteListener implements WriteListener {

private final ServletOutputStream outputStream;
Expand Down
Expand Up @@ -24,6 +24,6 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment, Provi
io.undertow.servlet.handlers.ServletRequestContext src = (io.undertow.servlet.handlers.ServletRequestContext) context;
return new ServletRequestContext(deployment, providers, (HttpServletRequest) src.getServletRequest(),
(HttpServletResponse) src.getServletResponse(), requestContext, handlerChain, abortHandlerChain,
(RoutingContext) ((VertxHttpExchange) src.getExchange().getDelegate()).getContext());
(RoutingContext) ((VertxHttpExchange) src.getExchange().getDelegate()).getContext(), src.getExchange());
}
}
Expand Up @@ -17,6 +17,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.ws.rs.RuntimeType;
import javax.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -57,6 +58,13 @@

public class ServerSerialisers extends Serialisers {

private static final Consumer<ResteasyReactiveRequestContext> HEADER_FUNCTION = new Consumer<ResteasyReactiveRequestContext>() {
@Override
public void accept(ResteasyReactiveRequestContext context) {
ServerSerialisers.encodeResponseHeaders(context);
}
};

public static BuiltinReader[] BUILTIN_READERS = new BuiltinReader[] {
new BuiltinReader(String.class, ServerStringMessageBodyHandler.class,
MediaType.WILDCARD),
Expand Down Expand Up @@ -164,49 +172,62 @@ public static boolean invokeWriter(ResteasyReactiveRequestContext context, Objec

WriterInterceptor[] writerInterceptors = context.getWriterInterceptors();
boolean outputStreamSet = context.getOutputStream() != null;
if (writer instanceof ServerMessageBodyWriter && writerInterceptors == null && !outputStreamSet) {
ServerMessageBodyWriter<Object> quarkusRestWriter = (ServerMessageBodyWriter<Object>) writer;
RuntimeResource target = context.getTarget();
ServerSerialisers.encodeResponseHeaders(context);
Type genericType;
if (context.hasGenericReturnType()) { // make sure that when a Response with a GenericEntity was returned, we use it
genericType = context.getGenericReturnType();
} else {
genericType = target == null ? null : target.getReturnType();
}
Class<?> entityClass = entity.getClass();
if (quarkusRestWriter.isWriteable(
entityClass,
genericType,
target == null ? null : target.getLazyMethod(),
context.getResponseMediaType())) {
if (mediaType != null) {
context.setResponseContentType(mediaType);
context.serverResponse().setPreCommitListener(HEADER_FUNCTION);
try {
if (writer instanceof ServerMessageBodyWriter && writerInterceptors == null && !outputStreamSet) {
ServerMessageBodyWriter<Object> quarkusRestWriter = (ServerMessageBodyWriter<Object>) writer;
RuntimeResource target = context.getTarget();
Type genericType;
if (context.hasGenericReturnType()) { // make sure that when a Response with a GenericEntity was returned, we use it
genericType = context.getGenericReturnType();
} else {
genericType = target == null ? null : target.getReturnType();
}
quarkusRestWriter.writeResponse(entity, genericType, context);
return true;
} else {
return false;
}
} else {
if (writer.isWriteable(entity.getClass(), context.getGenericReturnType(), context.getAllAnnotations(),
context.getResponseMediaType())) {
Response response = context.getResponse().get();
if (mediaType != null) {
context.setResponseContentType(mediaType);
Class<?> entityClass = entity.getClass();
if (quarkusRestWriter.isWriteable(
entityClass,
genericType,
target == null ? null : target.getLazyMethod(),
context.getResponseMediaType())) {
if (mediaType != null) {
context.setResponseContentType(mediaType);
}
quarkusRestWriter.writeResponse(entity, genericType, context);
return true;
} else {
return false;
}
if (writerInterceptors == null) {
writer.writeTo(entity, entity.getClass(), context.getGenericReturnType(),
context.getAllAnnotations(), response.getMediaType(), response.getHeaders(),
context.getOrCreateOutputStream());
ServerSerialisers.encodeResponseHeaders(context);
context.getOrCreateOutputStream().close();
} else {
if (writer.isWriteable(entity.getClass(), context.getGenericReturnType(), context.getAllAnnotations(),
context.getResponseMediaType())) {
Response response = context.getResponse().get();
if (mediaType != null) {
context.setResponseContentType(mediaType);
}
if (writerInterceptors == null) {
writer.writeTo(entity, entity.getClass(), context.getGenericReturnType(),
context.getAllAnnotations(), response.getMediaType(), response.getHeaders(),
context.getOrCreateOutputStream());
context.getOrCreateOutputStream().close();
} else {
runWriterInterceptors(context, entity, writer, response, writerInterceptors, serialisers);
}
return true;
} else {
runWriterInterceptors(context, entity, writer, response, writerInterceptors, serialisers);
return false;
}
return true;
}
} catch (Throwable e) {
//clear the pre-commit listener, as if this error is unrecoverable
//the error handling will want to write out its own response
//and the pre commit listener will interfere with that
context.serverResponse().setPreCommitListener(null);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else if (e instanceof IOException) {
throw (IOException) e;
} else {
return false;
throw new RuntimeException(e);
}
}
}
Expand Down
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;

public interface ServerHttpResponse {

Expand Down Expand Up @@ -34,4 +35,6 @@ public interface ServerHttpResponse {
CompletionStage<Void> write(byte[] data);

OutputStream createResponseOutputStream();

void setPreCommitListener(Consumer<ResteasyReactiveRequestContext> task);
}
Expand Up @@ -29,11 +29,12 @@
import org.jboss.resteasy.reactive.spi.ThreadSetupAction;

public class VertxResteasyReactiveRequestContext extends ResteasyReactiveRequestContext
implements ServerHttpRequest, ServerHttpResponse {
implements ServerHttpRequest, ServerHttpResponse, Handler<Void> {

protected final RoutingContext context;
protected final HttpServerRequest request;
protected final HttpServerResponse response;
protected Consumer<ResteasyReactiveRequestContext> preCommitTask;

public VertxResteasyReactiveRequestContext(Deployment deployment, ProvidersImpl providers,
RoutingContext context,
Expand All @@ -42,6 +43,7 @@ public VertxResteasyReactiveRequestContext(Deployment deployment, ProvidersImpl
this.context = context;
this.request = context.request();
this.response = context.response();
context.addHeadersEndHandler(this);
}

public RoutingContext getContext() {
Expand Down Expand Up @@ -313,4 +315,16 @@ public void handle(AsyncResult<Void> event) {
public OutputStream createResponseOutputStream() {
return new ResteasyReactiveOutputStream(this);
}

@Override
public void setPreCommitListener(Consumer<ResteasyReactiveRequestContext> task) {
preCommitTask = task;
}

@Override
public void handle(Void event) {
if (preCommitTask != null) {
preCommitTask.accept(this);
}
}
}

0 comments on commit 76cd569

Please sign in to comment.