Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ a new connection is open. The value must be between 1 and 255.
|To delay the connection opening until necessary.
|false

|`requestedHeartbeat`
|Heartbeat requested by the client.
|60 seconds

|`forceReplicaForConsumers`
|Retry connecting until a replica is available for consumers.
The client retries 5 times before falling back to the stream leader node.
Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available.
|`false`

|`id`
|Informational ID for the environment instance.
Used as a prefix for connection names.
Expand Down
35 changes: 31 additions & 4 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ public interface EnvironmentBuilder {
* <p><i>The default implementation is overridden automatically if the following conditions are
* met: the host to connect to is <code>localhost</code>, the user is <code>guest</code>, and no
* address resolver has been provided. The client will then always try to connect to <code>
* localhost</code> to facilitate local development.
* Just provide a pass-through address resolver to avoid this
* behavior, e.g.:</i>
* localhost</code> to facilitate local development. Just provide a pass-through address resolver
* to avoid this behavior, e.g.:</i>
*
* <pre>
* Environment.builder()
Expand Down Expand Up @@ -184,7 +183,7 @@ public interface EnvironmentBuilder {
EnvironmentBuilder virtualHost(String virtualHost);

/**
* The hearbeat to request.
* The heartbeat to request.
*
* <p>Default is 60 seconds.
*
Expand Down Expand Up @@ -323,6 +322,34 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
*/
EnvironmentBuilder lazyInitialization(boolean lazy);

/**
* Flag to force the connection to a stream replica for consumers.
*
* <p>The library always prefers to connect to a stream replica to consume from, but it falls
* back to the stream leader if no replica is available. This is the default behavior. Set this
* flag to <code>true</code> to make the library wait for a replica to become available when only
* the stream leader is available. This can lead to longer recovery times, but it helps to
* offload the stream leader and lets it deal only with write requests.
*
* <p>Note the library performs only 5 attempts to locate a replica before falling back to the
* leader when the flag is set to <code>true</code>.
*
* <p>The {@link #recoveryBackOffDelayPolicy(BackOffDelayPolicy)} and {@link
* #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)} policies control the time between
* attempts.
*
* <p><b>Do not set this flag to <code>true</code> when streams have only 1 member (the leader),
* e.g. for local development.</b>
*
* <p>Default is false.
*
* @param forceReplica whether to force the connection to a replica or not
* @return this builder instance
* @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy)
* @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)
*/
EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica);

/**
* Create the {@link Environment} instance.
*
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public long applyAsLong(Object value) {
FlushConsolidationHandler.class.getSimpleName();
private final String NETTY_HANDLER_STREAM = StreamHandler.class.getSimpleName();
private final String host;
private final String clientConnectionName;
private final int port;
private final Map<String, String> serverProperties;
private final Map<String, String> connectionProperties;
Expand Down Expand Up @@ -313,6 +314,7 @@ public void initChannel(SocketChannel ch) {
f = b.connect(parameters.host, parameters.port).sync();
this.host = parameters.host;
this.port = parameters.port;
this.clientConnectionName = clientConnectionName;
} catch (Exception e) {
String message =
format(
Expand Down Expand Up @@ -2696,7 +2698,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
LOGGER.info("Closing connection because it's been idle for too long");
LOGGER.info(
"Closing connection {} on {}:{} because it's been idle for too long",
clientConnectionName,
host,
port);
closing.set(true);
closingSequence(ShutdownContext.ShutdownReason.HEARTBEAT_FAILURE);
} else if (e.state() == IdleState.WRITER_IDLE) {
Expand Down
57 changes: 44 additions & 13 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.rabbitmq.stream.impl.Utils.namedFunction;
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
import static com.rabbitmq.stream.impl.Utils.quote;
import static java.lang.String.format;

import com.rabbitmq.stream.*;
import com.rabbitmq.stream.Consumer;
Expand All @@ -41,10 +42,12 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.*;
Expand All @@ -56,6 +59,7 @@
class ConsumersCoordinator {

static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256;
static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5;

static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next();

Expand All @@ -74,16 +78,19 @@ class ConsumersCoordinator {
private final ExecutorServiceFactory executorServiceFactory =
new DefaultExecutorServiceFactory(
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
private final boolean forceReplica;

ConsumersCoordinator(
StreamEnvironment environment,
int maxConsumersByConnection,
Function<ClientConnectionType, String> connectionNamingStrategy,
ClientFactory clientFactory) {
ClientFactory clientFactory,
boolean forceReplica) {
this.environment = environment;
this.clientFactory = clientFactory;
this.maxConsumersByConnection = maxConsumersByConnection;
this.connectionNamingStrategy = connectionNamingStrategy;
this.forceReplica = forceReplica;
}

private static String keyForClientSubscription(Client.Broker broker) {
Expand All @@ -108,7 +115,7 @@ Runnable subscribe(
MessageHandler messageHandler,
Map<String, String> subscriptionProperties,
ConsumerFlowStrategy flowStrategy) {
List<Client.Broker> candidates = findBrokersForStream(stream);
List<Client.Broker> candidates = findBrokersForStream(stream, forceReplica);
Client.Broker newNode = pickBroker(candidates);
if (newNode == null) {
throw new IllegalStateException("No available node to subscribe to");
Expand Down Expand Up @@ -201,11 +208,8 @@ private void addToManager(
// manager connection is dead or stream not available
// scheduling manager closing if necessary in another thread to avoid blocking this one
if (pickedManager.isEmpty()) {
ClientSubscriptionsManager manager = pickedManager;
ConsumersCoordinator.this.environment.execute(
() -> {
manager.closeIfEmpty();
},
pickedManager::closeIfEmpty,
"Consumer manager closing after timeout, consumer %d on stream '%s'",
tracker.consumer.id(),
tracker.stream);
Expand All @@ -225,12 +229,14 @@ int managerCount() {
}

// package protected for testing
List<Client.Broker> findBrokersForStream(String stream) {
List<Client.Broker> findBrokersForStream(String stream, boolean forceReplica) {
LOGGER.debug(
"Candidate lookup to consumer from '{}', forcing replica? {}", stream, forceReplica);
Map<String, Client.StreamMetadata> metadata =
this.environment.locatorOperation(
namedFunction(
c -> c.metadata(stream), "Candidate lookup to consume from '%s'", stream));
if (metadata.size() == 0 || metadata.get(stream) == null) {
if (metadata.isEmpty() || metadata.get(stream) == null) {
// this is not supposed to happen
throw new StreamDoesNotExistException(stream);
}
Expand All @@ -253,8 +259,17 @@ List<Client.Broker> findBrokersForStream(String stream) {

List<Client.Broker> brokers;
if (replicas == null || replicas.isEmpty()) {
brokers = Collections.singletonList(streamMetadata.getLeader());
LOGGER.debug("Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
if (forceReplica) {
throw new IllegalStateException(
format(
"Only the leader node is available for consuming from %s and "
+ "consuming from leader has been deactivated for this consumer",
stream));
} else {
brokers = Collections.singletonList(streamMetadata.getLeader());
LOGGER.debug(
"Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream);
}
} else {
LOGGER.debug("Replicas for consuming from {}: {}", stream, replicas);
brokers = new ArrayList<>(replicas);
Expand All @@ -265,6 +280,22 @@ List<Client.Broker> findBrokersForStream(String stream) {
return brokers;
}

/**
 * Create a repeatable broker lookup for the given stream, meant to be handed to retry logic.
 *
 * <p>Each returned callable owns its attempt counter. When replicas are forced for this
 * coordinator, the first {@link #MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER} invocations require a
 * replica (the lookup fails if only the leader is available); subsequent invocations accept the
 * leader. When replicas are not forced, the leader is always accepted.
 *
 * @param stream the stream to find candidate brokers for
 * @return a callable performing the lookup on each invocation
 */
private Callable<List<Broker>> findBrokersForStream(String stream) {
  AtomicInteger attempts = new AtomicInteger();
  return () -> {
    // short-circuit evaluation: the counter only moves when replicas are forced
    boolean replicaRequired =
        forceReplica && attempts.incrementAndGet() <= MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER;
    LOGGER.debug(
        "Looking for broker(s) for stream {}, forcing replica {}", stream, replicaRequired);
    return findBrokersForStream(stream, replicaRequired);
  };
}

private Client.Broker pickBroker(List<Client.Broker> brokers) {
if (brokers.isEmpty()) {
return null;
Expand Down Expand Up @@ -792,7 +823,7 @@ private void assignConsumersToStream(
}
};

AsyncRetry.asyncRetry(() -> findBrokersForStream(stream))
AsyncRetry.asyncRetry(findBrokersForStream(stream))
.description("Candidate lookup to consume from '%s'", stream)
.scheduler(environment.scheduledExecutorService())
.retry(ex -> !(ex instanceof StreamDoesNotExistException))
Expand Down Expand Up @@ -885,9 +916,9 @@ private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tr
// maybe not a good candidate, let's refresh and retry for this one
candidates =
Utils.callAndMaybeRetry(
() -> findBrokersForStream(tracker.stream),
findBrokersForStream(tracker.stream),
ex -> !(ex instanceof StreamDoesNotExistException),
environment.recoveryBackOffDelayPolicy(),
recoveryBackOffDelayPolicy(),
"Candidate lookup to consume from '%s' (subscription recovery)",
tracker.stream);
} catch (Exception e) {
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void addToManager(Broker node, AgentTracker tracker) {
try {
pickedManager.register(tracker);
LOGGER.debug(
"Assigned {} tracker {} (stream '{}') to manager {} (node {}), subscription ID {}",
"Assigned {} tracker {} (stream '{}') to manager {} (node {}), publisher ID {}",
tracker.type(),
tracker.uniqueId(),
tracker.stream(),
Expand Down Expand Up @@ -605,8 +605,9 @@ private ClientProducersManager(
}
if (shutdownContext.isShutdownUnexpected()) {
LOGGER.debug(
"Recovering {} producer(s) after unexpected connection termination",
producers.size());
"Recovering {} producer(s) and {} tracking consumer(s) after unexpected connection termination",
producers.size(),
trackingConsumerTrackers.size());
producers.forEach((publishingId, tracker) -> tracker.unavailable());
trackingConsumerTrackers.forEach(AgentTracker::unavailable);
// execute in thread pool to free the IO thread
Expand Down Expand Up @@ -701,7 +702,8 @@ private void assignProducersToNewManagers(
.thenAccept(
broker -> {
String key = keyForNode(broker);
LOGGER.debug("Assigning {} producer(s) to {}", trackers.size(), key);
LOGGER.debug(
"Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
})
.exceptionally(
Expand Down Expand Up @@ -767,10 +769,10 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
| ClientClosedException
| StreamNotAvailableException e) {
LOGGER.debug(
"{} re-assignment on stream {} timed out or connection closed or stream not available, "
"{} re-assignment on stream {} (ID {}) timed out or connection closed or stream not available, "
+ "refreshing candidate leader and retrying",
tracker.type(),
tracker.id(),
tracker.identifiable() ? tracker.id() : "N/A",
tracker.stream());
// maybe not a good candidate, let's refresh and retry for this one
node =
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class StreamEnvironment implements Environment {
boolean lazyInit,
Function<ClientConnectionType, String> connectionNamingStrategy,
Function<Client.ClientParameters, Client> clientFactory,
ObservationCollector<?> observationCollector) {
ObservationCollector<?> observationCollector,
boolean forceReplicaForConsumers) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
this.byteBufAllocator = byteBufAllocator;
Expand Down Expand Up @@ -208,7 +209,8 @@ class StreamEnvironment implements Environment {
this,
maxConsumersByConnection,
connectionNamingStrategy,
Utils.coordinatorClientFactory(this));
Utils.coordinatorClientFactory(this),
forceReplicaForConsumers);
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
ClientParameters clientParametersForInit = locatorParametersCopy();
Runnable locatorInitSequence =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
private CompressionCodecFactory compressionCodecFactory;
private boolean lazyInit = false;
private boolean forceReplicaForConsumers = false;
private Function<Client.ClientParameters, Client> clientFactory = Client::new;
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;

Expand Down Expand Up @@ -266,6 +267,12 @@ public EnvironmentBuilder lazyInitialization(boolean lazy) {
return this;
}

// Records the flag on the builder; build() hands it over to the StreamEnvironment constructor.
@Override
public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) {
this.forceReplicaForConsumers = forceReplica;
return this;
}

@Override
public TlsConfiguration tls() {
this.tls.enable();
Expand Down Expand Up @@ -318,7 +325,8 @@ public Environment build() {
lazyInit,
connectionNamingStrategy,
this.clientFactory,
this.observationCollector);
this.observationCollector,
this.forceReplicaForConsumers);
}

static final class DefaultTlsConfiguration implements TlsConfiguration {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ static <T, R> Function<T, R> namedFunction(Function<T, R> task, String format, O
}

static <T> T callAndMaybeRetry(
Supplier<T> operation, Predicate<Exception> retryCondition, String format, Object... args) {
Callable<T> operation, Predicate<Exception> retryCondition, String format, Object... args) {
return callAndMaybeRetry(
operation,
retryCondition,
Expand All @@ -217,7 +217,7 @@ static <T> T callAndMaybeRetry(
}

static <T> T callAndMaybeRetry(
Supplier<T> operation,
Callable<T> operation,
Predicate<Exception> retryCondition,
BackOffDelayPolicy delayPolicy,
String format,
Expand All @@ -230,7 +230,7 @@ static <T> T callAndMaybeRetry(
while (keepTrying) {
try {
attempt++;
T result = operation.get();
T result = operation.call();
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
LOGGER.debug(
"Operation '{}' completed in {} ms after {} attempt(s)",
Expand Down
Loading