Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 9.4.x 6254 queued request total timeout #6257

Merged
merged 3 commits into from May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -65,7 +65,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
private final ProxyConfiguration.Proxy proxy;
private final ClientConnectionFactory connectionFactory;
private final HttpField hostField;
private final TimeoutTask timeout;
private final RequestTimeouts requestTimeouts;
private ConnectionPool connectionPool;

public HttpDestination(HttpClient client, Origin origin)
Expand All @@ -78,7 +78,7 @@ public HttpDestination(HttpClient client, Origin origin)
this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier();

this.timeout = new TimeoutTask(client.getScheduler());
this.requestTimeouts = new RequestTimeouts(client.getScheduler());

ProxyConfiguration proxyConfig = client.getProxyConfiguration();
proxy = proxyConfig.match(origin);
Expand Down Expand Up @@ -272,7 +272,7 @@ public void send(HttpExchange exchange)
{
long expiresAt = request.getTimeoutAt();
if (expiresAt != -1)
timeout.schedule(expiresAt);
requestTimeouts.schedule(expiresAt);

if (!client.isRunning() && exchanges.remove(exchange))
{
Expand Down Expand Up @@ -425,7 +425,7 @@ public void close()
if (LOG.isDebugEnabled())
LOG.debug("Closed {}", this);
connectionPool.close();
timeout.destroy();
requestTimeouts.destroy();
}

public void release(Connection connection)
Expand Down Expand Up @@ -547,15 +547,15 @@ public String toString()
}

/**
* This class enforces the total timeout for exchanges that are still in the queue.
* The total timeout for exchanges that are not in the destination queue is enforced
* by {@link HttpChannel}.
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
*/
private class TimeoutTask extends CyclicTimeout
private class RequestTimeouts extends CyclicTimeout
{
private final AtomicLong nextTimeout = new AtomicLong(Long.MAX_VALUE);
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);

private TimeoutTask(Scheduler scheduler)
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}
Expand All @@ -564,14 +564,18 @@ private TimeoutTask(Scheduler scheduler)
public void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeout expired", this);
LOG.debug("{} timeouts check", this);

nextTimeout.set(Long.MAX_VALUE);
long now = System.nanoTime();
long nextExpiresAt = Long.MAX_VALUE;

// Check all queued exchanges for those that have expired
// and to determine when the next check must be.
long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an earliest
// value, but the corresponding exchange is already enqueued
// and will be seen by scanning the exchange queue below.
earliestTimeout.set(earliest);

// Scan the message queue to abort expired exchanges
// and to find the exchange that expire the earliest.
for (HttpExchange exchange : exchanges)
{
HttpRequest request = exchange.getRequest();
Expand All @@ -580,34 +584,27 @@ public void onTimeoutExpired()
continue;
if (expiresAt <= now)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresAt < nextExpiresAt)
nextExpiresAt = expiresAt;
else if (expiresAt < earliest)
earliest = expiresAt;
}

if (nextExpiresAt < Long.MAX_VALUE && client.isRunning())
schedule(nextExpiresAt);
if (earliest < Long.MAX_VALUE && client.isRunning())
schedule(earliest);
}

private void schedule(long expiresAt)
{
// Schedule a timeout for the soonest any known exchange can expire.
// If subsequently that exchange is removed from the queue, the
// timeout is not cancelled, instead the entire queue is swept
// for expired exchanges and a new timeout is set.
long timeoutAt = nextTimeout.getAndUpdate(e -> Math.min(e, expiresAt));
if (timeoutAt != expiresAt)
// Schedule a timeout for the earliest exchange that may expire.
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
{
long delay = expiresAt - System.nanoTime();
if (delay <= 0)
{
onTimeoutExpired();
}
else
{
schedule(delay, TimeUnit.NANOSECONDS);
if (LOG.isDebugEnabled())
LOG.debug("{} scheduled timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
}
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
schedule(delay, TimeUnit.NANOSECONDS);
}
}
}
Expand Down
Expand Up @@ -34,7 +34,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C
{
private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class);

private final AtomicReference<Request> request = new AtomicReference<>();
private final AtomicReference<Request> requestTimeout = new AtomicReference<>();

public TimeoutCompleteListener(Scheduler scheduler)
{
Expand All @@ -44,7 +44,7 @@ public TimeoutCompleteListener(Scheduler scheduler)
@Override
public void onTimeoutExpired()
{
Request request = this.request.getAndSet(null);
Request request = requestTimeout.getAndSet(null);
if (LOG.isDebugEnabled())
LOG.debug("Total timeout {} ms elapsed for {} on {}", request.getTimeout(), request, this);
if (request != null)
Expand All @@ -54,7 +54,7 @@ public void onTimeoutExpired()
@Override
public void onComplete(Result result)
{
Request request = this.request.getAndSet(null);
Request request = requestTimeout.getAndSet(null);
if (request != null)
{
boolean cancelled = cancel();
Expand All @@ -65,19 +65,12 @@ public void onComplete(Result result)

void schedule(HttpRequest request, long timeoutAt)
{
if (this.request.compareAndSet(null, request))
if (requestTimeout.compareAndSet(null, request))
{
long delay = timeoutAt - System.nanoTime();
if (delay <= 0)
{
onTimeoutExpired();
}
else
{
schedule(delay, TimeUnit.NANOSECONDS);
if (LOG.isDebugEnabled())
LOG.debug("Scheduled timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this);
}
long delay = Math.max(0, timeoutAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("Scheduling timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this);
schedule(delay, TimeUnit.NANOSECONDS);
}
}
}
22 changes: 21 additions & 1 deletion jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java
Expand Up @@ -33,6 +33,23 @@
* <p>Subclasses should implement {@link #onTimeoutExpired()}.</p>
* <p>This implementation is optimised assuming that the timeout
* will mostly be cancelled and then reused with a similar value.</p>
* <p>The typical scenario to use this class is when you have events
* that postpone (by re-scheduling), or cancel then re-schedule, a
* timeout for a single entity.
* For example: connection idleness, where for each connection there
* is a CyclicTimeout and a read/write postpones the timeout; when
* the timeout expires, the implementation checks against a timestamp
* if the connection is really idle.
* Another example: HTTP session expiration, where for each HTTP
* session there is a CyclicTimeout and at the beginning of the
* request processing the timeout is canceled (via cancel()), but at
* the end of the request processing the timeout is re-scheduled.</p>
* <p>Another typical scenario is for a parent entity to manage
* the timeouts of many children entities; the timeout is scheduled
* for the child entity that expires the earlier; when the timeout
* expires, the implementation scans the children entities to find
* the expired child entities and to find the next child entity
* that expires the earlier. </p>
* <p>This implementation has a {@link Timeout} holding the time
* at which the scheduled task should fire, and a linked list of
* {@link Wakeup}, each holding the actual scheduled task.</p>
Expand Down Expand Up @@ -102,9 +119,12 @@ public boolean schedule(long delay, TimeUnit units)
if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup)))
{
if (LOG.isDebugEnabled())
LOG.debug("Installed timeout in {} ms, waking up in {} ms",
{
LOG.debug("Installed timeout in {} ms, {} wake up in {} ms",
sbordet marked this conversation as resolved.
Show resolved Hide resolved
units.toMillis(delay),
newWakeup != null ? "new" : "existing",
TimeUnit.NANOSECONDS.toMillis(wakeup._at - now));
}
break;
}
}
Expand Down
Expand Up @@ -54,12 +54,14 @@
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opentest4j.TestAbortedException;

import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -501,6 +503,83 @@ protected void service(String target, org.eclipse.jetty.server.Request jettyRequ
assertTrue(latch.await(5, TimeUnit.SECONDS));
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testRequestQueuedDoesNotCancelTimeoutOfQueuedRequests(Transport transport) throws Exception
{
init(transport);

CountDownLatch serverLatch = new CountDownLatch(1);
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
if (request.getRequestURI().startsWith("/one"))
{
try
{
serverLatch.await();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
}
});

scenario.client.setMaxConnectionsPerDestination(1);
scenario.setMaxRequestsPerConnection(1);

// Send the first request so that the others get queued.
CountDownLatch latch1 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.path("/one")
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch1.countDown();
});

// Queue a second request, it should expire in the queue.
long timeout = 1000;
CountDownLatch latch2 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.path("/two")
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
assertTrue(result.isFailed());
assertThat(result.getFailure(), Matchers.instanceOf(TimeoutException.class));
latch2.countDown();
});

Thread.sleep(timeout);

// Queue a third request, it should not reset the timeout of the second request.
CountDownLatch latch3 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.path("/three")
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch3.countDown();
});

// We have already slept a timeout, expect the second request to be back in another timeout.
assertTrue(latch2.await(2 * timeout, TimeUnit.MILLISECONDS));

// Release the first request so the third can be served as well.
serverLatch.countDown();

assertTrue(latch1.await(2 * timeout, TimeUnit.MILLISECONDS));
assertTrue(latch3.await(2 * timeout, TimeUnit.MILLISECONDS));
}

private void assumeConnectTimeout(String host, int port, int connectTimeout)
{
try (Socket socket = new Socket())
Expand All @@ -516,7 +595,6 @@ private void assumeConnectTimeout(String host, int port, int connectTimeout)
catch (SocketTimeoutException x)
{
// Expected timeout during connect, continue the test.
return;
}
catch (Throwable x)
{
Expand All @@ -525,7 +603,7 @@ private void assumeConnectTimeout(String host, int port, int connectTimeout)
}
}

private class TimeoutHandler extends AbstractHandler
private static class TimeoutHandler extends AbstractHandler
{
private final long timeout;

Expand Down
Expand Up @@ -286,6 +286,13 @@ public void setServerIdleTimeout(long idleTimeout)
setConnectionIdleTimeout(idleTimeout);
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
h2.setMaxConcurrentStreams(maxRequestsPerConnection);
}

public void start(Handler handler) throws Exception
{
start(handler, null);
Expand Down