From 39776f07210385b4bfc9269aeb2feda9de5074b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 20 Sep 2023 16:18:30 +0200 Subject: [PATCH 1/2] Force replica WIP Conflicts: src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java --- .../rabbitmq/stream/EnvironmentBuilder.java | 5 +- .../stream/impl/ConsumersCoordinator.java | 55 ++++++-- .../stream/impl/StreamEnvironment.java | 3 +- .../java/com/rabbitmq/stream/impl/Utils.java | 6 +- .../stream/impl/ConsumersCoordinatorTest.java | 129 ++++++++++++++++-- 5 files changed, 166 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index ebf25d7d48..da03059ef2 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -69,9 +69,8 @@ public interface EnvironmentBuilder { *

The default implementation is overridden automatically if the following conditions are * met: the host to connect to is localhost, the user is guest, and no * address resolver has been provided. The client will then always tries to connect to - * localhost to facilitate local development. - * Just provide a pass-through address resolver to avoid this - * behavior, e.g.: + * localhost to facilitate local development. Just provide a pass-through address resolver + * to avoid this behavior, e.g.: * *

    * Environment.builder()
diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
index 4938ea0f7e..81de6f8c07 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
@@ -20,6 +20,7 @@
 import static com.rabbitmq.stream.impl.Utils.namedFunction;
 import static com.rabbitmq.stream.impl.Utils.namedRunnable;
 import static com.rabbitmq.stream.impl.Utils.quote;
+import static java.lang.String.format;
 
 import com.rabbitmq.stream.*;
 import com.rabbitmq.stream.Consumer;
@@ -41,10 +42,12 @@
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.*;
@@ -56,6 +59,7 @@
 class ConsumersCoordinator {
 
   static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256;
+  static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5;
 
   static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next();
 
@@ -74,16 +78,19 @@ class ConsumersCoordinator {
   private final ExecutorServiceFactory executorServiceFactory =
       new DefaultExecutorServiceFactory(
           Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
+  private final boolean forceReplica;
 
   ConsumersCoordinator(
       StreamEnvironment environment,
       int maxConsumersByConnection,
       Function connectionNamingStrategy,
-      ClientFactory clientFactory) {
+      ClientFactory clientFactory,
+      boolean forceReplica) {
     this.environment = environment;
     this.clientFactory = clientFactory;
     this.maxConsumersByConnection = maxConsumersByConnection;
     this.connectionNamingStrategy = connectionNamingStrategy;
+    this.forceReplica = forceReplica;
   }
 
   private static String keyForClientSubscription(Client.Broker broker) {
@@ -108,7 +115,7 @@ Runnable subscribe(
       MessageHandler messageHandler,
       Map subscriptionProperties,
       ConsumerFlowStrategy flowStrategy) {
-    List candidates = findBrokersForStream(stream);
+    List candidates = findBrokersForStream(stream, forceReplica);
     Client.Broker newNode = pickBroker(candidates);
     if (newNode == null) {
       throw new IllegalStateException("No available node to subscribe to");
@@ -201,11 +208,8 @@ private void addToManager(
         // manager connection is dead or stream not available
         // scheduling manager closing if necessary in another thread to avoid blocking this one
         if (pickedManager.isEmpty()) {
-          ClientSubscriptionsManager manager = pickedManager;
           ConsumersCoordinator.this.environment.execute(
-              () -> {
-                manager.closeIfEmpty();
-              },
+              pickedManager::closeIfEmpty,
               "Consumer manager closing after timeout, consumer %d on stream '%s'",
               tracker.consumer.id(),
               tracker.stream);
@@ -225,12 +229,14 @@ int managerCount() {
   }
 
   // package protected for testing
-  List findBrokersForStream(String stream) {
+  List findBrokersForStream(String stream, boolean forceReplica) {
+    LOGGER.debug(
+        "Candidate lookup to consumer from '{}', forcing replica? {}", stream, forceReplica);
     Map metadata =
         this.environment.locatorOperation(
             namedFunction(
                 c -> c.metadata(stream), "Candidate lookup to consume from '%s'", stream));
-    if (metadata.size() == 0 || metadata.get(stream) == null) {
+    if (metadata.isEmpty() || metadata.get(stream) == null) {
       // this is not supposed to happen
       throw new StreamDoesNotExistException(stream);
     }
@@ -253,8 +259,17 @@ List findBrokersForStream(String stream) {
 
     List brokers;
     if (replicas == null || replicas.isEmpty()) {
-      brokers = Collections.singletonList(streamMetadata.getLeader());
-      LOGGER.debug("Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
+      if (forceReplica) {
+        throw new IllegalStateException(
+            format(
+                "Only the leader node is available for consuming from %s and "
+                    + "consuming from leader has been deactivated for this consumer",
+                stream));
+      } else {
+        brokers = Collections.singletonList(streamMetadata.getLeader());
+        LOGGER.debug(
+            "Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
+      }
     } else {
       LOGGER.debug("Replicas for consuming from {}: {}", stream, replicas);
       brokers = new ArrayList<>(replicas);
@@ -265,6 +280,20 @@ List findBrokersForStream(String stream) {
     return brokers;
   }
 
+  private Callable> findBrokersForStream(String stream) {
+    AtomicInteger attemptNumber = new AtomicInteger();
+    return () -> {
+      boolean mustUseReplica;
+      if (forceReplica) {
+        mustUseReplica =
+            attemptNumber.incrementAndGet() <= MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER;
+      } else {
+        mustUseReplica = false;
+      }
+      return findBrokersForStream(stream, mustUseReplica);
+    };
+  }
+
   private Client.Broker pickBroker(List brokers) {
     if (brokers.isEmpty()) {
       return null;
@@ -792,7 +821,7 @@ private void assignConsumersToStream(
             }
           };
 
-      AsyncRetry.asyncRetry(() -> findBrokersForStream(stream))
+      AsyncRetry.asyncRetry(findBrokersForStream(stream))
           .description("Candidate lookup to consume from '%s'", stream)
           .scheduler(environment.scheduledExecutorService())
           .retry(ex -> !(ex instanceof StreamDoesNotExistException))
@@ -885,9 +914,9 @@ private void recoverSubscription(List candidates, SubscriptionTracker tr
           // maybe not a good candidate, let's refresh and retry for this one
           candidates =
               Utils.callAndMaybeRetry(
-                  () -> findBrokersForStream(tracker.stream),
+                  findBrokersForStream(tracker.stream),
                   ex -> !(ex instanceof StreamDoesNotExistException),
-                  environment.recoveryBackOffDelayPolicy(),
+                  recoveryBackOffDelayPolicy(),
                   "Candidate lookup to consume from '%s' (subscription recovery)",
                   tracker.stream);
         } catch (Exception e) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index 5492e68650..9ed4aecc47 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -208,7 +208,8 @@ class StreamEnvironment implements Environment {
             this,
             maxConsumersByConnection,
             connectionNamingStrategy,
-            Utils.coordinatorClientFactory(this));
+            Utils.coordinatorClientFactory(this),
+            false);
     this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
     ClientParameters clientParametersForInit = locatorParametersCopy();
     Runnable locatorInitSequence =
diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java
index 93c2721928..6dfa23c719 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java
@@ -207,7 +207,7 @@ static  Function namedFunction(Function task, String format, O
   }
 
   static  T callAndMaybeRetry(
-      Supplier operation, Predicate retryCondition, String format, Object... args) {
+      Callable operation, Predicate retryCondition, String format, Object... args) {
     return callAndMaybeRetry(
         operation,
         retryCondition,
@@ -217,7 +217,7 @@ static  T callAndMaybeRetry(
   }
 
   static  T callAndMaybeRetry(
-      Supplier operation,
+      Callable operation,
       Predicate retryCondition,
       BackOffDelayPolicy delayPolicy,
       String format,
@@ -230,7 +230,7 @@ static  T callAndMaybeRetry(
     while (keepTrying) {
       try {
         attempt++;
-        T result = operation.get();
+        T result = operation.call();
         Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
         LOGGER.debug(
             "Operation '{}' completed in {} ms after {} attempt(s)",
diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
index dd50ba4fe7..06851a55c7 100644
--- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
@@ -20,6 +20,7 @@
 import static com.rabbitmq.stream.impl.TestUtils.namedConsumer;
 import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
 import static java.lang.String.format;
+import static java.util.Collections.emptyList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -159,7 +160,8 @@ public Client.ClientParameters shutdownListener(
             environment,
             ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
             type -> "consumer-connection",
-            clientFactory);
+            clientFactory,
+            false);
   }
 
   @AfterEach
@@ -188,7 +190,8 @@ void tearDown() throws Exception {
             environment,
             ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
             type -> "consumer-connection",
-            cf);
+            cf,
+            false);
 
     when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
     when(clientFactory.client(any())).thenReturn(client);
@@ -229,7 +232,8 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod
             environment,
             ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
             type -> "consumer-connection",
-            cf);
+            cf,
+            false);
 
     when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
     when(clientFactory.client(any())).thenReturn(client);
@@ -397,13 +401,15 @@ void subscribeShouldThrowExceptionIfNoNodeAvailableForStream() {
   @Test
   void findBrokersForStreamShouldReturnLeaderIfNoReplicas() {
     when(locator.metadata("stream")).thenReturn(metadata(leader(), null));
-    assertThat(coordinator.findBrokersForStream("stream")).hasSize(1).contains(leader());
+    assertThat(coordinator.findBrokersForStream("stream", false)).hasSize(1).contains(leader());
   }
 
   @Test
   void findBrokersForStreamShouldReturnReplicasIfThereAreSome() {
     when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
-    assertThat(coordinator.findBrokersForStream("stream")).hasSize(2).hasSameElementsAs(replicas());
+    assertThat(coordinator.findBrokersForStream("stream", false))
+        .hasSize(2)
+        .hasSameElementsAs(replicas());
   }
 
   @Test
@@ -593,8 +599,8 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception {
     when(locator.metadata("stream"))
         .thenReturn(metadata(null, replica()))
         .thenReturn(metadata(null, replica())) // for the second consumer
-        .thenReturn(metadata(null, Collections.emptyList()))
-        .thenReturn(metadata(null, Collections.emptyList()))
+        .thenReturn(metadata(null, emptyList()))
+        .thenReturn(metadata(null, emptyList()))
         .thenReturn(metadata(null, replica()));
 
     when(clientFactory.client(any())).thenReturn(client);
@@ -865,8 +871,8 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti
     when(consumer.isOpen()).thenReturn(true);
     when(locator.metadata("stream"))
         .thenReturn(metadata(null, replicas()))
-        .thenReturn(metadata(null, Collections.emptyList()))
-        .thenReturn(metadata(null, Collections.emptyList()))
+        .thenReturn(metadata(null, emptyList()))
+        .thenReturn(metadata(null, emptyList()))
         .thenReturn(metadata(null, replicas()));
 
     when(clientFactory.client(any())).thenReturn(client);
@@ -1248,7 +1254,7 @@ void shouldRestartWhereItLeftOffAfterDisruption(Consumer offsetSpecificationArgumentCaptor =
@@ -1321,7 +1327,7 @@ void shouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived(
     when(consumer.isOpen()).thenReturn(true);
     when(locator.metadata("stream"))
         .thenReturn(metadata(null, replicas()))
-        .thenReturn(metadata(null, Collections.emptyList()))
+        .thenReturn(metadata(null, emptyList()))
         .thenReturn(metadata(null, replicas()));
 
     ArgumentCaptor offsetSpecificationArgumentCaptor =
@@ -1386,7 +1392,7 @@ void shouldUseStoredOffsetOnRecovery(Consumer configur
     when(consumer.isOpen()).thenReturn(true);
     when(locator.metadata("stream"))
         .thenReturn(metadata(null, replicas()))
-        .thenReturn(metadata(null, Collections.emptyList()))
+        .thenReturn(metadata(null, emptyList()))
         .thenReturn(metadata(null, replicas()));
 
     when(clientFactory.client(any())).thenReturn(client);
@@ -1788,6 +1794,105 @@ void consumerShouldBeCreatedProperlyIfManagerClientIsRetried() {
     assertThat(messageHandlerCalls.get()).isEqualTo(1);
   }
 
+  @Test
+  void shouldRetryUntilReplicaIsAvailableWhenForceReplicaIsOn() throws Exception {
+    scheduledExecutorService = createScheduledExecutorService();
+    when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
+    Duration retryDelay = Duration.ofMillis(100);
+    when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
+    when(consumer.isOpen()).thenReturn(true);
+    when(locator.metadata("stream"))
+        .thenReturn(metadata(leader(), replica()))
+        .thenReturn(metadata(leader(), emptyList()));
+
+    when(clientFactory.client(any())).thenReturn(client);
+    AtomicInteger subscriptionCount = new AtomicInteger(0);
+    when(client.subscribe(
+            subscriptionIdCaptor.capture(),
+            anyString(),
+            any(OffsetSpecification.class),
+            anyInt(),
+            anyMap()))
+        .thenAnswer(
+            invocation -> {
+              subscriptionCount.incrementAndGet();
+              return new Client.Response(Constants.RESPONSE_CODE_OK);
+            });
+
+    coordinator =
+        new ConsumersCoordinator(
+            environment,
+            ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
+            type -> "consumer-connection",
+            clientFactory,
+            true);
+
+    AtomicInteger messageHandlerCalls = new AtomicInteger();
+    Runnable closingRunnable =
+        coordinator.subscribe(
+            consumer,
+            "stream",
+            OffsetSpecification.first(),
+            null,
+            NO_OP_SUBSCRIPTION_LISTENER,
+            NO_OP_TRACKING_CLOSING_CALLBACK,
+            (offset, message) -> messageHandlerCalls.incrementAndGet(),
+            Collections.emptyMap(),
+            flowStrategy());
+    verify(clientFactory, times(1)).client(any());
+    verify(client, times(1))
+        .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+    assertThat(messageHandlerCalls.get()).isEqualTo(0);
+    messageListener.handle(
+        subscriptionIdCaptor.getAllValues().get(0),
+        1,
+        0,
+        0,
+        null,
+        new WrapperMessageBuilder().build());
+    assertThat(messageHandlerCalls.get()).isEqualTo(1);
+
+    verify(client, times(1))
+        .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+    shutdownListener.handle(
+        new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
+
+    waitAtMost(() -> subscriptionCount.get() == 2);
+
+    // metadata calls: the first registration, then the max number of attempts to get a replica,
+    // then the last attempt that falls back to the leader
+    verify(locator, times(1 + ConsumersCoordinator.MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER + 1))
+        .metadata("stream");
+
+    // the consumer connection should be reset after the connection disruption
+    verify(consumer, times(1)).setSubscriptionClient(isNull());
+
+    verify(client, times(2))
+        .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
+
+    assertThat(messageHandlerCalls.get()).isEqualTo(1);
+    messageListener.handle(
+        subscriptionIdCaptor.getAllValues().get(0),
+        0,
+        0,
+        0,
+        null,
+        new WrapperMessageBuilder().build());
+    assertThat(messageHandlerCalls.get()).isEqualTo(2);
+
+    when(client.unsubscribe(subscriptionIdCaptor.getValue()))
+        .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
+
+    closingRunnable.run();
+    verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue());
+
+    messageListener.handle(
+        subscriptionIdCaptor.getValue(), 0, 0, 0, null, new WrapperMessageBuilder().build());
+    assertThat(messageHandlerCalls.get()).isEqualTo(2);
+  }
+
   Client.Broker leader() {
     return new Client.Broker("leader", -1);
   }

From 07544839f1cd8bf2a4b0c859975b8316513777ca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= 
Date: Fri, 22 Sep 2023 09:15:38 +0200
Subject: [PATCH 2/2] Add forceReplicaForConsumer flag

Add a flag to force the use of a replica for consumers. The library
tries 5 times before falling back to the stream leader when
the flag is set to true (default is false, to favor faster
recovery).

The stream leader is started first and a replica can start
a few seconds later, so with bad timing a consumer can see
only the leader when it recovers. Setting the flag to true
will force the library to retry a few times to use a replica
if one becomes available.

The performance tool now also has the corresponding flag:
--force-replica-for-consumers.

This commit also adds the following options to the performance tool:
--heartbeat, --connection-recovery-interval,
--topology-recovery-interval.

Fixes #426, #427
---
 src/docs/asciidoc/api.adoc                    | 10 +++
 .../rabbitmq/stream/EnvironmentBuilder.java   | 30 ++++++++-
 .../java/com/rabbitmq/stream/impl/Client.java |  8 ++-
 .../stream/impl/ConsumersCoordinator.java     |  2 +
 .../stream/impl/ProducersCoordinator.java     | 14 +++--
 .../stream/impl/StreamEnvironment.java        |  5 +-
 .../stream/impl/StreamEnvironmentBuilder.java | 10 ++-
 .../rabbitmq/stream/perf/StreamPerfTest.java  | 63 ++++++++++++-------
 .../java/com/rabbitmq/stream/perf/Utils.java  | 43 ++++++++++++-
 .../impl/StreamEnvironmentUnitTest.java       |  9 ++-
 .../com/rabbitmq/stream/perf/UtilsTest.java   | 26 ++++++++
 11 files changed, 184 insertions(+), 36 deletions(-)

diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index 16074f3246..beb736c8ac 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -204,6 +204,16 @@ a new connection is open. The value must be between 1 and 255.
 |To delay the connection opening until necessary.
 |false
 
+|`requestedHeartbeat`
+|Heartbeat requested by the client.
+|60 seconds
+
+|`forceReplicaForConsumers`
+|Retry connecting until a replica is available for consumers.
+The client retries 5 times before falling back to the stream leader node.
+Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available.
+|`false`
+
 |`id`
 |Informational ID for the environment instance.
 Used as a prefix for connection names.
diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
index da03059ef2..fe632f5f55 100644
--- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
@@ -183,7 +183,7 @@ public interface EnvironmentBuilder {
   EnvironmentBuilder virtualHost(String virtualHost);
 
   /**
-   * The hearbeat to request.
+   * The heartbeat to request.
    *
    * 

Default is 60 seconds. * @@ -322,6 +322,34 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy( */ EnvironmentBuilder lazyInitialization(boolean lazy); + /** + * Flag to force the connection to a stream replica for consumers. + * + *

The library will always prefer to connect to a stream replica to consume from, but it will + * fall back to the stream leader if no replica is available. This is the default behavior. Set + * this flag to true to make the library wait for a replica to become available if + * only the stream leader is available. This can lead to longer recovery time but help to offload + * a stream leader and let it deal only with write requests. + * + *

Note the library performs only 5 attempts to locate a replica before falling back to the + * leader when the flag is set to true. + * + *

The {@link #recoveryBackOffDelayPolicy(BackOffDelayPolicy)} and {@link + * #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)} policies control the time between + * attempts. + * + *

Do not set this flag to true when streams have only 1 member (the leader), + * e.g. for local development. + * + *

Default is false. + * + * @param forceReplica whether to force the connection to a replica or not + * @return this builder instance + * @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy) + * @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy) + */ + EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica); + /** * Create the {@link Environment} instance. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index d9c4adf29b..4d2cef6c6f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -185,6 +185,7 @@ public long applyAsLong(Object value) { FlushConsolidationHandler.class.getSimpleName(); private final String NETTY_HANDLER_STREAM = StreamHandler.class.getSimpleName(); private final String host; + private final String clientConnectionName; private final int port; private final Map serverProperties; private final Map connectionProperties; @@ -313,6 +314,7 @@ public void initChannel(SocketChannel ch) { f = b.connect(parameters.host, parameters.port).sync(); this.host = parameters.host; this.port = parameters.port; + this.clientConnectionName = clientConnectionName; } catch (Exception e) { String message = format( @@ -2696,7 +2698,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { - LOGGER.info("Closing connection because it's been idle for too long"); + LOGGER.info( + "Closing connection {} on {}:{} because it's been idle for too long", + clientConnectionName, + host, + port); closing.set(true); closingSequence(ShutdownContext.ShutdownReason.HEARTBEAT_FAILURE); } else if (e.state() == IdleState.WRITER_IDLE) { diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 
81de6f8c07..2314434ad3 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -290,6 +290,8 @@ private Callable> findBrokersForStream(String stream) { } else { mustUseReplica = false; } + LOGGER.debug( + "Looking for broker(s) for stream {}, forcing replica {}", stream, mustUseReplica); return findBrokersForStream(stream, mustUseReplica); }; } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 254620626c..9971cf7ebd 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -162,7 +162,7 @@ private void addToManager(Broker node, AgentTracker tracker) { try { pickedManager.register(tracker); LOGGER.debug( - "Assigned {} tracker {} (stream '{}') to manager {} (node {}), subscription ID {}", + "Assigned {} tracker {} (stream '{}') to manager {} (node {}), publisher ID {}", tracker.type(), tracker.uniqueId(), tracker.stream(), @@ -605,8 +605,9 @@ private ClientProducersManager( } if (shutdownContext.isShutdownUnexpected()) { LOGGER.debug( - "Recovering {} producer(s) after unexpected connection termination", - producers.size()); + "Recovering {} producer(s) and {} tracking consumer(s) after unexpected connection termination", + producers.size(), + trackingConsumerTrackers.size()); producers.forEach((publishingId, tracker) -> tracker.unavailable()); trackingConsumerTrackers.forEach(AgentTracker::unavailable); // execute in thread pool to free the IO thread @@ -701,7 +702,8 @@ private void assignProducersToNewManagers( .thenAccept( broker -> { String key = keyForNode(broker); - LOGGER.debug("Assigning {} producer(s) to {}", trackers.size(), key); + LOGGER.debug( + "Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key); trackers.forEach(tracker -> 
maybeRecoverAgent(broker, tracker)); }) .exceptionally( @@ -767,10 +769,10 @@ private void recoverAgent(Broker node, AgentTracker tracker) { | ClientClosedException | StreamNotAvailableException e) { LOGGER.debug( - "{} re-assignment on stream {} timed out or connection closed or stream not available, " + "{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, " + "refreshing candidate leader and retrying", tracker.type(), - tracker.id(), + tracker.identifiable() ? tracker.id() : "N/A", tracker.stream()); // maybe not a good candidate, let's refresh and retry for this one node = diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 9ed4aecc47..1c190546d2 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -101,7 +101,8 @@ class StreamEnvironment implements Environment { boolean lazyInit, Function connectionNamingStrategy, Function clientFactory, - ObservationCollector observationCollector) { + ObservationCollector observationCollector, + boolean forceReplicaForConsumers) { this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy; this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy; this.byteBufAllocator = byteBufAllocator; @@ -209,7 +210,7 @@ class StreamEnvironment implements Environment { maxConsumersByConnection, connectionNamingStrategy, Utils.coordinatorClientFactory(this), - false); + forceReplicaForConsumers); this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this); ClientParameters clientParametersForInit = locatorParametersCopy(); Runnable locatorInitSequence = diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index e3392c893a..ef6e3e87a7 100644 --- 
a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -63,6 +63,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder { private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT; private CompressionCodecFactory compressionCodecFactory; private boolean lazyInit = false; + private boolean forceReplicaForConsumers = false; private Function clientFactory = Client::new; private ObservationCollector observationCollector = ObservationCollector.NO_OP; @@ -266,6 +267,12 @@ public EnvironmentBuilder lazyInitialization(boolean lazy) { return this; } + @Override + public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) { + this.forceReplicaForConsumers = forceReplica; + return this; + } + @Override public TlsConfiguration tls() { this.tls.enable(); @@ -318,7 +325,8 @@ public Environment build() { lazyInit, connectionNamingStrategy, this.clientFactory, - this.observationCollector); + this.observationCollector, + this.forceReplicaForConsumers); } static final class DefaultTlsConfiguration implements TlsConfiguration { diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 088c94b9e9..5e387d926f 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -22,24 +22,9 @@ import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import com.rabbitmq.stream.Address; -import com.rabbitmq.stream.AddressResolver; -import com.rabbitmq.stream.ByteCapacity; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.ConfirmationHandler; -import com.rabbitmq.stream.Constants; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.ConsumerBuilder; -import com.rabbitmq.stream.Environment; 
-import com.rabbitmq.stream.EnvironmentBuilder; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.EnvironmentBuilder.TlsConfiguration; -import com.rabbitmq.stream.MessageBuilder; -import com.rabbitmq.stream.OffsetSpecification; -import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.ProducerBuilder; -import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.StreamCreator.LeaderLocator; -import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.codec.QpidProtonCodec; import com.rabbitmq.stream.codec.SimpleCodec; import com.rabbitmq.stream.compression.Compression; @@ -443,6 +428,12 @@ public class StreamPerfTest implements Callable { split = ",") private List filterValues; + @CommandLine.Option( + names = {"--force-replica-for-consumers", "-frfc"}, + description = "force the connection to a replica for consumers", + defaultValue = "false") + private boolean forceReplicaForConsumers; + static class InstanceSyncOptions { @CommandLine.Option( @@ -484,6 +475,33 @@ static class InstanceSyncOptions { converter = Utils.NotNegativeIntegerTypeConverter.class) private int initialCredits; + @CommandLine.Option( + names = {"--heartbeat", "-b"}, + description = "requested heartbeat in seconds", + defaultValue = "60", + converter = Utils.GreaterThanOrEqualToZeroIntegerTypeConverter.class) + private int heartbeat; + + @CommandLine.Option( + names = {"--connection-recovery-interval", "-cri"}, + description = + "connection recovery interval in seconds. " + + "Examples: 5 for a fixed delay of 5 seconds, 5:10 for a first attempt after 5 seconds then " + + "10 seconds between attempts.", + defaultValue = "5", + converter = Utils.BackOffDelayPolicyTypeConverter.class) + private BackOffDelayPolicy recoveryBackOffDelayPolicy; + + @CommandLine.Option( + names = {"--topology-recovery-interval", "-tri"}, + description = + "topology recovery interval in seconds. 
" + + "Examples: 5 for a fixed delay of 5 seconds, 5:10 for a first attempt after 5 seconds then " + + "10 seconds between attempts.", + defaultValue = "5:1", + converter = Utils.BackOffDelayPolicyTypeConverter.class) + private BackOffDelayPolicy topologyBackOffDelayPolicy; + private MetricsCollector metricsCollector; private PerformanceMetrics performanceMetrics; private List monitorings; @@ -747,7 +765,11 @@ public Integer call() throws Exception { .maxTrackingConsumersByConnection(this.trackingConsumersByConnection) .maxConsumersByConnection(this.consumersByConnection) .rpcTimeout(Duration.ofSeconds(this.rpcTimeout)) - .requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes()); + .requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes()) + .forceReplicaForConsumers(this.forceReplicaForConsumers) + .requestedHeartbeat(Duration.ofSeconds(this.heartbeat)) + .recoveryBackOffDelayPolicy(this.recoveryBackOffDelayPolicy) + .topologyUpdateBackOffDelayPolicy(this.topologyBackOffDelayPolicy); if (addrResolver != null) { environmentBuilder = environmentBuilder.addressResolver(addrResolver); @@ -978,7 +1000,7 @@ public Integer call() throws Exception { final int msgSize = this.messageSize; try { - while (true && !Thread.currentThread().isInterrupted()) { + while (!Thread.currentThread().isInterrupted()) { rateLimiterCallback.run(); // Using current time for interoperability with other tools // and also across different processes. 
@@ -993,9 +1015,8 @@ public Integer call() throws Exception { messageBuilder.addData(payload).build(), confirmationHandler); } } catch (Exception e) { - if (e instanceof InterruptedException - || (e.getCause() != null - && e.getCause() instanceof InterruptedException)) { + if (e.getCause() != null + && e.getCause() instanceof InterruptedException) { LOGGER.info("Publisher #{} thread interrupted", i, e); } else { LOGGER.warn("Publisher #{} crashed", i, e); diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index 1d38ab3cee..c24b01a7f5 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -13,6 +13,8 @@ // info@rabbitmq.com. package com.rabbitmq.stream.perf; +import static java.time.Duration.ofSeconds; + import com.codahale.metrics.MetricRegistry; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; @@ -20,6 +22,7 @@ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.SocketConfigurator; import com.rabbitmq.client.SocketConfigurators; +import com.rabbitmq.stream.BackOffDelayPolicy; import com.rabbitmq.stream.ByteCapacity; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamCreator.LeaderLocator; @@ -676,6 +679,44 @@ public Integer convert(String input) { } } + static class BackOffDelayPolicyTypeConverter + implements CommandLine.ITypeConverter { + + @Override + public BackOffDelayPolicy convert(String input) { + if (input == null || input.trim().isEmpty()) { + typeConversionException("Value for back-off delay policy cannot be empty"); + } + String[] values = input.split(":"); + if (values.length != 1 && values.length != 2) { + typeConversionException("Invalid value for back-off delay policy: " + input); + } + int firstAttempt = 0, nextAttempts = 0; + try { + firstAttempt = Integer.parseInt(values[0]); + if (firstAttempt <= 0) { + throw new 
IllegalArgumentException(); + } + } catch (Exception e) { + typeConversionException("Invalid value for back-off delay policy: " + input); + } + if (values.length == 2) { + try { + nextAttempts = Integer.parseInt(values[1]); + if (nextAttempts <= 0) { + throw new IllegalArgumentException(); + } + } catch (Exception e) { + typeConversionException("Invalid value for back-off delay policy: " + input); + } + } else { + nextAttempts = firstAttempt; + } + return BackOffDelayPolicy.fixedWithInitialDelay( + ofSeconds(firstAttempt), ofSeconds(nextAttempts)); + } + } + private static void typeConversionException(String message) { throw new TypeConversionException(message); } @@ -820,7 +861,7 @@ protected DistributionSummary createChunkSizeDistributionSummary( .tags(tags) .description("chunk size") .publishPercentiles(0.5, 0.75, 0.95, 0.99) - .distributionStatisticExpiry(Duration.ofSeconds(1)) + .distributionStatisticExpiry(ofSeconds(1)) .serviceLevelObjectives() .register(registry); } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index e20f9c520b..7b09cbb72b 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -96,7 +96,8 @@ Client.ClientParameters duplicate() { false, type -> "locator-connection", cf, - ObservationCollector.NO_OP); + ObservationCollector.NO_OP, + false); } @AfterEach @@ -160,7 +161,8 @@ void shouldTryUrisOnInitializationFailure() throws Exception { false, type -> "locator-connection", cf, - ObservationCollector.NO_OP); + ObservationCollector.NO_OP, + false); verify(cf, times(3)).apply(any(Client.ClientParameters.class)); } @@ -187,7 +189,8 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( lazyInit, type -> "locator-connection", cf, - ObservationCollector.NO_OP); + ObservationCollector.NO_OP, + false); verify(cf, 
times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class)); } diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java index a6e09fd0bd..3b1ea0ba16 100644 --- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.of; +import com.rabbitmq.stream.BackOffDelayPolicy; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter; @@ -30,6 +31,7 @@ import com.rabbitmq.stream.perf.Utils.RangeTypeConverter; import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter; import io.micrometer.core.instrument.Tag; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -389,6 +391,30 @@ void filterValueSetConverter() throws Exception { assertThat(converter.convert("5..10")).hasSize(6).contains("5", "6", "10"); } + @ParameterizedTest + @ValueSource(strings = {"foo", "foo:bar", "0", "1:0", "-1", "1:-2", "1:foo"}) + void backOffDelayPolicyConverterKo(String input) { + CommandLine.ITypeConverter converter = + new Utils.BackOffDelayPolicyTypeConverter(); + assertThatThrownBy(() -> converter.convert(input)).isInstanceOf(TypeConversionException.class); + } + + @ParameterizedTest + @CsvSource({"5,0:5|1:5|4:5", "5:10,0:5|1:10|4:10"}) + void backOffDelayPolicyConverterOk(String input, String expectations) throws Exception { + CommandLine.ITypeConverter converter = + new Utils.BackOffDelayPolicyTypeConverter(); + BackOffDelayPolicy policy = converter.convert(input); + Arrays.stream(expectations.split("\\|")) + .map(s -> s.split(":")) + .forEach( + attemptDelay -> { + int attempt = Integer.parseInt(attemptDelay[0]); + Duration expectedDelay = 
Duration.ofSeconds(Long.parseLong(attemptDelay[1])); + assertThat(policy.delay(attempt)).isEqualTo(expectedDelay); + }); + } + @Command(name = "test-command") static class TestCommand {