diff --git a/build.sbt b/build.sbt index a12f5e12..24b696c3 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ organization := "com.github.krasserm" name := "akka-persistence-cassandra" -version := "0.7-SNAPSHOT" +version := "0.7.1-SNAPSHOT" scalaVersion := "2.11.7" diff --git a/src/main/scala/akka/persistence/cassandra/CassandraPluginConfig.scala b/src/main/scala/akka/persistence/cassandra/CassandraPluginConfig.scala index 4b61c401..0009c9c4 100644 --- a/src/main/scala/akka/persistence/cassandra/CassandraPluginConfig.scala +++ b/src/main/scala/akka/persistence/cassandra/CassandraPluginConfig.scala @@ -1,11 +1,11 @@ package akka.persistence.cassandra -import java.util.concurrent.TimeUnit import java.net.InetSocketAddress +import java.util.concurrent.TimeUnit import akka.persistence.cassandra.compaction.CassandraCompactionStrategy -import com.datastax.driver.core.policies.{TokenAwarePolicy, DCAwareRoundRobinPolicy} -import com.datastax.driver.core.{QueryOptions, Cluster, ConsistencyLevel, SSLOptions} +import com.datastax.driver.core.policies.{DCAwareRoundRobinPolicy, TokenAwarePolicy} +import com.datastax.driver.core.{Cluster, ConsistencyLevel, QueryOptions, SSLOptions} import com.typesafe.config.Config import scala.collection.JavaConverters._ @@ -25,9 +25,9 @@ class CassandraPluginConfig(config: Config) { val keyspaceAutoCreate: Boolean = config.getBoolean("keyspace-autocreate") val keyspaceAutoCreateRetries: Int = config.getInt("keyspace-autocreate-retries") - + val connectionRetries: Int = config.getInt("connect-retries") - val connectionRetryDelay : FiniteDuration = config.getDuration("connect-retry-delay", TimeUnit.MILLISECONDS).millis + val connectionRetryDelay: FiniteDuration = config.getDuration("connect-retry-delay", TimeUnit.MILLISECONDS).millis val replicationStrategy: String = getReplicationStrategy( config.getString("replication-strategy"), @@ -37,40 +37,44 @@ class CassandraPluginConfig(config: Config) { val readConsistency: ConsistencyLevel = ConsistencyLevel.valueOf(config.getString("read-consistency")) val writeConsistency: ConsistencyLevel = ConsistencyLevel.valueOf(config.getString("write-consistency")) val port: Int = config.getInt("port") - val contactPoints = getContactPoints(config.getStringList("contact-points").asScala, port) - val fetchSize = config.getInt("max-result-size") + val contactPoints: Seq[InetSocketAddress] = getContactPoints(config.getStringList("contact-points").asScala, port) + val fetchSize: Int = config.getInt("max-result-size") - val clusterBuilder: Cluster.Builder = Cluster.builder - .addContactPointsWithPorts(contactPoints.asJava) - .withQueryOptions(new QueryOptions().setFetchSize(fetchSize)) + def getClusterBuilder: Cluster.Builder = { - if (config.hasPath("authentication")) { - clusterBuilder.withCredentials( - config.getString("authentication.username"), - config.getString("authentication.password")) - } + val clusterBuilder = Cluster.builder + .addContactPointsWithPorts(contactPoints.asJava) + .withQueryOptions(new QueryOptions().setFetchSize(fetchSize)) - if (config.hasPath("local-datacenter")) { - clusterBuilder.withLoadBalancingPolicy( - new TokenAwarePolicy( - new DCAwareRoundRobinPolicy(config.getString("local-datacenter")) + if (config.hasPath("authentication")) { + clusterBuilder.withCredentials( + config.getString("authentication.username"), + config.getString("authentication.password")) + } + + if (config.hasPath("local-datacenter")) { + clusterBuilder.withLoadBalancingPolicy( + new TokenAwarePolicy( + new DCAwareRoundRobinPolicy(config.getString("local-datacenter")) + ) ) - ) - } + } + + if (config.hasPath("ssl")) { + val trustStorePath: String = config.getString("ssl.truststore.path") + val trustStorePW: String = config.getString("ssl.truststore.password") + val keyStorePath: String = config.getString("ssl.keystore.path") + val keyStorePW: String = config.getString("ssl.keystore.password") - if (config.hasPath("ssl")) { - val trustStorePath: String = config.getString("ssl.truststore.path") - val trustStorePW: String = config.getString("ssl.truststore.password") - val keyStorePath: String = config.getString("ssl.keystore.path") - val keyStorePW: String = config.getString("ssl.keystore.password") - - val context = SSLSetup.constructContext( - trustStorePath, - trustStorePW, - keyStorePath, - keyStorePW ) - - clusterBuilder.withSSL(new SSLOptions(context,SSLOptions.DEFAULT_SSL_CIPHER_SUITES)) + val context = SSLSetup.constructContext( + trustStorePath, + trustStorePW, + keyStorePath, + keyStorePW) + + clusterBuilder.withSSL(new SSLOptions(context, SSLOptions.DEFAULT_SSL_CIPHER_SUITES)) + } + clusterBuilder } } @@ -80,34 +84,36 @@ object CassandraPluginConfig { /** - * Builds list of InetSocketAddress out of host:port pairs or host entries + given port parameter. - */ + * Builds list of InetSocketAddress out of host:port pairs or host entries + given port parameter. + */ def getContactPoints(contactPoints: Seq[String], port: Int): Seq[InetSocketAddress] = { contactPoints match { case null | Nil => throw new IllegalArgumentException("A contact point list cannot be empty.") case hosts => hosts map { - ipWithPort => ipWithPort.split(":") match { - case Array(host, port) => new InetSocketAddress(host, port.toInt) - case Array(host) => new InetSocketAddress(host, port) - case msg => throw new IllegalArgumentException(s"A contact point should have the form [host:port] or [host] but was: $msg.") - } + ipWithPort => + ipWithPort.split(":") match { + case Array(host, p) => new InetSocketAddress(host, p.toInt) + case Array(host) => new InetSocketAddress(host, port) + case msg => throw new IllegalArgumentException(s"A contact point should have the form [host:port] or [host] but was: $msg.") + } } } } /** - * Builds replication strategy command to create a keyspace. - */ + * Builds replication strategy command to create a keyspace. + */ def getReplicationStrategy(strategy: String, replicationFactor: Int, dataCenterReplicationFactors: Seq[String]): String = { def getDataCenterReplicationFactorList(dcrfList: Seq[String]): String = { val result: Seq[String] = dcrfList match { case null | Nil => throw new IllegalArgumentException("data-center-replication-factors cannot be empty when using NetworkTopologyStrategy.") case dcrfs => dcrfs.map { - dataCenterWithReplicationFactor => dataCenterWithReplicationFactor.split(":") match { - case Array(dataCenter, replicationFactor) => s"'$dataCenter':$replicationFactor" - case msg => throw new IllegalArgumentException(s"A data-center-replication-factor must have the form [dataCenterName:replicationFactor] but was: $msg.") - } + dataCenterWithReplicationFactor => + dataCenterWithReplicationFactor.split(":") match { + case Array(dataCenter, rf) => s"'$dataCenter':$rf" + case msg => throw new IllegalArgumentException(s"A data-center-replication-factor must have the form [dataCenterName:replicationFactor] but was: $msg.") + } } } result.mkString(",") @@ -121,23 +127,24 @@ object CassandraPluginConfig { } /** - * Validates that the supplied keyspace name is valid based on docs found here: - * http://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html - * @param keyspaceName - the keyspace name to validate. - * @return - String if the keyspace name is valid, throws IllegalArgumentException otherwise. - */ + * Validates that the supplied keyspace name is valid based on docs found here: + * http://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html + * + * @param keyspaceName - the keyspace name to validate. + * @return - String if the keyspace name is valid, throws IllegalArgumentException otherwise. + */ def validateKeyspaceName(keyspaceName: String): String = keyspaceName.matches(keyspaceNameRegex) match { case true => keyspaceName case false => throw new IllegalArgumentException(s"Invalid keyspace name. A keyspace may 32 or fewer alpha-numeric characters and underscores. Value was: $keyspaceName") } /** - * Validates that the supplied table name meets Cassandra's table name requirements. - * According to docs here: https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt : - * - * @param tableName - the table name to validate - * @return - String if the tableName is valid, throws an IllegalArgumentException otherwise. - */ + * Validates that the supplied table name meets Cassandra's table name requirements. + * According to docs here: https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt : + * + * @param tableName - the table name to validate + * @return - String if the tableName is valid, throws an IllegalArgumentException otherwise. + */ def validateTableName(tableName: String): String = tableName.matches(keyspaceNameRegex) match { case true => tableName case false => throw new IllegalArgumentException(s"Invalid table name. A table name may 32 or fewer alpha-numeric characters and underscores. Value was: $tableName") diff --git a/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala index 18d1d8c8..793f2ced 100644 --- a/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala +++ b/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala @@ -52,7 +52,7 @@ class CassandraJournal(cfg: Config) extends AsyncWriteJournal with CassandraReco val preparedInsertDeletedTo = underlying.prepare(insertDeletedTo).setConsistencyLevel(writeConsistency) private def connect(): Session = { - retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(clusterBuilder.build().connect()) + retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(getClusterBuilder.build().connect()) } def close(): Unit = { diff --git a/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala index 9af7a1b5..fd7d6740 100644 --- a/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala +++ b/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala @@ -46,7 +46,7 @@ class CassandraSnapshotStore(cfg: Config) extends SnapshotStore with CassandraSt underlying.prepare(selectSnapshotMetadata(limit = None)).setConsistencyLevel(readConsistency) private def connect(): Session = { - retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(clusterBuilder.build().connect()) + retry(config.connectionRetries + 1, config.connectionRetryDelay.toMillis)(getClusterBuilder.build().connect()) } def close(): Unit = { diff --git a/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala b/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala index 16febb24..4e433965 100644 --- a/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala +++ b/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala @@ -39,7 +39,7 @@ class CassandraCompactionStrategySpec extends WordSpec with MustMatchers with Ca override protected def beforeAll(): Unit = { super.beforeAll() - cluster = clusterBuilder.build() + cluster = getClusterBuilder.build() session = cluster.connect() session.execute("CREATE KEYSPACE IF NOT EXISTS testKeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }") diff --git a/src/test/scala/akka/persistence/cassandra/journal/CassandraConfigCheckerSpec.scala b/src/test/scala/akka/persistence/cassandra/journal/CassandraConfigCheckerSpec.scala index 43d4cf18..61694802 100644 --- a/src/test/scala/akka/persistence/cassandra/journal/CassandraConfigCheckerSpec.scala +++ b/src/test/scala/akka/persistence/cassandra/journal/CassandraConfigCheckerSpec.scala @@ -119,7 +119,7 @@ class CassandraConfigCheckerSpec extends TestKit(ActorSystem("test", config)) wi def createCassandraConfigChecker(implicit pluginConfig: CassandraPluginConfig, cfg: Config): CassandraConfigChecker = { - val clusterSession = pluginConfig.clusterBuilder.build.connect() + val clusterSession = pluginConfig.getClusterBuilder.build.connect() new CassandraConfigChecker { override def session: Session = clusterSession diff --git a/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala b/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala index 602dd95b..a2db959a 100644 --- a/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala +++ b/src/test/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStoreSpec.scala @@ -37,7 +37,7 @@ class CassandraSnapshotStoreSpec extends SnapshotStoreSpec(CassandraSnapshotStor override def beforeAll(): Unit = { super.beforeAll() - cluster = clusterBuilder.build() + cluster = getClusterBuilder.build() session = cluster.connect() }