Skip to content

Commit

Permalink
8312433: HttpClient request fails due to connection being considered …
Browse files Browse the repository at this point in the history
…idle and closed

Reviewed-by: djelinski
  • Loading branch information
jaikiran committed Jul 27, 2023
1 parent 271417a commit 486c784
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Base64;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -71,7 +69,8 @@ class Http2ClientImpl {
// only accessed from within lock protected blocks
private final Set<String> failures = new HashSet<>();

private final ReentrantLock lock = new ReentrantLock();
// used when dealing with connections in the pool
private final ReentrantLock connectionPoolLock = new ReentrantLock();

/**
* When HTTP/2 requested only. The following describes the aggregate behavior including the
Expand Down Expand Up @@ -100,15 +99,16 @@ CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
Exchange<?> exchange) {
String key = Http2Connection.keyFor(req);

lock.lock();
connectionPoolLock.lock();
try {
Http2Connection connection = connections.get(key);
if (connection != null) {
try {
if (!connection.isOpen() || !connection.reserveStream(true)) {
if (!connection.tryReserveForPoolCheckout() || !connection.reserveStream(true)) {
if (debug.on())
debug.log("removing found closed or closing connection: %s", connection);
deleteConnection(connection);
debug.log("removing connection from pool since it couldn't be" +
" reserved for use: %s", connection);
removeFromPool(connection);
} else {
// fast path if connection already exists
if (debug.on())
Expand All @@ -128,12 +128,12 @@ CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
return MinimalFuture.completedFuture(null);
}
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
return Http2Connection
.createAsync(req, this, exchange)
.whenComplete((conn, t) -> {
lock.lock();
connectionPoolLock.lock();
try {
if (conn != null) {
try {
Expand All @@ -148,7 +148,7 @@ CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
failures.add(key);
}
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
});
}
Expand All @@ -169,7 +169,7 @@ boolean offerConnection(Http2Connection c) {
}

String key = c.key();
lock.lock();
connectionPoolLock.lock();
try {
if (stopping) {
if (debug.on()) debug.log("stopping - closing connection: %s", c);
Expand All @@ -192,21 +192,27 @@ boolean offerConnection(Http2Connection c) {
debug.log("put in the connection pool: %s", c);
return true;
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
}

void deleteConnection(Http2Connection c) {
/**
* Removes the connection from the pool (if it was in the pool).
* This method doesn't close the connection.
*
* @param c the connection to remove from the pool
*/
void removeFromPool(Http2Connection c) {
if (debug.on())
debug.log("removing from the connection pool: %s", c);
lock.lock();
connectionPoolLock.lock();
try {
if (connections.remove(c.key(), c)) {
if (debug.on())
debug.log("removed from the connection pool: %s", c);
}
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
}

Expand All @@ -215,11 +221,11 @@ void stop() {
if (debug.on()) debug.log("stopping");
STOPPED = new EOFException("HTTP/2 client stopped");
STOPPED.setStackTrace(new StackTraceElement[0]);
lock.lock();
connectionPoolLock.lock();
try {
stopping = true;
} finally {
lock.unlock();
connectionPoolLock.unlock();
}
do {
connections.values().forEach(this::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ class Http2Connection {

private static final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647
private static final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646
private IdleConnectionTimeoutEvent idleConnectionTimeoutEvent; // may be null
// may be null; must be accessed/updated with the stateLock held
private IdleConnectionTimeoutEvent idleConnectionTimeoutEvent;

/**
* Flag set when no more streams to be opened on this connection.
Expand Down Expand Up @@ -196,31 +197,65 @@ class Http2Connection {
// and has not sent the final stream flag
final class IdleConnectionTimeoutEvent extends TimeoutEvent {

private boolean fired;
// expected to be accessed/updated with "stateLock" being held
private boolean cancelled;

IdleConnectionTimeoutEvent(Duration duration) {
super(duration);
fired = false;
}

/**
* {@link #shutdown(Throwable) Shuts down} the connection, unless this event is
* {@link #cancelled}
*/
@Override
public void handle() {
fired = true;
// first check if the connection is still idle.
// must be done with the "stateLock" held, to allow for synchronizing actions like
// closing the connection and checking out from connection pool (which too is expected
// to use this same lock)
stateLock.lock();
try {
if (cancelled) {
if (debug.on()) {
debug.log("Not initiating idle connection shutdown");
}
return;
}
if (!markIdleShutdownInitiated()) {
if (debug.on()) {
debug.log("Unexpected state %s, skipping idle connection shutdown",
describeClosedState(closedState));
}
return;
}
} finally {
stateLock.unlock();
}
if (debug.on()) {
debug.log("HTTP connection idle for too long");
debug.log("Initiating shutdown of HTTP connection which is idle for too long");
}
HttpConnectTimeoutException hte = new HttpConnectTimeoutException("HTTP connection idle, no active streams. Shutting down.");
HttpConnectTimeoutException hte = new HttpConnectTimeoutException(
"HTTP connection idle, no active streams. Shutting down.");
shutdown(hte);
}

/**
* Cancels this event. Should be called with stateLock held
*/
void cancel() {
assert stateLock.isHeldByCurrentThread() : "Current thread doesn't hold " + stateLock;
// mark as cancelled to prevent potentially already triggered event from actually
// doing the shutdown
this.cancelled = true;
// cancel the timer to prevent the event from being triggered (if it hasn't already)
client().cancelTimer(this);
}

@Override
public String toString() {
return "IdleConnectionTimeoutEvent, " + super.toString();
}

public boolean isFired() {
return fired;
}
}

// A small class that allows to control frames with respect to the state of
Expand Down Expand Up @@ -294,8 +329,11 @@ void markPrefaceSent() {
private static final int HALF_CLOSED_LOCAL = 1;
private static final int HALF_CLOSED_REMOTE = 2;
private static final int SHUTDOWN_REQUESTED = 4;
private final Lock stateLock = new ReentrantLock();
volatile int closedState;
// state when idle connection management initiates a shutdown of the connection, after
// which the connection will go into SHUTDOWN_REQUESTED state
private static final int IDLE_SHUTDOWN_INITIATED = 8;
private final ReentrantLock stateLock = new ReentrantLock();
private volatile int closedState;

//-------------------------------------
final HttpConnection connection;
Expand Down Expand Up @@ -496,11 +534,11 @@ private boolean reserveStream0(boolean clientInitiated) throws IOException {
}
if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
setFinalStream();
client2.deleteConnection(this);
client2.removeFromPool(this);
return false;
} else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) {
setFinalStream();
client2.deleteConnection(this);
client2.removeFromPool(this);
return false;
}
if (clientInitiated)
Expand Down Expand Up @@ -775,7 +813,7 @@ void shutdown(Throwable t) {
Log.logError("Shutting down connection");
}
}
client2.deleteConnection(this);
client2.removeFromPool(this);
for (Stream<?> s : streams.values()) {
try {
s.connectionClosing(t);
Expand Down Expand Up @@ -994,8 +1032,7 @@ private void handleConnectionFrame(Http2Frame frame)
}

boolean isOpen() {
return !isMarked(closedState, SHUTDOWN_REQUESTED)
&& connection.channel().isOpen();
return !isMarkedForShutdown() && connection.channel().isOpen();
}

void resetStream(int streamid, int code) {
Expand Down Expand Up @@ -1092,10 +1129,11 @@ void closeStream(int streamid) {
// Start timer if property present and not already created
stateLock.lock();
try {
// idleConnectionTimerEvent is always accessed within a lock protected block
// idleConnectionTimeoutEvent is always accessed within a lock protected block
if (streams.isEmpty() && idleConnectionTimeoutEvent == null) {
idleConnectionTimeoutEvent = client().idleConnectionTimeout()
.map(IdleConnectionTimeoutEvent::new).orElse(null);
.map(IdleConnectionTimeoutEvent::new)
.orElse(null);
if (idleConnectionTimeoutEvent != null) {
client().registerTimer(idleConnectionTimeoutEvent);
}
Expand Down Expand Up @@ -1283,23 +1321,53 @@ <T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx
return new Stream.PushedStream<>(pg, this, pushEx);
}

/**
* Attempts to notify the idle connection management that this connection should
* be considered "in use". This way the idle connection management doesn't close
* this connection during the time the connection is handed out from the pool and any
* new stream created on that connection.
* @return true if the connection has been successfully reserved and is {@link #isOpen()}. false
* otherwise; in which case the connection must not be handed out from the pool.
*/
boolean tryReserveForPoolCheckout() {
// must be done with "stateLock" held to co-ordinate idle connection management
stateLock.lock();
try {
cancelIdleShutdownEvent();
// consider the reservation successful only if the connection's state hasn't moved
// to "being closed"
return isOpen();
} finally {
stateLock.unlock();
}
}

/**
* Cancels any event that might have been scheduled to shutdown this connection. Must be called
* with the stateLock held.
*/
private void cancelIdleShutdownEvent() {
assert stateLock.isHeldByCurrentThread() : "Current thread doesn't hold " + stateLock;
if (idleConnectionTimeoutEvent == null) {
return;
}
idleConnectionTimeoutEvent.cancel();
idleConnectionTimeoutEvent = null;
}

<T> void putStream(Stream<T> stream, int streamid) {
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting until
// the stream is closed.
stateLock.lock();
try {
if (!isMarked(closedState, SHUTDOWN_REQUESTED)) {
if (!isMarkedForShutdown()) {
if (debug.on()) {
debug.log("Opened stream %d", streamid);
}
client().streamReference();
streams.put(streamid, stream);
// idleConnectionTimerEvent is always accessed within a lock protected block
if (idleConnectionTimeoutEvent != null) {
client().cancelTimer(idleConnectionTimeoutEvent);
idleConnectionTimeoutEvent = null;
}
cancelIdleShutdownEvent();
return;
}
} finally {
Expand Down Expand Up @@ -1663,6 +1731,12 @@ private boolean isMarked(int state, int mask) {
return (state & mask) == mask;
}

private boolean isMarkedForShutdown() {
final int closedSt = closedState;
return isMarked(closedSt, IDLE_SHUTDOWN_INITIATED)
|| isMarked(closedSt, SHUTDOWN_REQUESTED);
}

private boolean markShutdownRequested() {
return markClosedState(SHUTDOWN_REQUESTED);
}
Expand All @@ -1675,6 +1749,10 @@ private boolean markHalfClosedLRemote() {
return markClosedState(HALF_CLOSED_REMOTE);
}

private boolean markIdleShutdownInitiated() {
return markClosedState(IDLE_SHUTDOWN_INITIATED);
}

private boolean markClosedState(int flag) {
int state, desired;
do {
Expand All @@ -1688,8 +1766,11 @@ private boolean markClosedState(int flag) {
String describeClosedState(int state) {
if (state == 0) return "active";
String desc = null;
if (isMarked(state, IDLE_SHUTDOWN_INITIATED)) {
desc = "idle-shutdown-initiated";
}
if (isMarked(state, SHUTDOWN_REQUESTED)) {
desc = "shutdown";
desc = desc == null ? "shutdown" : desc + "+shutdown";
}
if (isMarked(state, HALF_CLOSED_LOCAL | HALF_CLOSED_REMOTE)) {
if (desc == null) return "closed";
Expand Down

5 comments on commit 486c784

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaikiran
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/backport 21u

@openjdk
Copy link

@openjdk openjdk bot commented on 486c784 Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaikiran The target repository 21u is not a valid target for backports.
List of valid target repositories: openjdk/jdk, openjdk/jdk11u, openjdk/jdk11u-dev, openjdk/jdk17u, openjdk/jdk17u-dev, openjdk/jdk19u, openjdk/jdk20u, openjdk/jdk21, openjdk/jdk21u, openjdk/jdk7u, openjdk/jdk8u, openjdk/jdk8u-dev, openjdk/jfx, openjdk/jfx20u, openjdk/jfx21u, openjdk/shenandoah-jdk8u, openjdk/shenandoah-jdk8u-dev.
Supplying the organization/group prefix is optional.

@jaikiran
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/backport openjdk/jdk21u

@openjdk
Copy link

@openjdk openjdk bot commented on 486c784 Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaikiran Could not automatically backport 486c7844 to openjdk/jdk21u due to conflicts in the following files:

  • src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java

Please fetch the appropriate branch/commit and manually resolve these conflicts by using the following commands in your personal fork of openjdk/jdk21u. Note: these commands are just some suggestions and you can use other equivalent commands you know.

# Fetch the up-to-date version of the target branch
$ git fetch --no-tags https://git.openjdk.org/jdk21u.git master:master

# Check out the target branch and create your own branch to backport
$ git checkout master
$ git checkout -b jaikiran-backport-486c7844

# Fetch the commit you want to backport
$ git fetch --no-tags https://git.openjdk.org/jdk.git 486c7844f902728ce580c3994f58e3e497834952

# Backport the commit
$ git cherry-pick --no-commit 486c7844f902728ce580c3994f58e3e497834952
# Resolve conflicts now

# Commit the files you have modified
$ git add files/with/resolved/conflicts
$ git commit -m 'Backport 486c7844f902728ce580c3994f58e3e497834952'

Once you have resolved the conflicts as explained above continue with creating a pull request towards the openjdk/jdk21u with the title Backport 486c7844f902728ce580c3994f58e3e497834952.

Please sign in to comment.