Skip to content

Commit

Permalink
refresh cassandra node ips on connection failure/retry
Browse files Browse the repository at this point in the history
  • Loading branch information
matlockx committed May 4, 2017
1 parent cab4307 commit 6708c95
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 64 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -2,7 +2,7 @@ organization := "com.github.krasserm"

name := "akka-persistence-cassandra"

version := "0.7-SNAPSHOT"
version := "0.7.1-SNAPSHOT"

scalaVersion := "2.11.7"

Expand Down
123 changes: 65 additions & 58 deletions src/main/scala/akka/persistence/cassandra/CassandraPluginConfig.scala
@@ -1,11 +1,11 @@
package akka.persistence.cassandra

import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit

import akka.persistence.cassandra.compaction.CassandraCompactionStrategy
import com.datastax.driver.core.policies.{TokenAwarePolicy, DCAwareRoundRobinPolicy}
import com.datastax.driver.core.{QueryOptions, Cluster, ConsistencyLevel, SSLOptions}
import com.datastax.driver.core.policies.{DCAwareRoundRobinPolicy, TokenAwarePolicy}
import com.datastax.driver.core.{Cluster, ConsistencyLevel, QueryOptions, SSLOptions}
import com.typesafe.config.Config

import scala.collection.JavaConverters._
Expand All @@ -25,9 +25,9 @@ class CassandraPluginConfig(config: Config) {

val keyspaceAutoCreate: Boolean = config.getBoolean("keyspace-autocreate")
val keyspaceAutoCreateRetries: Int = config.getInt("keyspace-autocreate-retries")

val connectionRetries: Int = config.getInt("connect-retries")
val connectionRetryDelay : FiniteDuration = config.getDuration("connect-retry-delay", TimeUnit.MILLISECONDS).millis
val connectionRetryDelay: FiniteDuration = config.getDuration("connect-retry-delay", TimeUnit.MILLISECONDS).millis

val replicationStrategy: String = getReplicationStrategy(
config.getString("replication-strategy"),
Expand All @@ -37,40 +37,44 @@ class CassandraPluginConfig(config: Config) {
val readConsistency: ConsistencyLevel = ConsistencyLevel.valueOf(config.getString("read-consistency"))
val writeConsistency: ConsistencyLevel = ConsistencyLevel.valueOf(config.getString("write-consistency"))
val port: Int = config.getInt("port")
val contactPoints = getContactPoints(config.getStringList("contact-points").asScala, port)
val fetchSize = config.getInt("max-result-size")
val contactPoints: Seq[InetSocketAddress] = getContactPoints(config.getStringList("contact-points").asScala, port)
val fetchSize: Int = config.getInt("max-result-size")

val clusterBuilder: Cluster.Builder = Cluster.builder
.addContactPointsWithPorts(contactPoints.asJava)
.withQueryOptions(new QueryOptions().setFetchSize(fetchSize))
def getClusterBuilder: Cluster.Builder = {

if (config.hasPath("authentication")) {
clusterBuilder.withCredentials(
config.getString("authentication.username"),
config.getString("authentication.password"))
}
val clusterBuilder = Cluster.builder
.addContactPointsWithPorts(contactPoints.asJava)
.withQueryOptions(new QueryOptions().setFetchSize(fetchSize))

if (config.hasPath("local-datacenter")) {
clusterBuilder.withLoadBalancingPolicy(
new TokenAwarePolicy(
new DCAwareRoundRobinPolicy(config.getString("local-datacenter"))
if (config.hasPath("authentication")) {
clusterBuilder.withCredentials(
config.getString("authentication.username"),
config.getString("authentication.password"))
}

if (config.hasPath("local-datacenter")) {
clusterBuilder.withLoadBalancingPolicy(
new TokenAwarePolicy(
new DCAwareRoundRobinPolicy(config.getString("local-datacenter"))
)
)
)
}
}

if (config.hasPath("ssl")) {
val trustStorePath: String = config.getString("ssl.truststore.path")
val trustStorePW: String = config.getString("ssl.truststore.password")
val keyStorePath: String = config.getString("ssl.keystore.path")
val keyStorePW: String = config.getString("ssl.keystore.password")

if (config.hasPath("ssl")) {
val trustStorePath: String = config.getString("ssl.truststore.path")
val trustStorePW: String = config.getString("ssl.truststore.password")
val keyStorePath: String = config.getString("ssl.keystore.path")
val keyStorePW: String = config.getString("ssl.keystore.password")

val context = SSLSetup.constructContext(
trustStorePath,
trustStorePW,
keyStorePath,
keyStorePW )

clusterBuilder.withSSL(new SSLOptions(context,SSLOptions.DEFAULT_SSL_CIPHER_SUITES))
val context = SSLSetup.constructContext(
trustStorePath,
trustStorePW,
keyStorePath,
keyStorePW)

clusterBuilder.withSSL(new SSLOptions(context, SSLOptions.DEFAULT_SSL_CIPHER_SUITES))
}
clusterBuilder
}
}

Expand All @@ -80,34 +84,36 @@ object CassandraPluginConfig {


/**
* Builds list of InetSocketAddress out of host:port pairs or host entries + given port parameter.
*/
* Builds list of InetSocketAddress out of host:port pairs or host entries + given port parameter.
*/
def getContactPoints(contactPoints: Seq[String], port: Int): Seq[InetSocketAddress] = {
contactPoints match {
case null | Nil => throw new IllegalArgumentException("A contact point list cannot be empty.")
case hosts => hosts map {
ipWithPort => ipWithPort.split(":") match {
case Array(host, port) => new InetSocketAddress(host, port.toInt)
case Array(host) => new InetSocketAddress(host, port)
case msg => throw new IllegalArgumentException(s"A contact point should have the form [host:port] or [host] but was: $msg.")
}
ipWithPort =>
ipWithPort.split(":") match {
case Array(host, p) => new InetSocketAddress(host, p.toInt)
case Array(host) => new InetSocketAddress(host, port)
case msg => throw new IllegalArgumentException(s"A contact point should have the form [host:port] or [host] but was: $msg.")
}
}
}
}

/**
* Builds replication strategy command to create a keyspace.
*/
* Builds replication strategy command to create a keyspace.
*/
def getReplicationStrategy(strategy: String, replicationFactor: Int, dataCenterReplicationFactors: Seq[String]): String = {

def getDataCenterReplicationFactorList(dcrfList: Seq[String]): String = {
val result: Seq[String] = dcrfList match {
case null | Nil => throw new IllegalArgumentException("data-center-replication-factors cannot be empty when using NetworkTopologyStrategy.")
case dcrfs => dcrfs.map {
dataCenterWithReplicationFactor => dataCenterWithReplicationFactor.split(":") match {
case Array(dataCenter, replicationFactor) => s"'$dataCenter':$replicationFactor"
case msg => throw new IllegalArgumentException(s"A data-center-replication-factor must have the form [dataCenterName:replicationFactor] but was: $msg.")
}
dataCenterWithReplicationFactor =>
dataCenterWithReplicationFactor.split(":") match {
case Array(dataCenter, rf) => s"'$dataCenter':$rf"
case msg => throw new IllegalArgumentException(s"A data-center-replication-factor must have the form [dataCenterName:replicationFactor] but was: $msg.")
}
}
}
result.mkString(",")
Expand All @@ -121,23 +127,24 @@ object CassandraPluginConfig {
}

/**
* Validates that the supplied keyspace name is valid based on docs found here:
* http://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html
* @param keyspaceName - the keyspace name to validate.
* @return - String if the keyspace name is valid, throws IllegalArgumentException otherwise.
*/
* Validates that the supplied keyspace name is valid based on docs found here:
* http://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html
*
* @param keyspaceName - the keyspace name to validate.
* @return - String if the keyspace name is valid, throws IllegalArgumentException otherwise.
*/
def validateKeyspaceName(keyspaceName: String): String = keyspaceName.matches(keyspaceNameRegex) match {
case true => keyspaceName
case false => throw new IllegalArgumentException(s"Invalid keyspace name. A keyspace may 32 or fewer alpha-numeric characters and underscores. Value was: $keyspaceName")
}

/**
* Validates that the supplied table name meets Cassandra's table name requirements.
* According to docs here: https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt :
*
* @param tableName - the table name to validate
* @return - String if the tableName is valid, throws an IllegalArgumentException otherwise.
*/
* Validates that the supplied table name meets Cassandra's table name requirements.
* According to docs here: https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt :
*
* @param tableName - the table name to validate
* @return - String if the tableName is valid, throws an IllegalArgumentException otherwise.
*/
def validateTableName(tableName: String): String = tableName.matches(keyspaceNameRegex) match {
case true => tableName
case false => throw new IllegalArgumentException(s"Invalid table name. A table name may 32 or fewer alpha-numeric characters and underscores. Value was: $tableName")
Expand Down
Expand Up @@ -52,7 +52,7 @@ class CassandraJournal(cfg: Config) extends AsyncWriteJournal with CassandraReco
val preparedInsertDeletedTo = underlying.prepare(insertDeletedTo).setConsistencyLevel(writeConsistency)

private def connect(): Session = {
retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(clusterBuilder.build().connect())
retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(getClusterBuilder.build().connect())
}

def close(): Unit = {
Expand Down
Expand Up @@ -46,7 +46,7 @@ class CassandraSnapshotStore(cfg: Config) extends SnapshotStore with CassandraSt
underlying.prepare(selectSnapshotMetadata(limit = None)).setConsistencyLevel(readConsistency)

private def connect(): Session = {
retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(clusterBuilder.build().connect())
retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(getClusterBuilder.build().connect())
}

def close(): Unit = {
Expand Down
Expand Up @@ -39,7 +39,7 @@ class CassandraCompactionStrategySpec extends WordSpec with MustMatchers with Ca
override protected def beforeAll(): Unit = {
super.beforeAll()

cluster = clusterBuilder.build()
cluster = getClusterBuilder.build()
session = cluster.connect()

session.execute("CREATE KEYSPACE IF NOT EXISTS testKeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }")
Expand Down
Expand Up @@ -119,7 +119,7 @@ class CassandraConfigCheckerSpec extends TestKit(ActorSystem("test", config)) wi

def createCassandraConfigChecker(implicit pluginConfig: CassandraPluginConfig, cfg: Config): CassandraConfigChecker = {

val clusterSession = pluginConfig.clusterBuilder.build.connect()
val clusterSession = pluginConfig.getClusterBuilder.build.connect()

new CassandraConfigChecker {
override def session: Session = clusterSession
Expand Down
Expand Up @@ -37,7 +37,7 @@ class CassandraSnapshotStoreSpec extends SnapshotStoreSpec(CassandraSnapshotStor

override def beforeAll(): Unit = {
super.beforeAll()
cluster = clusterBuilder.build()
cluster = getClusterBuilder.build()
session = cluster.connect()
}

Expand Down

0 comments on commit 6708c95

Please sign in to comment.