Skip to content

Commit

Permalink
Fixed TCK test, added passing version of SSE TCK test (#2305)
Browse files Browse the repository at this point in the history
* Fixed TCK test, added passing version of SSE TCK test

Re-resolve the writer after all interceptors are done, making sure we don't get a blocking
one after we started with an async one

* Changed behaviour of SseEventOutputImpl for TCK
  • Loading branch information
FroMage committed Feb 21, 2020
1 parent 5c95169 commit ca08b7c
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public abstract class AbstractWriterInterceptorContext implements WriterIntercep
{
protected RESTEasyTracingLogger tracingLogger;
protected WriterInterceptor[] interceptors;
protected boolean requireAsyncIO;
protected Object entity;
protected Class type;
protected Type genericType;
Expand Down Expand Up @@ -160,8 +161,10 @@ public AsyncOutputStream getAsyncOutputStream()
public CompletionStage<Void> getStarted() {
if(outputStream instanceof AsyncOutputStream
&& getWriter() instanceof AsyncMessageBodyWriter
&& interceptorsSupportAsyncIo())
&& interceptorsSupportAsyncIo()) {
requireAsyncIO = true;
return asyncProceed();
}
try
{
return syncProceed();
Expand Down Expand Up @@ -291,6 +294,8 @@ protected CompletionStage<Void> writeTo(AsyncMessageBodyWriter writer)
protected MessageBodyWriter getWriter()
{
MessageBodyWriter writer = resolveWriter();
if(requireAsyncIO && writer instanceof AsyncMessageBodyWriter == false)
throw new IllegalStateException("Cannot switch body writer from blocking to asynchronous during writer interceptor run");

if (writer == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class ServerWriterInterceptorContext extends AbstractWriterInterceptorCon
{
private HttpRequest request;
private Consumer<Throwable> onWriteComplete;
private MessageBodyWriter writer;

public ServerWriterInterceptorContext(final WriterInterceptor[] interceptors, final ResteasyProviderFactory providerFactory,
final Object entity, final Class type, final Type genericType, final Annotation[] annotations,
Expand All @@ -50,12 +49,10 @@ public ServerWriterInterceptorContext(final WriterInterceptor[] interceptors, fi
@Override
protected MessageBodyWriter resolveWriter()
{
if(writer == null) {
writer = ((ResteasyProviderFactoryImpl)providerFactory).getServerMessageBodyWriter(
type, genericType, annotations, mediaType, tracingLogger);
}
return writer;
return ((ResteasyProviderFactoryImpl)providerFactory).getServerMessageBodyWriter(
type, genericType, annotations, mediaType, tracingLogger);
}

@Override
void throwWriterNotFoundException()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,12 @@ public CompletionStage<?> send(OutboundSseEvent event)
{
if (closed)
{
CompletableFuture<?> ret = new CompletableFuture<>();
ret.completeExceptionally(new IllegalStateException(Messages.MESSAGES.sseEventSinkIsClosed()));
return ret;
// FIXME: should be this
// CompletableFuture<?> ret = new CompletableFuture<>();
// ret.completeExceptionally(new IllegalStateException(Messages.MESSAGES.sseEventSinkIsClosed()));
// return ret;
// But the TCK expects a real exception
throw new IllegalStateException(Messages.MESSAGES.sseEventSinkIsClosed());
}
// eager composition to guarantee ordering
CompletionStage<Void> a = internalFlushResponseToClient(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.jboss.resteasy.test.asyncio;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("async-io-writer")
public class AsyncIOWriterResource
{
@Produces(MediaType.TEXT_PLAIN)
@GET
public CompletionStage<String> get() {
return CompletableFuture.supplyAsync(() -> {
try
{
Thread.sleep(100);
} catch (InterruptedException e)
{
throw new RuntimeException(e);
}
return "Hello";
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.jboss.resteasy.test.asyncio;

import static org.jboss.resteasy.test.TestPortProvider.generateURL;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;

import org.jboss.resteasy.plugins.server.vertx.VertxContainer;
import org.jboss.resteasy.spi.Registry;
import org.jboss.resteasy.spi.ResteasyDeployment;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class AsyncIOWriterTest
{

static Client client;
@BeforeClass
public static void setup() throws Exception
{
ResteasyDeployment deployment = VertxContainer.start();
deployment.getProviderFactory().register(MyTypeWriter.class);
deployment.getProviderFactory().register(MyTypeInterceptor.class);
Registry registry = deployment.getRegistry();
registry.addPerRequestResource(AsyncIOWriterResource.class);
client = ClientBuilder.newClient();
}

@AfterClass
public static void end() throws Exception
{
try
{
client.close();
}
catch (Exception e)
{

}
VertxContainer.stop();
}

@Test
public void testAsyncIoWriter() throws Exception
{
WebTarget target = client.target(generateURL("/async-io-writer"));
String val = target.request().get(String.class);
Assert.assertEquals("OK", val);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.jboss.resteasy.test.asyncio;

public class MyType
{

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.jboss.resteasy.test.asyncio;

import java.io.IOException;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;

@Provider
public class MyTypeInterceptor implements WriterInterceptor
{

@Override
public void aroundWriteTo(WriterInterceptorContext context) throws IOException, WebApplicationException
{
if ("Hello".equals(context.getEntity())) {
context.setEntity(new MyType());
context.setMediaType(MediaType.TEXT_PLAIN_TYPE);
context.setType(MyType.class);
}
context.proceed();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.jboss.resteasy.test.asyncio;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;

@Provider
public class MyTypeWriter implements MessageBodyWriter<MyType>
{

@Override
public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType)
{
return type == MyType.class;
}

@Override
public void writeTo(MyType t, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType,
MultivaluedMap<String, Object> httpHeaders, OutputStream entityStream)
throws IOException, WebApplicationException
{
entityStream.write("OK".getBytes(StandardCharsets.UTF_8));
entityStream.flush();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.jboss.resteasy.test.asyncio;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;

@Path("close")
public class SSEResource {

private static volatile boolean exception = false;

private static volatile boolean isClosed = false;

@GET
@Path("reset")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void reset(@Context SseEventSink sink, @Context Sse sse) {
exception = false;
isClosed = false;
try (SseEventSink s = sink) {
s.send(sse.newEvent("RESET"));
}
}

@GET
@Path("send")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void send(@Context SseEventSink sink, @Context Sse sse) {
Thread t = new Thread(new Runnable() {
public void run() {
SseEventSink s = sink;
s.send(sse.newEvent("HELLO"));
s.close();
isClosed = s.isClosed();
if (!isClosed)
return;
s.close();
isClosed = s.isClosed();
if (!isClosed)
return;
s.close();
isClosed = s.isClosed();
if (!isClosed)
return;
try {
s.send(sse.newEvent("SOMETHING")).exceptionally(t -> {
if(t instanceof IllegalStateException)
exception = true;
return null;
});
} catch (IllegalStateException ise) {
exception = true;
}
}
});
t.start();
}

@GET
@Path("check")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void check(@Context SseEventSink sink, @Context Sse sse) {
try (SseEventSink s = sink) {
if (!isClosed) {
s.send(sse.newEvent("Not closed"));
return;
}
if (!exception) {
s.send(sse.newEvent("No IllegalStateException is thrown"));
return;
}
s.send(sse.newEvent("CHECK"));
}
}

@GET
@Path("closed")
@Produces(MediaType.TEXT_PLAIN)
public boolean isClosed() {
return isClosed;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package org.jboss.resteasy.test.asyncio;

import static org.jboss.resteasy.test.TestPortProvider.generateURL;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;

import org.jboss.resteasy.plugins.server.vertx.VertxContainer;
import org.jboss.resteasy.spi.Registry;
import org.jboss.resteasy.spi.ResteasyDeployment;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class SSETest
{
static Client client;
@BeforeClass
public static void setup() throws Exception
{
ResteasyDeployment deployment = VertxContainer.start();
Registry registry = deployment.getRegistry();
registry.addPerRequestResource(SSEResource.class);
client = ClientBuilder.newClient();
}

@AfterClass
public static void end() throws Exception
{
try
{
client.close();
}
catch (Exception e)
{

}
VertxContainer.stop();
}

@Test
public void testSSE() throws Exception
{
WebTarget target = client.target(generateURL("/close/closed"));
querySSEAndAssert("RESET", "/close/reset");
querySSEAndAssert("HELLO", "/close/send");


boolean closed = false;
int cnt = 0;
while (!closed && cnt < 20) {
closed = target.request().get(Boolean.class);
Thread.sleep(200);
cnt++;
}

querySSEAndAssert("CHECK", "/close/check");
}

private void querySSEAndAssert(String message, String uri) throws InterruptedException, ExecutionException, TimeoutException
{
WebTarget target = client.target(generateURL(uri));
SseEventSource source = SseEventSource.target(target).build();
CompletableFuture<String> cf = new CompletableFuture<>();
source.register(event -> {
cf.complete(event.readData());
},
error -> {
cf.completeExceptionally(error);
},
() -> {
if(!cf.isDone())
cf.completeExceptionally(new RuntimeException("closed with no data"));
});
source.open();
try (SseEventSource x = source){
Assert.assertEquals(message, cf.get(5, TimeUnit.SECONDS));
}
}
}
Loading

0 comments on commit ca08b7c

Please sign in to comment.