Skip to content

Commit

Permalink
GH-1485: Option to suppress single client.id suffx
Browse files Browse the repository at this point in the history
See #1485
  • Loading branch information
garyrussell committed May 13, 2020
1 parent 663b28f commit 38fdafd
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis

private int concurrency = 1;

private boolean alwaysClientIdSuffix = true;

/**
* Construct an instance with the supplied configuration properties.
* The topic partitions are distributed evenly across the delegate
Expand Down Expand Up @@ -86,6 +88,16 @@ public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}

/**
* Set to false to suppress adding a suffix to the child container's client.id when
* the concurrency is only 1.
* @param alwaysClientIdSuffix false to suppress, true (default) to include.
* @since 2.2.14
*/
public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) {
this.alwaysClientIdSuffix = alwaysClientIdSuffix;
}

/**
* Return the list of {@link KafkaMessageListenerContainer}s created by
* this container.
Expand Down Expand Up @@ -167,7 +179,7 @@ protected void doStart() {
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -451,6 +452,7 @@ public void testManualCommitSyncExisting() throws Exception {
latch.countDown();
});
containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProps.setClientId("myClientId");

ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
Expand All @@ -465,6 +467,9 @@ public void testManualCommitSyncExisting() throws Exception {
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(bitSet.cardinality()).isEqualTo(8);
Set<String> clientIds = container.metrics().keySet();
assertThat(clientIds).hasSize(1);
assertThat(clientIds.iterator().next()).isEqualTo("myClientId-0");
container.stop();
this.logger.info("Stop MANUAL_IMMEDIATE with Existing");
}
Expand All @@ -481,10 +486,11 @@ public void testPausedStart() throws Exception {
ConcurrentMessageListenerContainerTests.this.logger.info("paused start: " + message);
latch.countDown();
});

containerProps.setClientId("myClientId");
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(2);
container.setAlwaysClientIdSuffix(false);
container.setBeanName("testBatch");
container.pause();
container.start();
Expand All @@ -503,6 +509,11 @@ public void testPausedStart() throws Exception {
container.resume();

assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
Set<String> clientIds = container.metrics().keySet();
assertThat(clientIds).hasSize(2);
Iterator<String> iterator = clientIds.iterator();
assertThat(iterator.next()).startsWith("myClientId-");
assertThat(iterator.next()).startsWith("myClientId-");
container.stop();
this.logger.info("Stop paused start");
}
Expand Down Expand Up @@ -702,9 +713,11 @@ private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnEr
}

});
containerProps.setClientId("myClientId");
ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(cf,
containerProps);
container.setConcurrency(1);
container.setAlwaysClientIdSuffix(false);
container.setBeanName("testAckOnErrorWithManualImmediate");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
Expand All @@ -717,6 +730,9 @@ private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnEr
template.sendDefault(0, 1, "bar");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
Set<String> clientIds = container.metrics().keySet();
assertThat(clientIds).hasSize(1);
assertThat(clientIds.iterator().next()).isEqualTo("myClientId");
container.stop();

Consumer<Integer, String> consumer = cf.createConsumer();
Expand Down

0 comments on commit 38fdafd

Please sign in to comment.