Skip to content

Commit

Permalink
Do not confirmSelect more than once per channel
Browse files Browse the repository at this point in the history
In order to avoid unnecessary blocking RPC calls and conform to best
practices, the Channel now checks if it is already activated confirm
mode before sending a confirm.select RPC call.

If confirm mode is already activated, calling confirmSelect() again
returns immediately without sending an RPC call.

Closes #1056

(cherry picked from commit 7253c94)

Conflicts:
	src/test/java/com/rabbitmq/client/test/ChannelNTest.java
  • Loading branch information
sergio91pt authored and acogoluegnes committed Jun 13, 2023
1 parent 129dc6a commit 0dc9ea2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
11 changes: 10 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
private final SortedSet<Long> unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet<Long>());

/** Whether the confirm select method has been successfully activated */
private boolean confirmSelectActivated = false;

/** Whether any nacks have been received since the last waitForConfirms(). */
private volatile boolean onlyAcksReceived = true;

Expand Down Expand Up @@ -1553,10 +1556,16 @@ public Tx.RollbackOk txRollback()
public Confirm.SelectOk confirmSelect()
throws IOException
{
if (confirmSelectActivated) {
return new Confirm.SelectOk();
}

if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
return (Confirm.SelectOk)
Confirm.SelectOk result = (Confirm.SelectOk)
exnWrappingRpc(new Confirm.Select(false)).getMethod();

confirmSelectActivated = true;
return result;
}

/** Public API - {@inheritDoc} */
Expand Down
35 changes: 30 additions & 5 deletions src/test/java/com/rabbitmq/client/test/ChannelNTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@

package com.rabbitmq.client.test;

import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Command;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.TrafficListener;
import com.rabbitmq.client.impl.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class ChannelNTest {

Expand Down Expand Up @@ -67,7 +68,7 @@ public void callingBasicCancelForUnknownConsumerThrowsException() throws Excepti
@Test
public void qosShouldBeUnsignedShort() {
AMQConnection connection = Mockito.mock(AMQConnection.class);
AtomicReference<com.rabbitmq.client.AMQP.Basic.Qos> qosMethod = new AtomicReference<>();
AtomicReference<AMQP.Basic.Qos> qosMethod = new AtomicReference<>();
ChannelN channel = new ChannelN(connection, 1, consumerWorkService) {
@Override
public AMQCommand exnWrappingRpc(Method m) {
Expand Down Expand Up @@ -106,6 +107,30 @@ public TestConfig(int value, Consumer call, int expected) {
});
}

@Test
public void confirmSelectOnlySendsRPCCallOnce() throws Exception {
AMQConnection connection = Mockito.mock(AMQConnection.class);
TrafficListener trafficListener = Mockito.mock(TrafficListener.class);

Mockito.when(connection.getTrafficListener()).thenReturn(trafficListener);

ChannelN channel = new ChannelN(connection, 1, consumerWorkService);

Future<AMQImpl.Confirm.SelectOk> future = executorService.submit(() -> {
try {
return channel.confirmSelect();
} catch (IOException e) {
throw new RuntimeException(e);
}
});

channel.handleCompleteInboundCommand(new AMQCommand(new AMQImpl.Confirm.SelectOk()));

assertNotNull(future.get(1, TimeUnit.SECONDS));
assertNotNull(channel.confirmSelect());
Mockito.verify(trafficListener, Mockito.times(1)).write(Mockito.any(Command.class));
}

interface Consumer {

void apply(int value) throws Exception;
Expand Down

0 comments on commit 0dc9ea2

Please sign in to comment.