Skip to content

Commit

Permalink
[RESTEASY-2506] Async io (#2170)
Browse files Browse the repository at this point in the history
* Support async IO message body writers

* Ignore new Eclipse file

* Support async IO in JSON-B

* UndertowTestRunner: support Application classes

* Flush SSE on close, make sure send events are atomic

* Complete flush promises

* Fixed the ordering issue

* Do not fullfil promises more than once

* Added async writer interceptors

* Implement more async body writers

* Made type argument lookup work for super-interfaces

* Most async writers done, added a Writer interface, Buffered impl/compat interface

* Moved AsyncMessageBodyWriter to spi

* Async writer tests on vertx

* Async the SSE writer

* Renamed AsyncOutputStream.rx* to async*

* Fixed bug introduced in master

* DOCS

* Bytes and proper encodings

* MultiPromise: add synchronized

* Test throwing in async writers/interceptors

* Flush servlet async operation queue on error
  • Loading branch information
FroMage committed Feb 19, 2020
1 parent a8695fb commit bf233e6
Show file tree
Hide file tree
Showing 143 changed files with 5,277 additions and 452 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ target
**/.project
**/.classpath
**/.checkstyle
**/.factorypath


# Packages #
Expand Down
46 changes: 46 additions & 0 deletions docbook/reference/en/en-US/modules/Asynchronous_HTTP.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,50 @@ public class SimpleResource
order to function as specified. Synchronous methods and asynchronous methods will work as specified by the spec.
</para>
</sect1>
<sect1 id="Asynchronous_IO">
<title>Asynchronous IO</title>
<para>
Some backends support asynchronous IO operations (Servlet, Undertow, Vert.x, Quarkus, Netty), which are exposed using the
<code>AsyncOutputStream</code> subtype of <code>OutputStream</code>. It includes async variants for writing and flushing
the stream.
</para>
<para>
Some backends have what is called an "Event Loop Thread", which is a thread responsible for doing all IO operations. Those
backends require the Event Loop Thread to never be blocked, because it does IO for every other thread. Those backends
typically require JAX-RS endpoints to be invoked on worker threads, to make sure they never block the Event Loop Thread.
</para>
<para>
Sometimes, with Async programming, it is possible for asynchronous JAX-RS requests to be resumed from the Event Loop Thread.
As a result, JAX-RS will attempt to serialise the response and send it to the client. But JAX-RS is written using "Blocking IO"
mechanics, such as <code>OutputStream</code> (used by <code>MessageBodyWriter</code> and <code>WriterInterceptor</code>), which
means that sending the response will block the current thread until the response is received. This would work on a worker thread,
but if it happens on the Event Loop Thread it will block it and prevent it from sending the response, resulting in a deadlock.
</para>
<para>
As a result, we've decided to support and expose Async IO interfaces in the form of <code>AsyncOutputStream</code>,
<code>AsyncMessageBodyWriter</code> and <code>AsyncWriterInterceptor</code>, to allow users to write Async IO applications
in RESTEasy.
</para>
<para>
Most built-in <code>MessageBodyWriter</code> and <code>WriterInterceptor</code> support Async IO, with the notable exceptions of:
</para>
<itemizedlist>
<listitem><para><code>HtmlRenderableWriter</code>, which is tied to servlet APIs</para></listitem>
<listitem><para><code>ReaderProvider</code></para></listitem>
<listitem><para><code>StreamingOutputProvider</code>: use <code>AsyncStreamingOutput</code> instead</para></listitem>
<listitem><para><code>GZIPEncodingInterceptor</code></para></listitem>
</itemizedlist>
<para>
Async IO will be preferred if the following conditions are met:
</para>
<itemizedlist>
<listitem><para>The backend supports it</para></listitem>
<listitem><para>The writer supports it</para></listitem>
<listitem><para>All writer interceptors support it</para></listitem>
</itemizedlist>
<para>
If those conditions are not met, and you attempt to use Blocking IO on an Event Loop Thread (as determined by the
backend), then an exception will be thrown.
</para>
</sect1>
</chapter>
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,25 @@ public class TempFileDeletionResource
}
}
</programlisting>
<para>
Note that in order to support <link linkend="Asynchronous_IO">Async IO</link>, you need to implement the
<code>AsyncMessageBodyWriter</code> interface, which requires you to implement this extra method:
</para>
<programlisting>
@Provider
@Produces("text/plain")
@Consumes("text/plain")
public class DefaultTextPlain implements MessageBodyReader, AsyncMessageBodyWriter {
// ...
public CompletionStage&lt;Void&gt; asyncWriteTo(Object o, Class type, Type genericType, Annotation[] annotations, MediaType mediaType, MultivaluedMap httpHeaders, AsyncOutputStream entityStream) {
String charset = mediaType.getParameters().get("charset");
if (charset == null)
return entityStream.asyncWrite(o.toString().getBytes(StandardCharsets.UTF_8));
else
return entityStream.asyncWrite(o.toString().getBytes(charset));
}
}
</programlisting>
<para>


Expand Down
9 changes: 9 additions & 0 deletions docbook/reference/en/en-US/modules/Interceptors.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
</listitem>
</varlistentry>
</variablelist>
<para>
You can also do async processing inside your <code>AsyncWriterInterceptor</code> (if you are using
<link linkend="Asynchronous_IO">Async IO</link>), which is the asynchronous-supporting equivalent to
<code>WriterInterceptor</code>. In this case, you don't need to manually suspend or resume the request.
</para>
</sect2>
</sect1>
<sect1>
Expand Down Expand Up @@ -125,6 +130,10 @@
They can be used to implement a specific content-encoding. They can be used to generate digital signatures or
to post or pre-process a Java object model before or after it is marshalled.
</para>
<para>
Note that in order to support Async IO, you can implement <code>AsyncWriterInterceptor</code>, which is a subtype of
<code>WriterInterceptor</code>.
</para>
</sect1>
<sect1>
<title>Per Resource Method Filters and Interceptors</title>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.ws.rs.ConstrainedTo;
import javax.ws.rs.RuntimeType;
Expand All @@ -14,12 +16,14 @@
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.Providers;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;

import org.jboss.resteasy.core.MediaTypeMap;
import org.jboss.resteasy.core.ResteasyContext;
import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;
import org.jboss.resteasy.spi.AsyncOutputWriter;
import org.jboss.resteasy.spi.AsyncWriterInterceptor;
import org.jboss.resteasy.spi.AsyncWriterInterceptorContext;
import org.jboss.resteasy.spi.ResteasyConfiguration;
import org.jboss.resteasy.util.CommitHeaderOutputStream;

Expand Down Expand Up @@ -56,7 +60,7 @@
*/
@Provider
@ConstrainedTo(RuntimeType.SERVER)
public class Jackson2JsonpInterceptor implements WriterInterceptor{
public class Jackson2JsonpInterceptor implements AsyncWriterInterceptor{

/**
* "text/javascript" media type. Default media type of script tags.
Expand Down Expand Up @@ -183,6 +187,33 @@ public void aroundWriteTo(WriterInterceptorContext context) throws IOException,
}
}

@Override
public CompletionStage<Void> asyncAroundWriteTo(AsyncWriterInterceptorContext context) {
LogMessages.LOGGER.debugf("Interceptor : %s, Method : aroundWriteTo", getClass().getName());

String function = uri.getQueryParameters().getFirst(callbackQueryParameter);
if (enabled && function != null && !function.trim().isEmpty() && !jsonpCompatibleMediaTypes.getPossible(context.getMediaType()).isEmpty()){

AsyncOutputWriter writer = new AsyncOutputWriter(context.getAsyncOutputStream());
CompletionStage<Void> ret = CompletableFuture.completedFuture(null);
if (wrapInTryCatch) {
ret = ret.thenCompose(v -> writer.asyncWrite("try{"));
}
ret = ret.thenCompose(v -> writer.asyncWrite(function + "("))
.thenCompose(v -> writer.asyncFlush())
.thenCompose(v -> context.asyncProceed())
.thenCompose(v -> writer.asyncFlush())
.thenCompose(v -> writer.asyncWrite(")"));

if (wrapInTryCatch) {
ret = ret.thenCompose(v -> writer.asyncWrite("}catch(e){}"));
}
return ret.thenCompose(v -> writer.asyncFlush());
} else {
return context.asyncProceed();
}
}

/**
* Search for an {@link ObjectMapper} for the given class and mediaType
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.jboss.resteasy.annotations.providers.jackson.Formatted;
import org.jboss.resteasy.core.interception.jaxrs.DecoratorMatcher;
import org.jboss.resteasy.core.messagebody.AsyncBufferedMessageBodyWriter;
import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;
import org.jboss.resteasy.util.DelegatingOutputStream;

Expand Down Expand Up @@ -48,7 +49,7 @@
@Provider
@Consumes({"application/json", "application/*+json", "text/json"})
@Produces({"application/json", "application/*+json", "text/json"})
public class ResteasyJackson2Provider extends JacksonJaxbJsonProvider
public class ResteasyJackson2Provider extends JacksonJaxbJsonProvider implements AsyncBufferedMessageBodyWriter<Object>
{

DecoratorMatcher decoratorMatcher = new DecoratorMatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.jboss.resteasy.core.ResteasyContext;
import org.jboss.resteasy.core.interception.jaxrs.DecoratorMatcher;
import org.jboss.resteasy.core.messagebody.AsyncBufferedMessageBodyWriter;
import org.jboss.resteasy.plugins.providers.AbstractEntityProvider;
import org.jboss.resteasy.plugins.providers.jaxb.i18n.LogMessages;
import org.jboss.resteasy.plugins.providers.jaxb.i18n.Messages;
Expand All @@ -37,7 +38,7 @@
* @version $Revision:$
* @param <T> type
*/
public abstract class AbstractJAXBProvider<T> extends AbstractEntityProvider<T>
public abstract class AbstractJAXBProvider<T> extends AbstractEntityProvider<T> implements AsyncBufferedMessageBodyWriter<T>
{
@Context
protected Providers providers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.Providers;
import javax.xml.bind.JAXBContext;
Expand All @@ -42,6 +41,7 @@
import org.jboss.resteasy.annotations.providers.jaxb.DoNotUseJAXBProvider;
import org.jboss.resteasy.annotations.providers.jaxb.Wrapped;
import org.jboss.resteasy.core.ResteasyContext;
import org.jboss.resteasy.core.messagebody.AsyncBufferedMessageBodyWriter;
import org.jboss.resteasy.plugins.providers.jaxb.i18n.LogMessages;
import org.jboss.resteasy.plugins.providers.jaxb.i18n.Messages;
import org.jboss.resteasy.spi.ResteasyConfiguration;
Expand All @@ -57,7 +57,7 @@
@Provider
@Produces({"application/xml", "application/*+xml", "text/xml", "text/*+xml"})
@Consumes({"application/xml", "application/*+xml", "text/xml", "text/*+xml"})
public class CollectionProvider implements MessageBodyReader<Object>, MessageBodyWriter<Object>
public class CollectionProvider implements MessageBodyReader<Object>, AsyncBufferedMessageBodyWriter<Object>
{
@Context
protected Providers providers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionStage;

import javax.ws.rs.Consumes;
import javax.ws.rs.Produces;
Expand All @@ -21,6 +22,7 @@
import javax.xml.transform.stream.StreamSource;

import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;
import org.jboss.resteasy.spi.AsyncOutputStream;
import org.jboss.resteasy.spi.util.Types;
import org.jboss.resteasy.util.NoContent;
import org.xml.sax.InputSource;
Expand Down Expand Up @@ -139,4 +141,13 @@ public void writeTo(JAXBElement<?> t,
super.writeTo(t, typeArg, genericType, annotations, mediaType, httpHeaders, outputStream);
}

@Override
public CompletionStage<Void> asyncWriteTo(JAXBElement<?> t, Class<?> type, Type genericType, Annotation[] annotations,
MediaType mediaType, MultivaluedMap<String, Object> httpHeaders,
AsyncOutputStream outputStream) {
LogMessages.LOGGER.debugf("Provider : %s, Method : writeTo", getClass().getName());
Class<?> typeArg = Object.class;
if (genericType != null) typeArg = Types.getTypeArgument(genericType);
return super.asyncWriteTo(t, typeArg, genericType, annotations, mediaType, httpHeaders, outputStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.Providers;
import javax.xml.bind.JAXBContext;
Expand All @@ -36,6 +35,7 @@
import org.jboss.resteasy.annotations.providers.jaxb.DoNotUseJAXBProvider;
import org.jboss.resteasy.annotations.providers.jaxb.WrappedMap;
import org.jboss.resteasy.core.ResteasyContext;
import org.jboss.resteasy.core.messagebody.AsyncBufferedMessageBodyWriter;
import org.jboss.resteasy.plugins.providers.jaxb.i18n.LogMessages;
import org.jboss.resteasy.plugins.providers.jaxb.i18n.Messages;
import org.jboss.resteasy.spi.ResteasyConfiguration;
Expand All @@ -53,7 +53,7 @@
@Provider
@Produces({"application/xml", "application/*+xml", "text/xml", "text/*+xml"})
@Consumes({"application/xml", "application/*+xml", "text/xml", "text/*+xml"})
public class MapProvider implements MessageBodyReader<Object>, MessageBodyWriter<Object>
public class MapProvider implements MessageBodyReader<Object>, AsyncBufferedMessageBodyWriter<Object>
{
@Context
protected Providers providers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Priority;
import javax.json.bind.Jsonb;
Expand All @@ -15,13 +17,14 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;

import org.apache.commons.io.input.ProxyInputStream;
import org.jboss.resteasy.core.ResteasyContext;
import org.jboss.resteasy.plugins.providers.jsonb.i18n.Messages;
import org.jboss.resteasy.plugins.server.servlet.ResteasyContextParameters;
import org.jboss.resteasy.spi.AsyncMessageBodyWriter;
import org.jboss.resteasy.spi.AsyncOutputStream;
import org.jboss.resteasy.spi.ResteasyConfiguration;
import org.jboss.resteasy.util.DelegatingOutputStream;

Expand All @@ -33,7 +36,7 @@
@Consumes({"application/json", "application/*+json", "text/json"})
@Priority(Priorities.USER-100)
public class JsonBindingProvider extends AbstractJsonBindingProvider
implements MessageBodyReader<Object>, MessageBodyWriter<Object> {
implements MessageBodyReader<Object>, AsyncMessageBodyWriter<Object> {

private final boolean disabled;

Expand Down Expand Up @@ -141,4 +144,19 @@ public void flush() throws IOException {
throw new ProcessingException(Messages.MESSAGES.jsonBSerializationError(e.toString()), e);
}
}

@Override
public CompletionStage<Void> asyncWriteTo(Object t, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType,
MultivaluedMap<String, Object> httpHeaders, AsyncOutputStream entityStream) {
Jsonb jsonb = getJsonb(type);
try
{
return entityStream.asyncWrite(jsonb.toJson(t).getBytes(getCharset(mediaType)));
} catch (Throwable e)
{
CompletableFuture<Void> ret = new CompletableFuture<>();
ret.completeExceptionally(new ProcessingException(Messages.MESSAGES.jsonBSerializationError(e.toString()), e));
return ret;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;

import org.jboss.resteasy.core.messagebody.AsyncBufferedMessageBodyWriter;
import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;

/**
Expand All @@ -23,7 +24,7 @@
*/
@Consumes({"application/json", "application/*+json", "text/json"})
@Produces({"application/json", "application/*+json", "text/json"})
public class JsonArrayProvider extends AbstractJsonpProvider implements MessageBodyReader<JsonArray>, MessageBodyWriter<JsonArray>
public class JsonArrayProvider extends AbstractJsonpProvider implements MessageBodyReader<JsonArray>, AsyncBufferedMessageBodyWriter<JsonArray>
{
@Override
public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;

import org.jboss.resteasy.core.messagebody.AsyncBufferedMessageBodyWriter;
import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;

/**
Expand All @@ -25,7 +25,7 @@
*/
@Consumes({"application/json", "application/*+json", "text/json"})
@Produces({"application/json", "application/*+json", "text/json"})
public class JsonObjectProvider extends AbstractJsonpProvider implements MessageBodyReader<JsonObject>, MessageBodyWriter<JsonObject>
public class JsonObjectProvider extends AbstractJsonpProvider implements MessageBodyReader<JsonObject>, AsyncBufferedMessageBodyWriter<JsonObject>
{


Expand Down
Loading

0 comments on commit bf233e6

Please sign in to comment.