Skip to content

Commit

Permalink
Switch to ConcurrentLinkedQueue to avoid expensive size calls #2602
Browse files Browse the repository at this point in the history
See also #2601
  • Loading branch information
mp911de committed Feb 26, 2024
1 parent 6185ebd commit faffc7e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
6 changes: 3 additions & 3 deletions src/main/java/io/lettuce/core/internal/LettuceFactories.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
Expand All @@ -41,13 +41,13 @@ public class LettuceFactories {
/**
* Creates a new, optionally bounded, {@link Queue} that does not require external synchronization.
*
* @param maxSize queue size. If {@link Integer#MAX_VALUE}, then creates an {@link ConcurrentLinkedDeque unbounded queue}.
* @param maxSize queue size. If {@link Integer#MAX_VALUE}, then creates an {@link ConcurrentLinkedQueue unbounded queue}.
* @return a new, empty {@link Queue}.
*/
public static <T> Queue<T> newConcurrentQueue(int maxSize) {

if (maxSize == Integer.MAX_VALUE) {
return new ConcurrentLinkedDeque<>();
return new ConcurrentLinkedQueue<>();
}

return maxSize > ARRAY_QUEUE_THRESHOLD ? new LinkedBlockingQueue<>(maxSize) : new ArrayBlockingQueue<>(maxSize);
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {
logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", logPrefix(), commands.size());
}

commands.addAll(drainCommands(disconnectedBuffer));
drainCommands(disconnectedBuffer, commands);

for (RedisCommand<?, ?, ?> command : commands) {

Expand Down Expand Up @@ -745,8 +745,8 @@ protected <T> T doExclusive(Supplier<T> supplier) {

List<RedisCommand<?, ?, ?>> target = new ArrayList<>(disconnectedBuffer.size() + commandBuffer.size());

target.addAll(drainCommands(disconnectedBuffer));
target.addAll(drainCommands(commandBuffer));
drainCommands(disconnectedBuffer, target);
drainCommands(commandBuffer, target);

return target;
}
Expand All @@ -769,9 +769,26 @@ protected <T> T doExclusive(Supplier<T> supplier) {
}
}

drainCommands(source, target);
return target;
}

/**
* Drain commands from a queue and return only active commands.
*
* @param source the source queue.
*/
private static void drainCommands(Queue<? extends RedisCommand<?, ?, ?>> source, Collection<RedisCommand<?, ?, ?>> target) {

RedisCommand<?, ?, ?> cmd;
while ((cmd = source.poll()) != null) {

if (!cmd.isDone() && !ActivationCommand.isActivationCommand(cmd)) {
target.add(cmd);
}
}
}

private void cancelBufferedCommands(String message) {
cancelCommands(message, doExclusive(this::drainCommands), RedisCommand::cancel);
}
Expand Down

0 comments on commit faffc7e

Please sign in to comment.