Skip to content

Commit

Permalink
Delay invocation of onComplete listeners until the initial thread has…
Browse files Browse the repository at this point in the history
… returned
  • Loading branch information
stuartwdouglas committed Apr 7, 2015
1 parent 0b5a826 commit 2962f45
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 26 deletions.
Expand Up @@ -272,38 +272,36 @@ public synchronized void complete() {
timeoutKey.remove(); timeoutKey.remove();
timeoutKey = null; timeoutKey = null;
} }
onAsyncComplete();
if(!dispatched) { if(!dispatched) {
completeInternal(); completeInternal();
} else {
onAsyncComplete();
} }
if(previousAsyncContext != null) { if(previousAsyncContext != null) {
previousAsyncContext.complete(); previousAsyncContext.complete();
} }
} }


public synchronized void completeInternal() { public synchronized void completeInternal() {
if(timeoutKey != null) {
timeoutKey.remove();
timeoutKey = null;
}
servletRequestContext.getOriginalRequest().asyncRequestDispatched(); servletRequestContext.getOriginalRequest().asyncRequestDispatched();
Thread currentThread = Thread.currentThread(); Thread currentThread = Thread.currentThread();
if (!initialRequestDone && currentThread == initiatingThread) { if (!initialRequestDone && currentThread == initiatingThread) {
//the context was stopped in the same request context it was started, we don't do anything //TODO: according to the spec we should delay this until the container initiated thread has returned?

onAsyncComplete();
if (dispatched) { if (dispatched) {
throw UndertowServletMessages.MESSAGES.asyncRequestAlreadyDispatched(); throw UndertowServletMessages.MESSAGES.asyncRequestAlreadyDispatched();
} }
exchange.unDispatch(); exchange.unDispatch();
dispatched = true; dispatched = true;
initialRequestDone(); initialRequestDone();
} else { } else {
//we do not run the ServletRequestListeners here, as the request does not come into the scope
//of a web application, as defined by the javadoc on ServletRequestListener
if(currentThread == exchange.getIoThread()) { if(currentThread == exchange.getIoThread()) {
//the thread safety semantics here are a bit weird. //the thread safety semantics here are a bit weird.
//basically if we are doing async IO we can't do a dispatch here, as then the IO thread can be racing //basically if we are doing async IO we can't do a dispatch here, as then the IO thread can be racing
//with the dispatch thread. //with the dispatch thread.
//at all other times the dispatch is desirable //at all other times the dispatch is desirable
onAsyncComplete();
HttpServletResponseImpl response = servletRequestContext.getOriginalResponse(); HttpServletResponseImpl response = servletRequestContext.getOriginalResponse();
response.responseDone(); response.responseDone();
try { try {
Expand All @@ -315,6 +313,7 @@ public synchronized void completeInternal() {
doDispatch(new Runnable() { doDispatch(new Runnable() {
@Override @Override
public void run() { public void run() {
onAsyncComplete();


HttpServletResponseImpl response = servletRequestContext.getOriginalResponse(); HttpServletResponseImpl response = servletRequestContext.getOriginalResponse();
response.responseDone(); response.responseDone();
Expand Down Expand Up @@ -396,7 +395,7 @@ public void exchangeEvent(HttpServerExchange exchange, NextListener nextListener
} }


@Override @Override
public void setTimeout(final long timeout) { public synchronized void setTimeout(final long timeout) {
if (initialRequestDone) { if (initialRequestDone) {
throw UndertowServletMessages.MESSAGES.asyncRequestAlreadyReturnedToContainer(); throw UndertowServletMessages.MESSAGES.asyncRequestAlreadyReturnedToContainer();
} }
Expand Down
Expand Up @@ -20,8 +20,9 @@


import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;


import javax.servlet.AsyncEvent; import javax.servlet.AsyncEvent;


Expand All @@ -30,10 +31,31 @@
*/ */
public class AsyncEventListener implements javax.servlet.AsyncListener { public class AsyncEventListener implements javax.servlet.AsyncListener {


private static final List<String> EVENTS = Collections.synchronizedList(new ArrayList<String>()); private static final LinkedBlockingDeque<String> EVENTS = new LinkedBlockingDeque<>();


public static String[] results() { public static String[] results(int expected) {
String[] ret = EVENTS.toArray(new String[EVENTS.size()]); List<String> poll = new ArrayList<>();
String current = EVENTS.poll();
while (current != null) {
poll.add(current);
current = EVENTS.poll();
}
try {
if (poll.size() < expected) {
current = EVENTS.poll(5, TimeUnit.SECONDS);
while (current != null) {
poll.add(current);
if (poll.size() < expected) {
current = EVENTS.poll(5, TimeUnit.SECONDS);
} else {
current = EVENTS.poll();
}
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String[] ret = poll.toArray(new String[poll.size()]);
EVENTS.clear(); EVENTS.clear();
return ret; return ret;
} }
Expand Down
Expand Up @@ -18,10 +18,6 @@


package io.undertow.servlet.test.listener.request.async.onError; package io.undertow.servlet.test.listener.request.async.onError;


import java.io.IOException;

import javax.servlet.ServletException;

import io.undertow.server.handlers.PathHandler; import io.undertow.server.handlers.PathHandler;
import io.undertow.servlet.api.DeploymentInfo; import io.undertow.servlet.api.DeploymentInfo;
import io.undertow.servlet.api.DeploymentManager; import io.undertow.servlet.api.DeploymentManager;
Expand All @@ -41,12 +37,14 @@
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;


import javax.servlet.ServletException;
import java.io.IOException;

/** /**
* @author Jozef Hartinger
* @see https://issues.jboss.org/browse/UNDERTOW-30 * @see https://issues.jboss.org/browse/UNDERTOW-30
* @see https://issues.jboss.org/browse/UNDERTOW-31 * @see https://issues.jboss.org/browse/UNDERTOW-31
* @see https://issues.jboss.org/browse/UNDERTOW-32 * @see https://issues.jboss.org/browse/UNDERTOW-32
*
* @author Jozef Hartinger
*/ */
@RunWith(DefaultServer.class) @RunWith(DefaultServer.class)
public class AsyncListenerOnErrorTest { public class AsyncListenerOnErrorTest {
Expand All @@ -66,13 +64,13 @@ public static void setup() throws ServletException {
.addMapping("/async1"); .addMapping("/async1");


ServletInfo a2 = new ServletInfo("asyncServlet2", AsyncServlet2.class) ServletInfo a2 = new ServletInfo("asyncServlet2", AsyncServlet2.class)
.setAsyncSupported(true) .setAsyncSupported(true)
.addMapping("/async2"); .addMapping("/async2");




ServletInfo a3 = new ServletInfo("asyncServlet3", AsyncServlet3.class) ServletInfo a3 = new ServletInfo("asyncServlet3", AsyncServlet3.class)
.setAsyncSupported(true) .setAsyncSupported(true)
.addMapping("/async3"); .addMapping("/async3");


DeploymentInfo builder = new DeploymentInfo() DeploymentInfo builder = new DeploymentInfo()
.setClassLoader(AsyncListenerOnErrorTest.class.getClassLoader()) .setClassLoader(AsyncListenerOnErrorTest.class.getClassLoader())
Expand Down Expand Up @@ -102,7 +100,7 @@ public void testAsyncListenerOnErrorInvoked1() throws IOException {
Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode()); Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode());
final String response = HttpClientUtils.readResponse(result); final String response = HttpClientUtils.readResponse(result);
Assert.assertEquals(SimpleAsyncListener.MESSAGE, response); Assert.assertEquals(SimpleAsyncListener.MESSAGE, response);
Assert.assertArrayEquals(new String[] {"ERROR", "COMPLETE"}, AsyncEventListener.results()); Assert.assertArrayEquals(new String[]{"ERROR", "COMPLETE"}, AsyncEventListener.results(2));
} finally { } finally {
client.getConnectionManager().shutdown(); client.getConnectionManager().shutdown();
} }
Expand All @@ -117,7 +115,7 @@ public void testAsyncListenerOnErrorInvoked2() throws IOException {
Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode()); Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode());
final String response = HttpClientUtils.readResponse(result); final String response = HttpClientUtils.readResponse(result);
Assert.assertEquals(SimpleAsyncListener.MESSAGE, response); Assert.assertEquals(SimpleAsyncListener.MESSAGE, response);
Assert.assertArrayEquals(new String[] {"COMPLETE", "ERROR"}, AsyncEventListener.results()); Assert.assertArrayEquals(new String[]{"ERROR", "COMPLETE"}, AsyncEventListener.results(2));
} finally { } finally {
client.getConnectionManager().shutdown(); client.getConnectionManager().shutdown();
} }
Expand All @@ -132,7 +130,7 @@ public void testMultiAsyncDispatchError() throws IOException {
Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode()); Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode());
final String response = HttpClientUtils.readResponse(result); final String response = HttpClientUtils.readResponse(result);
Assert.assertEquals(SimpleAsyncListener.MESSAGE, response); Assert.assertEquals(SimpleAsyncListener.MESSAGE, response);
Assert.assertArrayEquals(new String[] {"START", "COMPLETE", "ERROR"}, AsyncEventListener.results()); Assert.assertArrayEquals(new String[]{"START", "ERROR", "COMPLETE"}, AsyncEventListener.results(3));
} finally { } finally {
client.getConnectionManager().shutdown(); client.getConnectionManager().shutdown();
} }
Expand Down

0 comments on commit 2962f45

Please sign in to comment.