diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 16074f3246..beb736c8ac 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -204,6 +204,16 @@ a new connection is open. The value must be between 1 and 255. |To delay the connection opening until necessary. |false +|`requestedHeartbeat` +|Heartbeat requested by the client. +|60 seconds + +|`forceReplicaForConsumers` +|Retry connecting until a replica is available for consumers. +The client retries 5 times before falling back to the stream leader node. +Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available. +|`false` + |`id` |Informational ID for the environment instance. Used as a prefix for connection names. diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index ebf25d7d48..fe632f5f55 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -69,9 +69,8 @@ public interface EnvironmentBuilder { *
The default implementation is overridden automatically if the following conditions are
* met: the host to connect to is localhost, the user is guest, and no
* address resolver has been provided. The client will then always tries to connect to
- * localhost to facilitate local development.
- * Just provide a pass-through address resolver to avoid this
- * behavior, e.g.:
+ * localhost to facilitate local development. Just provide a pass-through address resolver
+ * to avoid this behavior, e.g.:
*
*
* Environment.builder()
@@ -184,7 +183,7 @@ public interface EnvironmentBuilder {
EnvironmentBuilder virtualHost(String virtualHost);
/**
- * The hearbeat to request.
+ * The heartbeat to request.
*
* Default is 60 seconds.
*
@@ -323,6 +322,34 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
*/
EnvironmentBuilder lazyInitialization(boolean lazy);
+ /**
+ * Flag to force the connection to a stream replica for consumers.
+ *
+ * <p>The library will always prefer to connect to a stream replica to consume from, but it will
+ * fall back to the stream leader if no replica is available. This is the default behavior. Set
+ * this flag to <code>true</code> to make the library wait for a replica to become available if
+ * only the stream leader is available. This can lead to longer recovery time but helps to offload
+ * a stream leader and let it deal only with write requests.
+ *
+ * <p>Note the library performs only 5 attempts to locate a replica before falling back to the
+ * leader when the flag is set to <code>true</code>.
+ *
+ * <p>The {@link #recoveryBackOffDelayPolicy(BackOffDelayPolicy)} and {@link
+ * #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)} policies control the time between
+ * attempts.
+ *
+ * <p>Do not set this flag to <code>true</code> when streams have only 1 member (the leader),
+ * e.g. for local development.
+ *
+ * <p>Default is <code>false</code>.
+ *
+ * @param forceReplica whether to force the connection to a replica or not
+ * @return this builder instance
+ * @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy)
+ * @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)
+ */
+ EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica);
+
/**
* Create the {@link Environment} instance.
*
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index d9c4adf29b..4d2cef6c6f 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -185,6 +185,7 @@ public long applyAsLong(Object value) {
FlushConsolidationHandler.class.getSimpleName();
private final String NETTY_HANDLER_STREAM = StreamHandler.class.getSimpleName();
private final String host;
+ private final String clientConnectionName;
private final int port;
private final Map serverProperties;
private final Map connectionProperties;
@@ -313,6 +314,7 @@ public void initChannel(SocketChannel ch) {
f = b.connect(parameters.host, parameters.port).sync();
this.host = parameters.host;
this.port = parameters.port;
+ this.clientConnectionName = clientConnectionName;
} catch (Exception e) {
String message =
format(
@@ -2696,7 +2698,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
- LOGGER.info("Closing connection because it's been idle for too long");
+ LOGGER.info(
+ "Closing connection {} on {}:{} because it's been idle for too long",
+ clientConnectionName,
+ host,
+ port);
closing.set(true);
closingSequence(ShutdownContext.ShutdownReason.HEARTBEAT_FAILURE);
} else if (e.state() == IdleState.WRITER_IDLE) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
index 4938ea0f7e..2314434ad3 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
@@ -20,6 +20,7 @@
import static com.rabbitmq.stream.impl.Utils.namedFunction;
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
import static com.rabbitmq.stream.impl.Utils.quote;
+import static java.lang.String.format;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.Consumer;
@@ -41,10 +42,12 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.*;
@@ -56,6 +59,7 @@
class ConsumersCoordinator {
static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256;
+ static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5;
static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next();
@@ -74,16 +78,19 @@ class ConsumersCoordinator {
private final ExecutorServiceFactory executorServiceFactory =
new DefaultExecutorServiceFactory(
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
+ private final boolean forceReplica;
ConsumersCoordinator(
StreamEnvironment environment,
int maxConsumersByConnection,
Function connectionNamingStrategy,
- ClientFactory clientFactory) {
+ ClientFactory clientFactory,
+ boolean forceReplica) {
this.environment = environment;
this.clientFactory = clientFactory;
this.maxConsumersByConnection = maxConsumersByConnection;
this.connectionNamingStrategy = connectionNamingStrategy;
+ this.forceReplica = forceReplica;
}
private static String keyForClientSubscription(Client.Broker broker) {
@@ -108,7 +115,7 @@ Runnable subscribe(
MessageHandler messageHandler,
Map subscriptionProperties,
ConsumerFlowStrategy flowStrategy) {
- List candidates = findBrokersForStream(stream);
+ List candidates = findBrokersForStream(stream, forceReplica);
Client.Broker newNode = pickBroker(candidates);
if (newNode == null) {
throw new IllegalStateException("No available node to subscribe to");
@@ -201,11 +208,8 @@ private void addToManager(
// manager connection is dead or stream not available
// scheduling manager closing if necessary in another thread to avoid blocking this one
if (pickedManager.isEmpty()) {
- ClientSubscriptionsManager manager = pickedManager;
ConsumersCoordinator.this.environment.execute(
- () -> {
- manager.closeIfEmpty();
- },
+ pickedManager::closeIfEmpty,
"Consumer manager closing after timeout, consumer %d on stream '%s'",
tracker.consumer.id(),
tracker.stream);
@@ -225,12 +229,14 @@ int managerCount() {
}
// package protected for testing
- List findBrokersForStream(String stream) {
+ List findBrokersForStream(String stream, boolean forceReplica) {
+ LOGGER.debug(
+ "Candidate lookup to consume from '{}', forcing replica? {}", stream, forceReplica);
Map metadata =
this.environment.locatorOperation(
namedFunction(
c -> c.metadata(stream), "Candidate lookup to consume from '%s'", stream));
- if (metadata.size() == 0 || metadata.get(stream) == null) {
+ if (metadata.isEmpty() || metadata.get(stream) == null) {
// this is not supposed to happen
throw new StreamDoesNotExistException(stream);
}
@@ -253,8 +259,17 @@ List findBrokersForStream(String stream) {
List brokers;
if (replicas == null || replicas.isEmpty()) {
- brokers = Collections.singletonList(streamMetadata.getLeader());
- LOGGER.debug("Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
+ if (forceReplica) {
+ throw new IllegalStateException(
+ format(
+ "Only the leader node is available for consuming from %s and "
+ + "consuming from leader has been deactivated for this consumer",
+ stream));
+ } else {
+ brokers = Collections.singletonList(streamMetadata.getLeader());
+ LOGGER.debug(
+ "Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
+ }
} else {
LOGGER.debug("Replicas for consuming from {}: {}", stream, replicas);
brokers = new ArrayList<>(replicas);
@@ -265,6 +280,22 @@ List findBrokersForStream(String stream) {
return brokers;
}
+  // Lookup task for retry loops: with forceReplica on, the first
+  // MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER calls demand a replica, later calls accept the leader.
+  private Callable<List<Client.Broker>> findBrokersForStream(String stream) {
+    // the counter lives in the returned Callable, so each retry sequence counts its own attempts
+    AtomicInteger attemptNumber = new AtomicInteger();
+    return () -> {
+      // && short-circuits: the counter is only touched when forceReplica is enabled
+      boolean mustUseReplica =
+          forceReplica
+              && attemptNumber.incrementAndGet() <= MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER;
+      LOGGER.debug(
+          "Looking for broker(s) for stream {}, forcing replica {}", stream, mustUseReplica);
+      return findBrokersForStream(stream, mustUseReplica);
+    };
+  }
+
private Client.Broker pickBroker(List brokers) {
if (brokers.isEmpty()) {
return null;
@@ -792,7 +823,7 @@ private void assignConsumersToStream(
}
};
- AsyncRetry.asyncRetry(() -> findBrokersForStream(stream))
+ AsyncRetry.asyncRetry(findBrokersForStream(stream))
.description("Candidate lookup to consume from '%s'", stream)
.scheduler(environment.scheduledExecutorService())
.retry(ex -> !(ex instanceof StreamDoesNotExistException))
@@ -885,9 +916,9 @@ private void recoverSubscription(List candidates, SubscriptionTracker tr
// maybe not a good candidate, let's refresh and retry for this one
candidates =
Utils.callAndMaybeRetry(
- () -> findBrokersForStream(tracker.stream),
+ findBrokersForStream(tracker.stream),
ex -> !(ex instanceof StreamDoesNotExistException),
- environment.recoveryBackOffDelayPolicy(),
+ recoveryBackOffDelayPolicy(),
"Candidate lookup to consume from '%s' (subscription recovery)",
tracker.stream);
} catch (Exception e) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
index 254620626c..9971cf7ebd 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
@@ -162,7 +162,7 @@ private void addToManager(Broker node, AgentTracker tracker) {
try {
pickedManager.register(tracker);
LOGGER.debug(
- "Assigned {} tracker {} (stream '{}') to manager {} (node {}), subscription ID {}",
+ "Assigned {} tracker {} (stream '{}') to manager {} (node {}), publisher ID {}",
tracker.type(),
tracker.uniqueId(),
tracker.stream(),
@@ -605,8 +605,9 @@ private ClientProducersManager(
}
if (shutdownContext.isShutdownUnexpected()) {
LOGGER.debug(
- "Recovering {} producer(s) after unexpected connection termination",
- producers.size());
+ "Recovering {} producer(s) and {} tracking consumer(s) after unexpected connection termination",
+ producers.size(),
+ trackingConsumerTrackers.size());
producers.forEach((publishingId, tracker) -> tracker.unavailable());
trackingConsumerTrackers.forEach(AgentTracker::unavailable);
// execute in thread pool to free the IO thread
@@ -701,7 +702,8 @@ private void assignProducersToNewManagers(
.thenAccept(
broker -> {
String key = keyForNode(broker);
- LOGGER.debug("Assigning {} producer(s) to {}", trackers.size(), key);
+ LOGGER.debug(
+ "Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
})
.exceptionally(
@@ -767,10 +769,10 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
| ClientClosedException
| StreamNotAvailableException e) {
LOGGER.debug(
- "{} re-assignment on stream {} timed out or connection closed or stream not available, "
+ "{} re-assignment (ID {}) on stream {} timed out or connection closed or stream not available, "
+ "refreshing candidate leader and retrying",
tracker.type(),
- tracker.id(),
+ tracker.identifiable() ? tracker.id() : "N/A",
tracker.stream());
// maybe not a good candidate, let's refresh and retry for this one
node =
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index 5492e68650..1c190546d2 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -101,7 +101,8 @@ class StreamEnvironment implements Environment {
boolean lazyInit,
Function connectionNamingStrategy,
Function clientFactory,
- ObservationCollector> observationCollector) {
+ ObservationCollector> observationCollector,
+ boolean forceReplicaForConsumers) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
this.byteBufAllocator = byteBufAllocator;
@@ -208,7 +209,8 @@ class StreamEnvironment implements Environment {
this,
maxConsumersByConnection,
connectionNamingStrategy,
- Utils.coordinatorClientFactory(this));
+ Utils.coordinatorClientFactory(this),
+ forceReplicaForConsumers);
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
ClientParameters clientParametersForInit = locatorParametersCopy();
Runnable locatorInitSequence =
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
index e3392c893a..ef6e3e87a7 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
@@ -63,6 +63,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
private CompressionCodecFactory compressionCodecFactory;
private boolean lazyInit = false;
+ private boolean forceReplicaForConsumers = false;
private Function clientFactory = Client::new;
private ObservationCollector> observationCollector = ObservationCollector.NO_OP;
@@ -266,6 +267,12 @@ public EnvironmentBuilder lazyInitialization(boolean lazy) {
return this;
}
+ @Override
+ public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) {
+ this.forceReplicaForConsumers = forceReplica;
+ return this;
+ }
+
@Override
public TlsConfiguration tls() {
this.tls.enable();
@@ -318,7 +325,8 @@ public Environment build() {
lazyInit,
connectionNamingStrategy,
this.clientFactory,
- this.observationCollector);
+ this.observationCollector,
+ this.forceReplicaForConsumers);
}
static final class DefaultTlsConfiguration implements TlsConfiguration {
diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java
index 93c2721928..6dfa23c719 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java
@@ -207,7 +207,7 @@ static Function namedFunction(Function task, String format, O
}
static T callAndMaybeRetry(
- Supplier operation, Predicate retryCondition, String format, Object... args) {
+ Callable operation, Predicate retryCondition, String format, Object... args) {
return callAndMaybeRetry(
operation,
retryCondition,
@@ -217,7 +217,7 @@ static T callAndMaybeRetry(
}
static T callAndMaybeRetry(
- Supplier operation,
+ Callable operation,
Predicate retryCondition,
BackOffDelayPolicy delayPolicy,
String format,
@@ -230,7 +230,7 @@ static T callAndMaybeRetry(
while (keepTrying) {
try {
attempt++;
- T result = operation.get();
+ T result = operation.call();
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
LOGGER.debug(
"Operation '{}' completed in {} ms after {} attempt(s)",
diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
index 088c94b9e9..5e387d926f 100644
--- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
+++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
@@ -22,24 +22,9 @@
import com.google.common.util.concurrent.RateLimiter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
-import com.rabbitmq.stream.Address;
-import com.rabbitmq.stream.AddressResolver;
-import com.rabbitmq.stream.ByteCapacity;
-import com.rabbitmq.stream.Codec;
-import com.rabbitmq.stream.ConfirmationHandler;
-import com.rabbitmq.stream.Constants;
-import com.rabbitmq.stream.Consumer;
-import com.rabbitmq.stream.ConsumerBuilder;
-import com.rabbitmq.stream.Environment;
-import com.rabbitmq.stream.EnvironmentBuilder;
+import com.rabbitmq.stream.*;
import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration;
-import com.rabbitmq.stream.MessageBuilder;
-import com.rabbitmq.stream.OffsetSpecification;
-import com.rabbitmq.stream.Producer;
-import com.rabbitmq.stream.ProducerBuilder;
-import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
-import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.codec.QpidProtonCodec;
import com.rabbitmq.stream.codec.SimpleCodec;
import com.rabbitmq.stream.compression.Compression;
@@ -443,6 +428,12 @@ public class StreamPerfTest implements Callable {
split = ",")
private List filterValues;
+ @CommandLine.Option(
+ names = {"--force-replica-for-consumers", "-frfc"},
+ description = "force the connection to a replica for consumers",
+ defaultValue = "false")
+ private boolean forceReplicaForConsumers;
+
static class InstanceSyncOptions {
@CommandLine.Option(
@@ -484,6 +475,33 @@ static class InstanceSyncOptions {
converter = Utils.NotNegativeIntegerTypeConverter.class)
private int initialCredits;
+ @CommandLine.Option(
+ names = {"--heartbeat", "-b"},
+ description = "requested heartbeat in seconds",
+ defaultValue = "60",
+ converter = Utils.GreaterThanOrEqualToZeroIntegerTypeConverter.class)
+ private int heartbeat;
+
+ @CommandLine.Option(
+ names = {"--connection-recovery-interval", "-cri"},
+ description =
+ "connection recovery interval in seconds. "
+ + "Examples: 5 for a fixed delay of 5 seconds, 5:10 for a first attempt after 5 seconds then "
+ + "10 seconds between attempts.",
+ defaultValue = "5",
+ converter = Utils.BackOffDelayPolicyTypeConverter.class)
+ private BackOffDelayPolicy recoveryBackOffDelayPolicy;
+
+ @CommandLine.Option(
+ names = {"--topology-recovery-interval", "-tri"},
+ description =
+ "topology recovery interval in seconds. "
+ + "Examples: 5 for a fixed delay of 5 seconds, 5:10 for a first attempt after 5 seconds then "
+ + "10 seconds between attempts.",
+ defaultValue = "5:1",
+ converter = Utils.BackOffDelayPolicyTypeConverter.class)
+ private BackOffDelayPolicy topologyBackOffDelayPolicy;
+
private MetricsCollector metricsCollector;
private PerformanceMetrics performanceMetrics;
private List monitorings;
@@ -747,7 +765,11 @@ public Integer call() throws Exception {
.maxTrackingConsumersByConnection(this.trackingConsumersByConnection)
.maxConsumersByConnection(this.consumersByConnection)
.rpcTimeout(Duration.ofSeconds(this.rpcTimeout))
- .requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes());
+ .requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes())
+ .forceReplicaForConsumers(this.forceReplicaForConsumers)
+ .requestedHeartbeat(Duration.ofSeconds(this.heartbeat))
+ .recoveryBackOffDelayPolicy(this.recoveryBackOffDelayPolicy)
+ .topologyUpdateBackOffDelayPolicy(this.topologyBackOffDelayPolicy);
if (addrResolver != null) {
environmentBuilder = environmentBuilder.addressResolver(addrResolver);
@@ -978,7 +1000,7 @@ public Integer call() throws Exception {
final int msgSize = this.messageSize;
try {
- while (true && !Thread.currentThread().isInterrupted()) {
+ while (!Thread.currentThread().isInterrupted()) {
rateLimiterCallback.run();
// Using current time for interoperability with other tools
// and also across different processes.
@@ -993,9 +1015,8 @@ public Integer call() throws Exception {
messageBuilder.addData(payload).build(), confirmationHandler);
}
} catch (Exception e) {
- if (e instanceof InterruptedException
- || (e.getCause() != null
- && e.getCause() instanceof InterruptedException)) {
+ if (e.getCause() != null
+ && e.getCause() instanceof InterruptedException) {
LOGGER.info("Publisher #{} thread interrupted", i, e);
} else {
LOGGER.warn("Publisher #{} crashed", i, e);
diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java
index 1d38ab3cee..c24b01a7f5 100644
--- a/src/main/java/com/rabbitmq/stream/perf/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java
@@ -13,6 +13,8 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.perf;
+import static java.time.Duration.ofSeconds;
+
import com.codahale.metrics.MetricRegistry;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
@@ -20,6 +22,7 @@
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.SocketConfigurator;
import com.rabbitmq.client.SocketConfigurators;
+import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.ByteCapacity;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
@@ -676,6 +679,44 @@ public Integer convert(String input) {
}
}
+  /**
+   * Converts "first" or "first:next" (strictly positive integers, in seconds) into a
+   * {@link BackOffDelayPolicy}; a single value applies to the first and to the next attempts.
+   */
+  static class BackOffDelayPolicyTypeConverter
+      implements CommandLine.ITypeConverter<BackOffDelayPolicy> {
+
+    @Override
+    public BackOffDelayPolicy convert(String input) {
+      if (input == null || input.trim().isEmpty()) {
+        typeConversionException("Value for back-off delay policy cannot be empty");
+      }
+      // limit -1 keeps trailing empty strings, so "5:" is rejected instead of being read as "5"
+      String[] values = input.split(":", -1);
+      if (values.length != 1 && values.length != 2) {
+        typeConversionException("Invalid value for back-off delay policy: " + input);
+      }
+      int firstAttempt = parsePositiveSeconds(values[0], input);
+      int nextAttempts = values.length == 2 ? parsePositiveSeconds(values[1], input) : firstAttempt;
+      return BackOffDelayPolicy.fixedWithInitialDelay(
+          ofSeconds(firstAttempt), ofSeconds(nextAttempts));
+    }
+
+    // Parses one segment of the option value; anything but a strictly positive integer is rejected.
+    private static int parsePositiveSeconds(String value, String input) {
+      int seconds = 0;
+      try {
+        seconds = Integer.parseInt(value);
+      } catch (NumberFormatException e) {
+        typeConversionException("Invalid value for back-off delay policy: " + input);
+      }
+      if (seconds <= 0) {
+        typeConversionException("Invalid value for back-off delay policy: " + input);
+      }
+      return seconds;
+    }
+  }
+
private static void typeConversionException(String message) {
throw new TypeConversionException(message);
}
@@ -820,7 +861,7 @@ protected DistributionSummary createChunkSizeDistributionSummary(
.tags(tags)
.description("chunk size")
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
- .distributionStatisticExpiry(Duration.ofSeconds(1))
+ .distributionStatisticExpiry(ofSeconds(1))
.serviceLevelObjectives()
.register(registry);
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
index dd50ba4fe7..06851a55c7 100644
--- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
@@ -20,6 +20,7 @@
import static com.rabbitmq.stream.impl.TestUtils.namedConsumer;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static java.lang.String.format;
+import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -159,7 +160,8 @@ public Client.ClientParameters shutdownListener(
environment,
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
type -> "consumer-connection",
- clientFactory);
+ clientFactory,
+ false);
}
@AfterEach
@@ -188,7 +190,8 @@ void tearDown() throws Exception {
environment,
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
type -> "consumer-connection",
- cf);
+ cf,
+ false);
when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
when(clientFactory.client(any())).thenReturn(client);
@@ -229,7 +232,8 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod
environment,
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
type -> "consumer-connection",
- cf);
+ cf,
+ false);
when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
when(clientFactory.client(any())).thenReturn(client);
@@ -397,13 +401,15 @@ void subscribeShouldThrowExceptionIfNoNodeAvailableForStream() {
@Test
void findBrokersForStreamShouldReturnLeaderIfNoReplicas() {
when(locator.metadata("stream")).thenReturn(metadata(leader(), null));
- assertThat(coordinator.findBrokersForStream("stream")).hasSize(1).contains(leader());
+ assertThat(coordinator.findBrokersForStream("stream", false)).hasSize(1).contains(leader());
}
@Test
void findBrokersForStreamShouldReturnReplicasIfThereAreSome() {
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
- assertThat(coordinator.findBrokersForStream("stream")).hasSize(2).hasSameElementsAs(replicas());
+ assertThat(coordinator.findBrokersForStream("stream", false))
+ .hasSize(2)
+ .hasSameElementsAs(replicas());
}
@Test
@@ -593,8 +599,8 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception {
when(locator.metadata("stream"))
.thenReturn(metadata(null, replica()))
.thenReturn(metadata(null, replica())) // for the second consumer
- .thenReturn(metadata(null, Collections.emptyList()))
- .thenReturn(metadata(null, Collections.emptyList()))
+ .thenReturn(metadata(null, emptyList()))
+ .thenReturn(metadata(null, emptyList()))
.thenReturn(metadata(null, replica()));
when(clientFactory.client(any())).thenReturn(client);
@@ -865,8 +871,8 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream"))
.thenReturn(metadata(null, replicas()))
- .thenReturn(metadata(null, Collections.emptyList()))
- .thenReturn(metadata(null, Collections.emptyList()))
+ .thenReturn(metadata(null, emptyList()))
+ .thenReturn(metadata(null, emptyList()))
.thenReturn(metadata(null, replicas()));
when(clientFactory.client(any())).thenReturn(client);
@@ -1248,7 +1254,7 @@ void shouldRestartWhereItLeftOffAfterDisruption(Consumer offsetSpecificationArgumentCaptor =
@@ -1321,7 +1327,7 @@ void shouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived(
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream"))
.thenReturn(metadata(null, replicas()))
- .thenReturn(metadata(null, Collections.emptyList()))
+ .thenReturn(metadata(null, emptyList()))
.thenReturn(metadata(null, replicas()));
ArgumentCaptor offsetSpecificationArgumentCaptor =
@@ -1386,7 +1392,7 @@ void shouldUseStoredOffsetOnRecovery(Consumer configur
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream"))
.thenReturn(metadata(null, replicas()))
- .thenReturn(metadata(null, Collections.emptyList()))
+ .thenReturn(metadata(null, emptyList()))
.thenReturn(metadata(null, replicas()));
when(clientFactory.client(any())).thenReturn(client);
@@ -1788,6 +1794,105 @@ void consumerShouldBeCreatedProperlyIfManagerClientIsRetried() {
assertThat(messageHandlerCalls.get()).isEqualTo(1);
}
+ @Test
+ void shouldRetryUntilReplicaIsAvailableWhenForceReplicaIsOn() throws Exception {
+ scheduledExecutorService = createScheduledExecutorService();
+ when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
+ Duration retryDelay = Duration.ofMillis(100);
+ when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+ when(consumer.isOpen()).thenReturn(true);
+ when(locator.metadata("stream"))
+ .thenReturn(metadata(leader(), replica()))
+ .thenReturn(metadata(leader(), emptyList()));
+
+ when(clientFactory.client(any())).thenReturn(client);
+ AtomicInteger subscriptionCount = new AtomicInteger(0);
+ when(client.subscribe(
+ subscriptionIdCaptor.capture(),
+ anyString(),
+ any(OffsetSpecification.class),
+ anyInt(),
+ anyMap()))
+ .thenAnswer(
+ invocation -> {
+ subscriptionCount.incrementAndGet();
+ return new Client.Response(Constants.RESPONSE_CODE_OK);
+ });
+
+ coordinator =
+ new ConsumersCoordinator(
+ environment,
+ ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
+ type -> "consumer-connection",
+ clientFactory,
+ true);
+
+ AtomicInteger messageHandlerCalls = new AtomicInteger();
+ Runnable closingRunnable =
+ coordinator.subscribe(
+ consumer,
+ "stream",
+ OffsetSpecification.first(),
+ null,
+ NO_OP_SUBSCRIPTION_LISTENER,
+ NO_OP_TRACKING_CLOSING_CALLBACK,
+ (offset, message) -> messageHandlerCalls.incrementAndGet(),
+ Collections.emptyMap(),
+ flowStrategy());
+ verify(clientFactory, times(1)).client(any());
+ verify(client, times(1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ assertThat(messageHandlerCalls.get()).isEqualTo(0);
+ messageListener.handle(
+ subscriptionIdCaptor.getAllValues().get(0),
+ 1,
+ 0,
+ 0,
+ null,
+ new WrapperMessageBuilder().build());
+ assertThat(messageHandlerCalls.get()).isEqualTo(1);
+
+ verify(client, times(1))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ shutdownListener.handle(
+ new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
+
+ waitAtMost(() -> subscriptionCount.get() == 2);
+
+ // metadata calls: the first registration, then the max number of attempts to get a replica,
+ // then the last attempt that falls back to the leader
+ verify(locator, times(1 + ConsumersCoordinator.MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER + 1))
+ .metadata("stream");
+
+ // the consumer connection should be reset after the connection disruption
+ verify(consumer, times(1)).setSubscriptionClient(isNull());
+
+ verify(client, times(2))
+ .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+ assertThat(messageHandlerCalls.get()).isEqualTo(1);
+ messageListener.handle(
+ subscriptionIdCaptor.getAllValues().get(0),
+ 0,
+ 0,
+ 0,
+ null,
+ new WrapperMessageBuilder().build());
+ assertThat(messageHandlerCalls.get()).isEqualTo(2);
+
+ when(client.unsubscribe(subscriptionIdCaptor.getValue()))
+ .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
+
+ closingRunnable.run();
+ verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue());
+
+ messageListener.handle(
+ subscriptionIdCaptor.getValue(), 0, 0, 0, null, new WrapperMessageBuilder().build());
+ assertThat(messageHandlerCalls.get()).isEqualTo(2);
+ }
+
Client.Broker leader() {
return new Client.Broker("leader", -1);
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
index e20f9c520b..7b09cbb72b 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
@@ -96,7 +96,8 @@ Client.ClientParameters duplicate() {
false,
type -> "locator-connection",
cf,
- ObservationCollector.NO_OP);
+ ObservationCollector.NO_OP,
+ false);
}
@AfterEach
@@ -160,7 +161,8 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
false,
type -> "locator-connection",
cf,
- ObservationCollector.NO_OP);
+ ObservationCollector.NO_OP,
+ false);
verify(cf, times(3)).apply(any(Client.ClientParameters.class));
}
@@ -187,7 +189,8 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
lazyInit,
type -> "locator-connection",
cf,
- ObservationCollector.NO_OP);
+ ObservationCollector.NO_OP,
+ false);
verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
}
diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
index a6e09fd0bd..3b1ea0ba16 100644
--- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
+++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
@@ -21,6 +21,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.of;
+import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
@@ -30,6 +31,7 @@
import com.rabbitmq.stream.perf.Utils.RangeTypeConverter;
import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter;
import io.micrometer.core.instrument.Tag;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -389,6 +391,30 @@ void filterValueSetConverter() throws Exception {
assertThat(converter.convert("5..10")).hasSize(6).contains("5", "6", "10");
}
+ @ParameterizedTest
+ @ValueSource(strings = {"foo", "foo:bar", "0", "1:0", "-1", "1:-2", "1:foo"})
+ void backOffDelayPolicyConverterKo(String input) {
+ CommandLine.ITypeConverter converter =
+ new Utils.BackOffDelayPolicyTypeConverter();
+ assertThatThrownBy(() -> converter.convert(input)).isInstanceOf(TypeConversionException.class);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"5,0:5|1:5|4:5", "5:10,0:5|1:10|4:10"})
+ void backOffDelayPolicyConverterOk(String input, String expectations) throws Exception {
+ CommandLine.ITypeConverter converter =
+ new Utils.BackOffDelayPolicyTypeConverter();
+ BackOffDelayPolicy policy = converter.convert(input);
+ Arrays.stream(expectations.split("\\|"))
+ .map(s -> s.split(":"))
+ .forEach(
+ attemptDelay -> {
+ int attempt = Integer.parseInt(attemptDelay[0]);
+ Duration expectedDelay = Duration.ofSeconds(Long.parseLong(attemptDelay[1]));
+ assertThat(policy.delay(attempt)).isEqualTo(expectedDelay);
+ });
+ }
+
@Command(name = "test-command")
static class TestCommand {