Skip to content

Commit

Permalink
Issue #4376 Async Content Complete (#4377)
Browse files Browse the repository at this point in the history
* Issue #4376 Async Content Complete

Added test harness to reproduce unready completing write.
Fixed test by not closing output prior to becoming READY

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

ERROR state still needs to be closed!

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

close after last blocking write

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

If completion has to do a flush, then we need a call to closed to
avoid leaking buffers.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4376 Async Content Complete

Reformat

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Dec 2, 2019
1 parent d99ae19 commit 85cda88
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
Expand Up @@ -1195,10 +1195,7 @@ public void succeeded()
if (_length > 0)
_combinedListener.onResponseContent(_request, _content);
if (_complete && _state.completeResponse())
{
_response.getHttpOutput().closed();
_combinedListener.onResponseEnd(_request);
}
super.succeeded();
}

Expand All @@ -1222,7 +1219,6 @@ public void succeeded()
@Override
public void failed(Throwable th)
{
_response.getHttpOutput().closed();
abort(x);
super.failed(x);
}
Expand Down
Expand Up @@ -968,6 +968,9 @@ protected void completed()
}
}

// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().closed();

if (event != null)
{
cancelTimeout(event);
Expand Down
Expand Up @@ -234,6 +234,8 @@ private void write(ByteBuffer content, boolean complete) throws IOException
{
write(content, complete, blocker);
blocker.block();
if (complete)
closed();
}
catch (Exception failure)
{
Expand Down Expand Up @@ -403,21 +405,19 @@ public void closed()
State state = _state.get();
switch (state)
{
case CLOSING:
{
if (!_state.compareAndSet(state, State.CLOSED))
break;
releaseBuffer();
return;
}
case CLOSED:
{
return;
}
case UNREADY:
{
if (_state.compareAndSet(state, State.ERROR))
_writeListener.onError(_onError == null ? new EofException("Async closed") : _onError);
{
if (_onError == null)
_onError = new EofException("Async closed");
releaseBuffer();
return;
}
break;
}
default:
Expand Down
Expand Up @@ -33,6 +33,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpTester;
Expand All @@ -41,6 +47,7 @@
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -89,15 +96,14 @@ public void proceed()
_delay.get(10, TimeUnit.SECONDS);
getCallback().succeeded();
}
catch(Throwable th)
catch (Throwable th)
{
th.printStackTrace();
getCallback().failed(th);
}
}
}


@BeforeEach
public void init() throws Exception
{
Expand Down Expand Up @@ -153,7 +159,7 @@ public ExtendedHttpConnection(HttpConfiguration config, Connector connector, End
@Override
public void onCompleted()
{
COMPLETE.compareAndSet(false,true);
COMPLETE.compareAndSet(false, true);
super.onCompleted();
}
}
Expand All @@ -163,7 +169,8 @@ public static Stream<Arguments> tests()
{
List<Object[]> tests = new ArrayList<>();
tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"});
tests.add(new Object[]{new SendErrorHandler(499,"Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data});
return tests.stream().map(Arguments::of);
}

Expand Down Expand Up @@ -197,7 +204,7 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr

// wait for threads to return to base level
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while(_threadPool.getBusyThreads() != base)
while (_threadPool.getBusyThreads() != base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Expand All @@ -210,12 +217,54 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr
// proceed with the completion
delay.proceed();

while(!COMPLETE.get())
while (!COMPLETE.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(10);
}
}
}

private static class AsyncReadyCompleteHandler extends AbstractHandler
{
static String data = "Now is the time for all good men to come to the aid of the party";

@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
AsyncContext context = request.startAsync();
ServletOutputStream out = response.getOutputStream();
out.setWriteListener(new WriteListener()
{
byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1);

@Override
public void onWritePossible() throws IOException
{
while (out.isReady())
{
if (bytes != null)
{
response.setContentType("text/plain");
response.setContentLength(bytes.length);
out.write(bytes);
bytes = null;
}
else
{
context.complete();
return;
}
}
}

@Override
public void onError(Throwable t)
{
t.printStackTrace();
}
});
}
}
}

0 comments on commit 85cda88

Please sign in to comment.