Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class ConnectionFactory implements Cloneable {

/** The default continuation timeout for RPC calls in channels: 10 minutes */
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = (int) MINUTES.toMillis(10);

/** The default network recovery interval: 5000 millis */
public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;

private static final String PREFERRED_TLS_PROTOCOL = "TLSv1.2";

Expand Down Expand Up @@ -111,7 +114,8 @@ public class ConnectionFactory implements Cloneable {
// long is used to make sure the users can use both ints
// and longs safely. It is unlikely that anybody'd need
// to use recovery intervals > Integer.MAX_VALUE in practice.
private long networkRecoveryInterval = 5000;
private long networkRecoveryInterval = DEFAULT_NETWORK_RECOVERY_INTERVAL;
private RecoveryDelayHandler recoveryDelayHandler;

private MetricsCollector metricsCollector;

Expand Down Expand Up @@ -960,6 +964,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setShutdownTimeout(shutdownTimeout);
result.setSaslConfig(saslConfig);
result.setNetworkRecoveryInterval(networkRecoveryInterval);
result.setRecoveryDelayHandler(recoveryDelayHandler);
result.setTopologyRecovery(topologyRecovery);
result.setExceptionHandler(exceptionHandler);
result.setThreadFactory(threadFactory);
Expand Down Expand Up @@ -1077,6 +1082,22 @@ public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
this.networkRecoveryInterval = networkRecoveryInterval;
}

/**
* Returns automatic connection recovery delay handler.
* @return recovery delay handler. May be null if not set.
*/
public RecoveryDelayHandler getRecoveryDelayHandler() {
return recoveryDelayHandler;
}

/**
* Sets the automatic connection recovery delay handler.
* @param recoveryDelayHandler the recovery delay handler
*/
public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) {
this.recoveryDelayHandler = recoveryDelayHandler;
}

/**
* Sets the parameters when using NIO.
Expand Down
79 changes: 79 additions & 0 deletions src/main/java/com/rabbitmq/client/RecoveryDelayHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.rabbitmq.client;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* A RecoveryDelayHandler is used to tell automatic recovery how long to sleep between reconnect attempts.
*
* @since 4.3.0
*/
public interface RecoveryDelayHandler {

/**
* Get the time to sleep (in milliseconds) before attempting to reconnect and recover again.
* This method will be called with recoveryAttempts=0 before the first recovery attempt and then again after each failed recovery.
*
* @param recoveryAttempts
* The number of recovery attempts so far.
* @return the delay in milliseconds
*/
long getDelay(final int recoveryAttempts);

/**
* Basic implementation of {@link RecoveryDelayHandler} that returns the {@link ConnectionFactory#getNetworkRecoveryInterval() network recovery interval} each time.
*/
public static class DefaultRecoveryDelayHandler implements RecoveryDelayHandler {

private final long networkRecoveryInterval;

/**
* Default Constructor
* @param networkRecoveryInterval
* recovery delay time in millis
*/
public DefaultRecoveryDelayHandler(final long networkRecoveryInterval) {
this.networkRecoveryInterval = networkRecoveryInterval;
}

@Override
public long getDelay(int recoveryAttempts) {
return networkRecoveryInterval;
}
}

/**
* Backoff implementation of {@link RecoveryDelayHandler} that uses the Fibonacci sequence (by default) to increase the recovery delay time after each failed attempt.
* You can optionally use your own backoff sequence.
*/
public static class ExponentialBackoffDelayHandler implements RecoveryDelayHandler {

private final List<Long> sequence;

/**
* Default Constructor. Uses the fibonacci sequence: {0, 1000, 1000, 2000, 3000, 5000, 8000, 13000, 21000}.
*/
public ExponentialBackoffDelayHandler() {
sequence = Arrays.asList(0L, 1000L, 1000L, 2000L, 3000L, 5000L, 8000L, 13000L, 21000L);
}

/**
* Constructor for passing your own backoff sequence
*
* @param sequence
* List of recovery delay values in milliseconds.
* @throws IllegalArgumentException if the sequence is null or empty
*/
public ExponentialBackoffDelayHandler(final List<Long> sequence) {
if (sequence == null || sequence.isEmpty())
throw new IllegalArgumentException();
this.sequence = Collections.unmodifiableList(sequence);
}

@Override
public long getDelay(int recoveryAttempts) {
return sequence.get(recoveryAttempts >= sequence.size() ? sequence.size() - 1 : recoveryAttempts);
}
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.rabbitmq.client.impl;

import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
import com.rabbitmq.client.SaslConfig;

import java.util.Map;
Expand All @@ -38,6 +40,7 @@ public class ConnectionParams {
private int shutdownTimeout;
private SaslConfig saslConfig;
private long networkRecoveryInterval;
private RecoveryDelayHandler recoveryDelayHandler;
private boolean topologyRecovery;
private int channelRpcTimeout;
private boolean channelShouldCheckRpcResponseType;
Expand Down Expand Up @@ -102,6 +105,14 @@ public ExceptionHandler getExceptionHandler() {
public long getNetworkRecoveryInterval() {
return networkRecoveryInterval;
}

/**
* Get the recovery delay handler.
* @return recovery delay handler or if none was set a {@link DefaultRecoveryDelayHandler} will be returned with a delay of {@link #getNetworkRecoveryInterval()}.
*/
public RecoveryDelayHandler getRecoveryDelayHandler() {
return recoveryDelayHandler == null ? new DefaultRecoveryDelayHandler(networkRecoveryInterval) : recoveryDelayHandler;
}

public boolean isTopologyRecoveryEnabled() {
return topologyRecovery;
Expand Down Expand Up @@ -162,6 +173,10 @@ public void setSaslConfig(SaslConfig saslConfig) {
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
this.networkRecoveryInterval = networkRecoveryInterval;
}

public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) {
this.recoveryDelayHandler = recoveryDelayHandler;
}

public void setTopologyRecovery(boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
}

synchronized private void beginAutomaticRecovery() throws InterruptedException {
Thread.sleep(this.params.getNetworkRecoveryInterval());
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(0));

this.notifyRecoveryListenersStarted();

Expand Down Expand Up @@ -525,9 +525,10 @@ private void recoverBlockedListeners(final RecoveryAwareAMQConnection newConn) {
// Returns new connection if the connection was recovered,
// null if application initiated shutdown while attempting recovery.
private RecoveryAwareAMQConnection recoverConnection() throws InterruptedException {
while (!manuallyClosed)
{
int attempts = 0;
while (!manuallyClosed) {
try {
attempts++;
RecoveryAwareAMQConnection newConn = this.cf.newConnection();
synchronized(recoveryLock) {
if (!manuallyClosed) {
Expand All @@ -541,8 +542,7 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
newConn.abort();
return null;
} catch (Exception e) {
// TODO: exponential back-off
Thread.sleep(this.params.getNetworkRecoveryInterval());
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
Copy link
Contributor Author

@vikinghawk vikinghawk Oct 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thoughts on flipping the order of these 2 lines? Should it be delivering the exception before sleeping?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it the way it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I think at some point we will have to do it. The way it works right now is a bit weird and can complicate additional manual recovery steps.

this.getExceptionHandler().handleConnectionRecoveryException(this, e);
}
}
Expand All @@ -561,13 +561,13 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
}

private void notifyRecoveryListenersComplete() {
for (RecoveryListener f : this.recoveryListeners) {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleRecovery(this);
}
}

private void notifyRecoveryListenersStarted() {
for (RecoveryListener f : this.recoveryListeners) {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleRecoveryStarted(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.rabbitmq.client.test;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collections;

import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
import com.rabbitmq.client.RecoveryDelayHandler.ExponentialBackoffDelayHandler;

import org.junit.Test;

public class RecoveryDelayHandlerTest {

@Test
public void testDefaultRecoveryDelayHandler() {
final RecoveryDelayHandler handler = new DefaultRecoveryDelayHandler(5000);
assertEquals(5000L, handler.getDelay(0));
assertEquals(5000L, handler.getDelay(1));
assertEquals(5000L, handler.getDelay(Integer.MAX_VALUE));
}

@Test
public void testExponentialBackoffDelayHandler_default() {
final RecoveryDelayHandler handler = new ExponentialBackoffDelayHandler();
assertEquals(0, handler.getDelay(0));
assertEquals(1000L, handler.getDelay(1));
assertEquals(1000L, handler.getDelay(2));
assertEquals(2000L, handler.getDelay(3));
assertEquals(3000L, handler.getDelay(4));
assertEquals(5000L, handler.getDelay(5));
assertEquals(8000L, handler.getDelay(6));
assertEquals(13000L, handler.getDelay(7));
assertEquals(21000L, handler.getDelay(8));
assertEquals(21000L, handler.getDelay(9));
assertEquals(21000L, handler.getDelay(Integer.MAX_VALUE));
}

@Test
public void testExponentialBackoffDelayHandler_sequence() {
final RecoveryDelayHandler handler = new ExponentialBackoffDelayHandler(Arrays.asList(1L, 2L));
assertEquals(1, handler.getDelay(0));
assertEquals(2, handler.getDelay(1));
assertEquals(2, handler.getDelay(2));
assertEquals(2, handler.getDelay(Integer.MAX_VALUE));
}

@Test(expected=IllegalArgumentException.class)
public void testExponentialBackoffDelayHandler_sequence_null() {
new ExponentialBackoffDelayHandler(null);
}

@Test(expected=IllegalArgumentException.class)
public void testExponentialBackoffDelayHandler_sequence_empty() {
new ExponentialBackoffDelayHandler(Collections.<Long>emptyList());
}
}