Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5105 - StatisticsHandler Graceful Shutdown of Async Requests #5175

Merged
merged 8 commits into from Aug 28, 2020
4 changes: 3 additions & 1 deletion jetty-server/src/main/config/etc/jetty-stats.xml
Expand Up @@ -5,7 +5,9 @@
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="insertHandler">
<Arg>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler"></New>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler">
<Set name="asyncGraceful"><Property name="jetty.statistics.asyncGraceful" default="true"/></Set>
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
</New>
</Arg>
</Call>
<Call class="org.eclipse.jetty.server.ServerConnectionStatistics" name="addToAllConnectors">
Expand Down
5 changes: 5 additions & 0 deletions jetty-server/src/main/config/modules/stats.mod
Expand Up @@ -15,3 +15,8 @@ etc/jetty-stats.xml

[ini]
jetty.webapp.addServerClasses+=,-org.eclipse.jetty.servlet.StatisticsServlet

[ini-template]

## If the Graceful shutdown should wait for async requests as well as the currently dispatched ones.
# jetty.statistics.asyncGraceful=true
Expand Up @@ -61,7 +61,6 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
final Thread thread = Thread.currentThread();
final String old_name = thread.getName();

boolean suspend = false;
boolean retry = false;
String name = (String)request.getAttribute("org.eclipse.jetty.thread.name");
if (name == null)
Expand Down Expand Up @@ -103,11 +102,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
finally
{
thread.setName(old_name);
suspend = baseRequest.getHttpChannelState().isSuspended();
if (suspend)
if (baseRequest.getHttpChannelState().isAsyncStarted())
{
request.setAttribute("org.eclipse.jetty.thread.name", name);
print(name, "SUSPEND");
print(name, "ASYNC");
}
else
print(name, "RESPONSE " + base_response.getStatus() + (ex == null ? "" : ("/" + ex)) + " " + base_response.getContentType());
Expand Down
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.AsyncEvent;
Expand Down Expand Up @@ -59,6 +58,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful

private final LongAdder _asyncDispatches = new LongAdder();
private final LongAdder _expires = new LongAdder();
private final LongAdder _errors = new LongAdder();

private final LongAdder _responses1xx = new LongAdder();
private final LongAdder _responses2xx = new LongAdder();
Expand All @@ -67,6 +67,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final LongAdder _responses5xx = new LongAdder();
private final LongAdder _responsesTotalBytes = new LongAdder();

private boolean asyncGraceful = true;

private final Graceful.Shutdown _shutdown = new Graceful.Shutdown()
{
@Override
Expand All @@ -76,25 +78,24 @@ protected FutureCallback newShutdownCallback()
}
};

private final AtomicBoolean _wrapWarning = new AtomicBoolean();

private final AsyncListener _onCompletion = new AsyncListener()
{
@Override
public void onTimeout(AsyncEvent event) throws IOException
public void onStartAsync(AsyncEvent event) throws IOException
{
_expires.increment();
event.getAsyncContext().addListener(this);
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException
public void onTimeout(AsyncEvent event) throws IOException
{
event.getAsyncContext().addListener(this);
_expires.increment();
}

@Override
public void onError(AsyncEvent event) throws IOException
{
_errors.increment();
}

@Override
Expand All @@ -105,15 +106,15 @@ public void onComplete(AsyncEvent event) throws IOException
Request request = state.getBaseRequest();
final long elapsed = System.currentTimeMillis() - request.getTimeStamp();

long d = _requestStats.decrement();
long numRequests = _requestStats.decrement();
_requestTimeStats.record(elapsed);

updateResponse(request);

_asyncWaitStats.decrement();

// If we have no more dispatches, should we signal shutdown?
if (d == 0)
if (numRequests == 0 && asyncGraceful)
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
Expand Down Expand Up @@ -149,6 +150,14 @@ public void statsReset()
@Override
public void handle(String path, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
Handler handler = getHandler();
if (handler == null || !isStarted() || isShutdown())
{
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
return;
}

_dispatchedStats.increment();

final long start;
Expand All @@ -168,51 +177,41 @@ public void handle(String path, Request baseRequest, HttpServletRequest request,

try
{
Handler handler = getHandler();
if (handler != null && !_shutdown.isShutdown() && isStarted())
handler.handle(path, baseRequest, request, response);
else
{
if (!baseRequest.isHandled())
baseRequest.setHandled(true);
else if (_wrapWarning.compareAndSet(false, true))
LOG.warn("Bad statistics configuration. Latencies will be incorrect in {}", this);
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
}
handler.handle(path, baseRequest, request, response);
}
finally
{
final long now = System.currentTimeMillis();
final long dispatched = now - start;

_dispatchedStats.decrement();
long numRequests = -1;
long numDispatches = _dispatchedStats.decrement();
_dispatchedTimeStats.record(dispatched);

if (state.isSuspended())
if (state.isInitial())
{
if (state.isInitial())
if (state.isAsyncStarted())
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
state.addListener(_onCompletion);
_asyncWaitStats.increment();
}
else
{
numRequests = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);
}
}
else if (state.isInitial())

FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
long d = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);
response.flushBuffer();

// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
}
// If we either have no more requests or dispatches, we can complete shutdown.
if (asyncGraceful ? (numRequests == 0) : (numDispatches == 0))
shutdown.succeeded();
}
// else onCompletion will handle it.
}
}

Expand Down Expand Up @@ -251,6 +250,8 @@ protected void updateResponse(Request request)
@Override
protected void doStart() throws Exception
{
if (getHandler() == null)
throw new IllegalStateException("StatisticsHandler has no Wrapped Handler");
_shutdown.cancel();
super.doStart();
statsReset();
Expand All @@ -263,6 +264,17 @@ protected void doStop() throws Exception
super.doStop();
}

/**
* Set whether the graceful shutdown should wait for all requests to complete including
* async requests which are not currently dispatched, or whether it should only wait for all the
* actively dispatched requests to complete.
* @param asyncGraceful true to wait for async requests on graceful shutdown.
*/
public void setAsyncGraceful(boolean asyncGraceful)
{
this.asyncGraceful = asyncGraceful;
}

lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
/**
* @return the number of requests handled by this handler
* since {@link #statsReset()} was last called, excluding
Expand Down Expand Up @@ -467,6 +479,16 @@ public int getExpires()
return _expires.intValue();
}

/**
* @return the number of async errors that occurred.
* @see #getAsyncDispatches()
*/
@ManagedAttribute("number of async errors that occurred")
public int getErrors()
{
return _errors.intValue();
}

/**
* @return the number of responses with a 1xx status returned by this context
* since {@link #statsReset()} was last called.
Expand Down
Expand Up @@ -330,8 +330,9 @@ public void run()
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("read 10/10"));

assertThat(stats.getRequests(), is(2));
assertThat(stats.getResponses5xx(), is(1));
// The StatisticsHandler was shutdown when it received the second request so does not contribute to the stats.
assertThat(stats.getRequests(), is(1));
assertThat(stats.getResponses4xx(), is(0));
}
}

Expand Down