Skip to content

Commit

Permalink
Ensure that mock transport does not swallow req (elastic#108656)
Browse files Browse the repository at this point in the history
Currently it is possible for the MockTransportService distrupt behavior
to swallow requests if either the connection is already closed (in which
case response pruning has already occurred) or if the behavior is added
after the clear callback has been triggered.
  • Loading branch information
Tim-Brooks authored and parkertimmins committed May 17, 2024
1 parent 3e481fb commit 3a49909
Showing 1 changed file with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
Expand All @@ -48,6 +50,7 @@
import org.elasticsearch.transport.ClusterConnectionManager;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -428,7 +431,9 @@ public void addUnresponsiveRule(TransportAddress transportAddress) {
);

transport().addSendBehavior(transportAddress, new StubbableTransport.SendRequestBehavior() {

private final Set<Transport.Connection> toClose = ConcurrentHashMap.newKeySet();
private final RefCounted refs = AbstractRefCounted.of(this::closeConnections);

@Override
public void sendRequest(
Expand All @@ -437,19 +442,32 @@ public void sendRequest(
String action,
TransportRequest request,
TransportRequestOptions options
) {
// don't send anything, the receiving node is unresponsive
toClose.add(connection);
) throws IOException {
if (connection.isClosed()) {
throw new NodeNotConnectedException(connection.getNode(), "connection already closed");
} else if (refs.tryIncRef()) {
// don't send anything, the receiving node is unresponsive
toClose.add(connection);
refs.decRef();
} else {
connection.sendRequest(requestId, action, request, options);
}
}

@Override
public void clearCallback() {
// close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually
// responds).
refs.decRef();
}

private void closeConnections() {
// close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually
// responds).
try {
IOUtils.close(toClose);
} catch (IOException e) {
throw new RuntimeException(e);
throw new AssertionError(e);
}
}
});
Expand Down

0 comments on commit 3a49909

Please sign in to comment.