Skip to content

Commit

Permalink
Merge pull request #810 from shintasmith/fix_ttl_test_ubuntu16
Browse files Browse the repository at this point in the history
create a datastax RetryPolicy that can retry n times
  • Loading branch information
shintasmith committed May 22, 2017
2 parents 105246b + 00c3a14 commit 358242e
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 1 deletion.
Expand Up @@ -124,6 +124,13 @@ public int getRequestTimeout() {
return config.getIntegerProperty(CoreConfig.CASSANDRA_REQUEST_TIMEOUT);
}

public int getReadTimeoutMaxRetries() { return config.getIntegerProperty(CoreConfig.DATASTAX_READ_TIMEOUT_MAX_RETRIES); }

public int getWriteTimeoutMaxRetries() { return config.getIntegerProperty(CoreConfig.DATASTAX_WRITE_TIMEOUT_MAX_RETRIES); }

public int getUnavailableMaxRetries() { return config.getIntegerProperty(CoreConfig.DATASTAX_UNAVAILABLE_MAX_RETRIES); }


// prevent people from instantiating directly
private IOConfig() {
}
Expand Down
Expand Up @@ -54,11 +54,16 @@ private DatastaxIO() {
private static void connect() {
Set<InetSocketAddress> dbHosts = ioconfig.getUniqueBinaryTransportHostsAsInetSocketAddresses();

int readTimeoutMaxRetries = ioconfig.getReadTimeoutMaxRetries();
int writeTimeoutMaxRetries = ioconfig.getWriteTimeoutMaxRetries();
int unavailableMaxRetries = ioconfig.getUnavailableMaxRetries();

CodecRegistry codecRegistry = new CodecRegistry();

cluster = Cluster.builder()
.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(ioconfig.getDatacenterName()).build(), false))
.withPoolingOptions(getPoolingOptions())
.withRetryPolicy(new RetryNTimes(readTimeoutMaxRetries, writeTimeoutMaxRetries, unavailableMaxRetries))
.withCodecRegistry(codecRegistry)
.withSocketOptions(getSocketOptions())
.addContactPointsWithPorts(dbHosts)
Expand Down
@@ -0,0 +1,93 @@
package com.rackspacecloud.blueflood.io.datastax;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A class that retry on read/write/unavailable timeouts n times, where
* n is a configurable number
*/
public class RetryNTimes implements RetryPolicy {

private static final Logger LOG = LoggerFactory.getLogger(RetryNTimes.class);
private final int readAttempts;
private final int writeAttempts;
private final int unavailableAttempts;

/**
* A Datastax {@link RetryPolicy} that is capable of retrying n times
* @param readAttempts
* @param writeAttempts
* @param unavailableAttempts
*/
public RetryNTimes(int readAttempts, int writeAttempts, int unavailableAttempts) {
this.readAttempts = readAttempts;
this.writeAttempts = writeAttempts;
this.unavailableAttempts = unavailableAttempts;
}

@Override
public RetryDecision onReadTimeout(Statement stmnt, ConsistencyLevel cl,
int requiredResponses, int receivedResponses,
boolean dataReceived, int rTime) {
if (dataReceived) {
return RetryDecision.ignore();
} else if (rTime < readAttempts) {
LOG.info(String.format("Retrying on ReadTimeout: stmnt %s, " +
"consistency %s, requiredResponse %d, " +
"receivedResponse %d, dataReceived %s, rTime %d",
stmnt, cl, requiredResponses, receivedResponses, dataReceived, rTime));
return RetryDecision.retry(cl);
} else {
return RetryDecision.rethrow();
}

}

@Override
public RetryDecision onWriteTimeout(Statement stmnt, ConsistencyLevel cl,
WriteType wt, int requiredResponses,
int receivedResponses, int wTime) {
if (wTime < writeAttempts) {
LOG.info(String.format("Retrying on WriteTimeout: stmnt %s, " +
"consistency %s, writeType %s, requiredResponse %d, " +
"receivedResponse %d, rTime %d",
stmnt, cl, wt.toString(), requiredResponses, receivedResponses, wTime));
return RetryDecision.retry(cl);
}
return RetryDecision.rethrow();
}

@Override
public RetryDecision onUnavailable(Statement stmnt, ConsistencyLevel cl,
int requiredResponses, int receivedResponses, int uTime) {
if (uTime < unavailableAttempts) {
LOG.info(String.format("Retrying on unavailable: stmnt %s, consistency %s, " +
"requiredResponse %d, receivedResponse %d, rTime %d",
stmnt, cl, requiredResponses, receivedResponses, uTime));
return RetryDecision.retry(ConsistencyLevel.ONE);
}
return RetryDecision.rethrow();
}

@Override
public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry) {
LOG.info(String.format("Trying nextHost on requestError: stmnt %s, consistency %s, driver ex %s, nbRetry %d",
statement, cl, e, nbRetry));
return RetryDecision.tryNextHost(cl);
}

@Override
public void init(Cluster cluster) {
}

@Override
public void close() {
}
}
Expand Up @@ -41,6 +41,9 @@ public enum CoreConfig implements ConfigDefaults {
DATASTAX_CORE_CONNECTIONS_PER_HOST("5"),
DATASTAX_MAX_CONNECTIONS_PER_HOST("10"),
DATASTAX_MAX_REQUESTS_PER_CONNECTION("1024"),
DATASTAX_READ_TIMEOUT_MAX_RETRIES("3"),
DATASTAX_WRITE_TIMEOUT_MAX_RETRIES("3"),
DATASTAX_UNAVAILABLE_MAX_RETRIES("1"),

ROLLUP_KEYSPACE("DATA"),
CLUSTER_NAME("Test Cluster"),
Expand Down
@@ -0,0 +1,79 @@
package com.rackspacecloud.blueflood.io.datastax;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.policies.RetryPolicy;
import org.junit.Test;

import static org.mockito.Mockito.mock;
import static org.junit.Assert.*;

/**
* Unit test for RetryNTimes policy
*/
public class RetryNTimesTest {

@Test
public void firstTimeRetryOnReadTimeout_shouldRetry() throws Exception {
RetryNTimes retryPolicy = new RetryNTimes(3, 3, 3);
Statement mockStatement = mock( Statement.class );
RetryPolicy.RetryDecision retryResult = retryPolicy.onReadTimeout(mockStatement, ConsistencyLevel.LOCAL_ONE, 1, 0, false, 0);
RetryPolicy.RetryDecision retryExpected = RetryPolicy.RetryDecision.retry(ConsistencyLevel.LOCAL_ONE);
assertRetryDecisionEquals(retryExpected, retryResult);
}

@Test
public void maxTimeRetryOnReadTimeout_shouldRethrow() throws Exception {
RetryNTimes retryPolicy = new RetryNTimes(3, 3, 3);
Statement mockStatement = mock( Statement.class );

RetryPolicy.RetryDecision retryResult = retryPolicy.onReadTimeout(mockStatement, ConsistencyLevel.LOCAL_ONE, 1, 0, false, 3);
RetryPolicy.RetryDecision retryExpected = RetryPolicy.RetryDecision.rethrow();
assertRetryDecisionEquals(retryExpected, retryResult);
}

@Test
public void firstTimeRetryOnWriteTimeout_shouldRetry() throws Exception {
RetryNTimes retryPolicy = new RetryNTimes(3, 3, 3);
Statement mockStatement = mock( Statement.class );
RetryPolicy.RetryDecision retryResult = retryPolicy.onWriteTimeout(mockStatement, ConsistencyLevel.LOCAL_ONE, WriteType.BATCH, 1, 0, 0);
RetryPolicy.RetryDecision retryExpected = RetryPolicy.RetryDecision.retry(ConsistencyLevel.LOCAL_ONE);
assertRetryDecisionEquals(retryExpected, retryResult);
}

@Test
public void maxTimeRetryOnWriteTimeout_shouldRethrow() throws Exception {
RetryNTimes retryPolicy = new RetryNTimes(3, 3, 3);
Statement mockStatement = mock( Statement.class );

RetryPolicy.RetryDecision retryResult = retryPolicy.onWriteTimeout(mockStatement, ConsistencyLevel.LOCAL_ONE, WriteType.BATCH, 1, 0, 3);
RetryPolicy.RetryDecision retryExpected = RetryPolicy.RetryDecision.rethrow();
assertRetryDecisionEquals(retryExpected, retryResult);
}

@Test
public void firstTimeRetryOnUnavailable_shouldRetry() throws Exception {
RetryNTimes retryPolicy = new RetryNTimes(3, 3, 3);
Statement mockStatement = mock( Statement.class );
RetryPolicy.RetryDecision retryResult = retryPolicy.onUnavailable(mockStatement, ConsistencyLevel.LOCAL_ONE, 1, 0, 0);
RetryPolicy.RetryDecision retryExpected = RetryPolicy.RetryDecision.retry(ConsistencyLevel.ONE);
assertRetryDecisionEquals(retryExpected, retryResult);
}

@Test
public void maxTimeRetryOnUnavailable_shouldRethrow() throws Exception {
RetryNTimes retryPolicy = new RetryNTimes(3, 3, 3);
Statement mockStatement = mock( Statement.class );

RetryPolicy.RetryDecision retryResult = retryPolicy.onUnavailable(mockStatement, ConsistencyLevel.LOCAL_ONE, 1, 0, 3);
RetryPolicy.RetryDecision retryExpected = RetryPolicy.RetryDecision.rethrow();
assertRetryDecisionEquals(retryExpected, retryResult);
}

private void assertRetryDecisionEquals(RetryPolicy.RetryDecision expected, RetryPolicy.RetryDecision result) {
assertEquals("first time retry type", expected.getType(), result.getType());
assertEquals("first time retry consistency level", expected.getRetryConsistencyLevel(), result.getRetryConsistencyLevel());
assertEquals("first time retry current", expected.isRetryCurrent(), result.isRetryCurrent());
}
}
Expand Up @@ -112,7 +112,7 @@ public void testHttpIngestionHappyCase() throws Exception {
// the port is no longer open) is to catch the exception object and
// check its message. Hence, this try/catch.

Assert.assertEquals("Connection refused", ex.getMessage());
Assert.assertTrue("Connection refused", ex.getMessage().contains("Connection refused"));
}
}

Expand Down

0 comments on commit 358242e

Please sign in to comment.