[14.0.x] ISPN-15626 Hot Rod client tries to schedule operation twice #12039

Merged · 1 commit · Mar 1, 2024
ChannelPool.java
@@ -21,6 +21,7 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
import net.jcip.annotations.GuardedBy;

/**
* This is a custom implementation of {@link io.netty.channel.Channel} pooling.
@@ -30,17 +31,38 @@
* <p>
* It should also be more allocation-efficient since it does not create futures and invokes the callback directly if the
* channel is available.
*
* <h2>Thread-safety</h2>
* <p>
* The connections are handled LIFO, pending requests are handled FIFO.
* There are two queues for managing connections and pending requests. The connections are handled LIFO, pending
* requests are handled FIFO. Although each queue is thread-safe on its own, operations over the two queues need to be
* synchronized using a {@link StampedLock}.
* </p>
* <p>
* The synchronization is necessary to make sure enqueued operations are visible when releasing a channel, and that
* an idle channel is visible before enqueuing an operation. The approach is:
*
* <ul>
* <li>Adding an operation uses a write lock;</li>
* <li>Existence checks use a write lock;</li>
* <li>Polling an operation uses a read lock.</li>
* </ul>
*
* Since the queues are thread-safe, poll operations are visible even under the shared lock.
* </p>
*/
class ChannelPool {
enum ChannelEventType { CONNECTED, CLOSED_IDLE, CLOSED_ACTIVE, CONNECT_FAILED}
private static final AtomicIntegerFieldUpdater<TimeoutCallback> invokedUpdater = AtomicIntegerFieldUpdater.newUpdater(TimeoutCallback.class, "invoked");
private static final Log log = LogFactory.getLog(ChannelPool.class);
private static final int MAX_FULL_CHANNELS_SEEN = 10;

@GuardedBy("lock")
private final Deque<Channel> channels = PlatformDependent.newConcurrentDeque();

@GuardedBy("lock")
private final Deque<ChannelOperation> callbacks = PlatformDependent.newConcurrentDeque();

private final EventExecutor executor;
private final SocketAddress address;
private final ChannelInitializer newChannelInvoker;
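The locking rules spelled out in the class Javadoc above can be illustrated with a small, self-contained sketch. It is illustrative only and not part of the patch; java.util's ConcurrentLinkedDeque stands in for Netty's PlatformDependent deque. Adding and removing pending operations take the write lock, while polling relies on the deque's own thread-safety and only takes the shared read lock.

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.StampedLock;

// Illustrative sketch of the queue/lock discipline; not the actual ChannelPool code.
final class PendingQueueSketch<T> {
   private final ConcurrentLinkedDeque<T> pending = new ConcurrentLinkedDeque<>(); // FIFO pending operations
   private final StampedLock lock = new StampedLock();

   // Adding an operation takes the write lock, so a concurrent release cannot miss it.
   void enqueue(T op) {
      long stamp = lock.writeLock();
      try {
         pending.addLast(op);
      } finally {
         lock.unlockWrite(stamp);
      }
   }

   // Existence checks (de-queueing an operation before invoking it) also take the write lock.
   boolean removeIfStillPending(T op) {
      long stamp = lock.writeLock();
      try {
         return pending.remove(op);
      } finally {
         lock.unlockWrite(stamp);
      }
   }

   // Polling only needs the shared read lock; the deque itself is thread-safe.
   T poll() {
      long stamp = lock.readLock();
      try {
         return pending.pollFirst(); // FIFO: oldest pending operation first
      } finally {
         lock.unlockRead(stamp);
      }
   }
}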
@@ -76,7 +98,7 @@ public void acquire(ChannelOperation callback) {
}

// We could acquire an active channel and submit the callback.
if (executeDirectlyIfPossible(callback)) return;
if (executeDirectlyIfPossible(callback, false)) return;

// wait action
if (maxWait > 0) {
@@ -86,23 +108,17 @@ public void acquire(ChannelOperation callback) {
}

// Between the check time and adding the callback to the queue, we could have a channel available.
// Let's just try again.
// Let's just try again; this execution bypasses the FIFO ordering.
if (!executeOrEnqueue(callback)) {
boolean remove = false;
try {
remove = executeDirectlyIfPossible(callback);
} finally {
if (remove) {
callbacks.remove(callback);
}
}
// If the operation can be executed, it is removed from the queue just before being invoked.
executeDirectlyIfPossible(callback, true);
}

// The pool was terminated while the callback tried to acquire a channel. Let's complete it with an exception.
if (terminated) close();
}
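The retry above de-queues the callback before running it. The standalone demo below (hypothetical names, and only the claim step, without the StampedLock the pool adds around it) shows why that matters for ISPN-15626: whichever thread removes the queued operation first is the only one allowed to invoke it, so the operation can never be scheduled twice.

import java.util.concurrent.ConcurrentLinkedDeque;

// Standalone demo with made-up names: only the thread that removes the queued operation may run it.
public class ClaimBeforeInvokeDemo {
   public static void main(String[] args) throws InterruptedException {
      ConcurrentLinkedDeque<Runnable> pending = new ConcurrentLinkedDeque<>();
      Runnable operation = () -> System.out.println("operation executed by " + Thread.currentThread().getName());
      pending.addLast(operation);

      Runnable claimer = () -> {
         // remove(...) succeeds for exactly one thread; the loser leaves the operation alone.
         if (pending.remove(operation)) {
            operation.run();
         } else {
            System.out.println(Thread.currentThread().getName() + ": already claimed, channel goes back to the pool");
         }
      };

      Thread releasingThread = new Thread(claimer, "releasing-thread");
      Thread retryingThread = new Thread(claimer, "retrying-acquire-thread");
      releasingThread.start();
      retryingThread.start();
      releasingThread.join();
      retryingThread.join();
   }
}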

boolean executeDirectlyIfPossible(ChannelOperation callback) {
boolean executeDirectlyIfPossible(ChannelOperation callback, boolean checkCallback) {
Channel channel;
int fullChannelsSeen = 0;
while ((channel = channels.pollFirst()) != null) {
@@ -123,15 +139,15 @@ boolean executeDirectlyIfPossible(ChannelOperation callback) {
break;
}
}
return activateChannel(channel, callback, false);
return activateChannel(channel, callback, false, checkCallback);
}
int current = created.get();
while (current < maxConnections) {
if (created.compareAndSet(current, current + 1)) {
int currentActive = active.incrementAndGet();
if (log.isTraceEnabled()) log.tracef("[%s] Creating new channel, created = %d, active = %d", address, current + 1, currentActive);
// create new connection and apply callback
createAndInvoke(callback);
createAndInvoke(callback, checkCallback);
return true;
}
current = created.get();
@@ -146,7 +162,7 @@ boolean executeDirectlyIfPossible(ChannelOperation callback) {
int currentCreated = created.incrementAndGet();
int currentActive = active.incrementAndGet();
if (log.isTraceEnabled()) log.tracef("[%s] Creating new channel, created = %d, active = %d", address, currentCreated, currentActive);
createAndInvoke(callback);
createAndInvoke(callback, checkCallback);
return true;
default:
throw new IllegalArgumentException(String.valueOf(exhaustedAction));
@@ -182,49 +198,40 @@ private boolean executeOrEnqueue(ChannelOperation callback) {
} finally {
lock.unlockWrite(stamp);
}
return activateChannel(channel, callback, false);
return activateChannel(channel, callback, false, false);
}

private void createAndInvoke(ChannelOperation callback) {
private void createAndInvoke(ChannelOperation callback, boolean checkCallback) {
try {
newChannelInvoker.createChannel().whenComplete((channel, throwable) -> {
if (throwable != null) {
int currentActive = active.decrementAndGet();
if (currentActive < 0) {
HOTROD.invalidActiveCountAfterClose(channel);
}
int currentCreated = created.decrementAndGet();
if (currentCreated < 0) {
HOTROD.invalidCreatedCountAfterClose(channel);
}
if (log.isTraceEnabled()) log.tracef(throwable, "[%s] Channel could not be created, created = %d, active = %d, connected = %d",
address, currentCreated, currentActive, connected.get());
// Update about a possibly failing server before cancelling the callback.
connectionFailureListener.accept(this, ChannelEventType.CONNECT_FAILED);
callback.cancel(address, throwable);
maybeRejectPendingCallbacks(throwable);
} else {
suspected = false;
int currentConnected = connected.incrementAndGet();
if (log.isTraceEnabled()) log.tracef("[%s] Channel connected, created = %d, active = %d, connected = %d",
address, created.get(), active.get(), currentConnected);
callback.invoke(channel);
connectionFailureListener.accept(this, ChannelEventType.CONNECTED);
}
});
newChannelInvoker.createChannel().whenComplete((channel, t) -> handleChannelCreated(callback, checkCallback, channel, t));
} catch (Throwable t) {
handleChannelCreated(callback, checkCallback, null, t);
}
}

private void handleChannelCreated(ChannelOperation callback, boolean checkCallback, Channel channel, Throwable throwable) {
if (throwable != null) {
int currentActive = active.decrementAndGet();
if (currentActive < 0) {
HOTROD.invalidActiveCountAfterClose(channel);
}
int currentCreated = created.decrementAndGet();
if (log.isTraceEnabled()) log.tracef(t, "[%s] Channel could not be created, created = %d, active = %d, connected = %d",
address, currentCreated, currentActive, connected.get());
if (currentCreated < 0) {
HOTROD.warnf("Invalid created count after channel create failure");
HOTROD.invalidCreatedCountAfterClose(channel);
}
if (currentActive < 0) {
HOTROD.warnf("Invalid active count after channel create failure");
}
callback.cancel(address, t);
maybeRejectPendingCallbacks(t);
if (log.isTraceEnabled()) log.tracef(throwable, "[%s] Channel could not be created, created = %d, active = %d, connected = %d",
address, currentCreated, currentActive, connected.get());
// Update about a possibly failing server before cancelling the callback.
connectionFailureListener.accept(this, ChannelEventType.CONNECT_FAILED);
callback.cancel(address, throwable);
maybeRejectPendingCallbacks(throwable);
} else {
suspected = false;
int currentConnected = connected.incrementAndGet();
if (log.isTraceEnabled()) log.tracef("[%s] Channel connected, created = %d, active = %d, connected = %d",
address, created.get(), active.get(), currentConnected);
invokeCallback(channel, callback, checkCallback);
connectionFailureListener.accept(this, ChannelEventType.CONNECTED);
}
}

@@ -270,7 +277,9 @@ public void release(Channel channel, ChannelRecord record) {
} finally {
lock.unlockRead(stamp);
}
activateChannel(channel, callback, true);

// Utilize the executor to avoid a stack overflow with multiple releases.
activateChannel(channel, callback, true, false);
}
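The comment above hints at why release() hands the callback to the executor. A toy sketch, with hypothetical names and a plain Runnable queue standing in for the pending operations: the synchronous variant grows the call stack with every chained release, while the executor variant starts each callback on a fresh frame.

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;

// Hypothetical names; not pool code. Shows why the callback is handed to the executor on release.
class ReleaseChainSketch {
   private final ConcurrentLinkedDeque<Runnable> pending = new ConcurrentLinkedDeque<>();
   private final Executor executor;

   ReleaseChainSketch(Executor executor) {
      this.executor = executor;
   }

   // Unsafe variant: each callback typically ends by releasing its channel again, so the
   // recursion depth grows with the number of queued operations.
   void releaseSynchronously() {
      Runnable callback = pending.pollFirst();
      if (callback != null) {
         callback.run();
      }
   }

   // Safe variant: the callback starts on a fresh stack frame owned by the executor thread.
   void releaseViaExecutor() {
      Runnable callback = pending.pollFirst();
      if (callback != null) {
         executor.execute(callback);
      }
   }
}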

/**
@@ -298,34 +307,75 @@ public void releaseClosedChannel(Channel channel, ChannelRecord channelRecord) {
connectionFailureListener.accept( this, idle ? ChannelEventType.CLOSED_IDLE : ChannelEventType.CLOSED_ACTIVE);
}

private boolean activateChannel(Channel channel, ChannelOperation callback, boolean useExecutor) {
private boolean activateChannel(Channel channel, ChannelOperation callback, boolean useExecutor, boolean checkCallback) {
if (!channel.isActive()) return false;
int currentActive = active.incrementAndGet();
if (log.isTraceEnabled()) log.tracef("[%s] Activated record %s, created = %d, active = %d", address, channel, created.get(), currentActive);
ChannelRecord record = ChannelRecord.of(channel);
record.setAcquired();
if (useExecutor) {
// Do not execute another operation in releasing thread, we could run out of stack
executor.execute(() -> {
try {
callback.invoke(channel);
} catch (Throwable t) {
log.tracef(t, "Closing channel %s due to exception", channel);
discardChannel(channel);
}
});
executor.execute(() -> invokeCallback(channel, callback, checkCallback));
} else {
try {
callback.invoke(channel);
} catch (Throwable t) {
log.tracef(t, "Closing channel %s due to exception", channel);
discardChannel(channel);
throw t;
}
return invokeCallback(channel, callback, checkCallback);
}
return true;
}

/**
* Invokes the callback using the given channel.
* <p>
* This method is central to avoiding concurrent execution of an enqueued operation. The {@code removeBeforeInvoke}
* flag, if set, ensures the operation is only invoked if it is still enqueued. This is the slow path, where locks
* need to be acquired.
* </p>
*
* <p>
* The channel is released back to the pool if the operation is no longer in the queue.
* </p>
*
* @param channel The channel to invoke the callback on.
* @param callback The callback to invoke.
* @param removeBeforeInvoke Whether to de-queue the callback before executing it.
* @return <code>true</code> if the callback was executed, <code>false</code> otherwise.
*/
private boolean invokeCallback(Channel channel, ChannelOperation callback, boolean removeBeforeInvoke) {
if (removeBeforeInvoke && !removeCallback(callback)) {
log.debugf("Operation %s picked-up twice, returning channel to pool", callback);
release(channel, ChannelRecord.of(channel));
return false;
}

try {
callback.invoke(channel);
} catch (Throwable t) {
log.tracef(t, "Closing channel %s due to exception", channel);
discardChannel(channel);
throw t;
}

return true;
}

/**
* Acquires an exclusive lock over the callbacks to remove the operation.
* <p>
* This method should be invoked just before the operation is executed. Make sure to check the return
* value when executing enqueued operations.
* </p>
*
* @param operation The operation to remove from the queue.
* @return <code>true</code> if the operation was still enqueued, <code>false</code> otherwise.
*/
private boolean removeCallback(ChannelOperation operation) {
long stamp = lock.writeLock();
try {
return callbacks.remove(operation);
} finally {
lock.unlockWrite(stamp);
}
}

private void discardChannel(Channel channel) {
channel.close();
}
@@ -372,7 +422,7 @@ public void inspectPool() {
int currentActive = active.incrementAndGet();
if (log.isTraceEnabled()) log.tracef("[%s] Creating new channel to inspect server, created = %d, active = %d", address, currentCreated, currentActive);
suspected = true;
createAndInvoke(cb);
createAndInvoke(cb, false);
}
}

AcquireChannelOperation.java (new file)
@@ -0,0 +1,27 @@
package org.infinispan.client.hotrod.impl.transport.netty;

import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import io.netty.channel.Channel;

class AcquireChannelOperation implements ChannelOperation {
private final CompletableFuture<Channel> cf;

public AcquireChannelOperation(CompletableFuture<Channel> cf) {
this.cf = cf;
}

@Override
public void invoke(Channel channel) {
cf.complete(channel);
}

@Override
public void cancel(SocketAddress address, Throwable cause) {
Exception e = new TimeoutException("Timed out for: " + address);
e.addSuppressed(cause);
cf.completeExceptionally(e);
}
}
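A small usage sketch of the new operation (assumptions: it is exercised from the same package, since the class is package-private, and the address and failure cause are made up): invoke(...) completes the future with the channel, while cancel(...) completes it exceptionally with a TimeoutException that carries the original cause as a suppressed exception.

package org.infinispan.client.hotrod.impl.transport.netty;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

import io.netty.channel.Channel;

// Hypothetical example, not part of the patch.
public class AcquireChannelOperationExample {
   public static void main(String[] args) {
      CompletableFuture<Channel> cf = new CompletableFuture<>();
      AcquireChannelOperation operation = new AcquireChannelOperation(cf);

      // Simulate the pool failing to provide a channel, e.g. after a connection attempt failed.
      operation.cancel(new InetSocketAddress("localhost", 11222), new RuntimeException("connection refused"));

      cf.whenComplete((channel, throwable) ->
            // throwable is the TimeoutException("Timed out for: ..."); the original cause is suppressed on it.
            System.out.println(throwable + ", suppressed: " + throwable.getSuppressed()[0]));
   }
}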