Skip to content

Commit

Permalink
DATACASS-38 - Cluster Connection Listener
Browse files Browse the repository at this point in the history
Added Host.StateListener
Added LatencyTracker
Added SSL/SSLOptions
  • Loading branch information
prowave committed Jan 22, 2014
1 parent bfcb48f commit d348f35
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@

import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.LatencyTracker;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ProtocolOptions.Compression;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
Expand All @@ -61,6 +64,7 @@ public class CassandraClusterFactoryBean implements FactoryBean<Cluster>, Initia
public static final boolean DEFAULT_METRICS_ENABLED = true;
public static final boolean DEFAULT_DEFERRED_INITIALIZATION = false;
public static final boolean DEFAULT_JMX_REPORTING_ENABLED = true;
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final int DEFAULT_PORT = 9042;

protected static final Logger log = LoggerFactory.getLogger(CassandraClusterFactoryBean.class);
Expand All @@ -84,6 +88,10 @@ public class CassandraClusterFactoryBean implements FactoryBean<Cluster>, Initia
private boolean metricsEnabled = DEFAULT_METRICS_ENABLED;
private boolean deferredInitialization = DEFAULT_DEFERRED_INITIALIZATION;
private boolean jmxReportingEnabled = DEFAULT_JMX_REPORTING_ENABLED;
private boolean sslEnabled = DEFAULT_SSL_ENABLED;
private SSLOptions sslOptions;
private Host.StateListener hostStateListener;
private LatencyTracker latencyTracker;
private Set<KeyspaceActionSpecification<?>> keyspaceSpecifications = new HashSet<KeyspaceActionSpecification<?>>();
private List<CreateKeyspaceSpecification> keyspaceCreations = new ArrayList<CreateKeyspaceSpecification>();
private List<DropKeyspaceSpecification> keyspaceDrops = new ArrayList<DropKeyspaceSpecification>();
Expand Down Expand Up @@ -167,8 +175,24 @@ public void afterPropertiesSet() throws Exception {
builder.withoutJMXReporting();
}

if (sslEnabled) {
if (sslOptions == null) {
builder.withSSL();
} else {
builder.withSSL(sslOptions);
}
}

cluster = builder.build();

if (hostStateListener != null) {
cluster.register(hostStateListener);
}

if (latencyTracker != null) {
cluster.register(latencyTracker);
}

generateSpecificationsFromFactoryBeans();

executeSpecsAndScripts(keyspaceCreations, startupScripts);
Expand Down Expand Up @@ -373,4 +397,32 @@ public void setDeferredInitialization(boolean deferredInitialization) {
public void setJmxReportingEnabled(boolean jmxReportingEnabled) {
this.jmxReportingEnabled = jmxReportingEnabled;
}

/**
* @param sslEnabled The sslEnabled to set.
*/
public void setSslEnabled(boolean sslEnabled) {
this.sslEnabled = sslEnabled;
}

/**
* @param sslOptions The sslOptions to set.
*/
public void setSslOptions(SSLOptions sslOptions) {
this.sslOptions = sslOptions;
}

/**
* @param hostStateListener The hostStateListener to set.
*/
public void setHostStateListener(Host.StateListener hostStateListener) {
this.hostStateListener = hostStateListener;
}

/**
* @param latencyTracker The latencyTracker to set.
*/
public void setLatencyTracker(LatencyTracker latencyTracker) {
this.latencyTracker = latencyTracker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ protected void doParse(Element element, ParserContext context, BeanDefinitionBui
builder.addPropertyValue("compressionType", compression);
}

String authProvider = element.getAttribute("auth-info-provider-ref");
if (StringUtils.hasText(authProvider)) {
builder.addPropertyReference("authProvider", authProvider);
}

String username = element.getAttribute("username");
if (StringUtils.hasText(username)) {
builder.addPropertyValue("username", username);
Expand All @@ -140,6 +135,16 @@ protected void doParse(Element element, ParserContext context, BeanDefinitionBui
builder.addPropertyValue("jmxReportingEnabled", jmxReportingEnabled);
}

String sslEnabled = element.getAttribute("sslEnabled");
if (StringUtils.hasText(sslEnabled)) {
builder.addPropertyValue("sslEnabled", sslEnabled);
}

String authProvider = element.getAttribute("auth-info-provider-ref");
if (StringUtils.hasText(authProvider)) {
builder.addPropertyReference("authProvider", authProvider);
}

String loadBalancingPolicy = element.getAttribute("load-balancing-policy-ref");
if (StringUtils.hasText(loadBalancingPolicy)) {
builder.addPropertyReference("loadBalancingPolicy", loadBalancingPolicy);
Expand All @@ -155,6 +160,21 @@ protected void doParse(Element element, ParserContext context, BeanDefinitionBui
builder.addPropertyReference("retryPolicy", retryPolicy);
}

String sslOptions = element.getAttribute("ssl-options-ref");
if (StringUtils.hasText(sslOptions)) {
builder.addPropertyReference("sslOptions", sslOptions);
}

String hostStateListener = element.getAttribute("host-state-listener-ref");
if (StringUtils.hasText(hostStateListener)) {
builder.addPropertyReference("hostStateListener", hostStateListener);
}

String latencyTracker = element.getAttribute("latency-tracker-ref");
if (StringUtils.hasText(latencyTracker)) {
builder.addPropertyReference("latencyTracker", latencyTracker);
}

parseChildElements(element, context, builder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,23 +136,6 @@ The protocol compression option. Default is "NONE".
</xsd:annotation>
</xsd:attribute>

<xsd:attribute name="auth-info-provider-ref" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
AuthInfoProvider implementation.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:assignable-to type="com.datastax.driver.core.AuthInfoProvider" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
<xsd:union memberTypes="xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="username" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Expand Down Expand Up @@ -189,7 +172,31 @@ Determine if we defer initalizing the cluster until a connection is requested. D
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="load-balancing-policy-ref" use="optional">
<xsd:attribute name="sslEnabled" type="xsd:string" default="false">
<xsd:annotation>
<xsd:documentation><![CDATA[
Determine if SSL is used for Cassandra communication. Defaults to false.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="auth-info-provider-ref" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
AuthInfoProvider implementation.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:assignable-to type="com.datastax.driver.core.AuthInfoProvider" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
<xsd:union memberTypes="xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="load-balancing-policy-ref" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
LoadBalancingPolicy implementation.
Expand Down Expand Up @@ -243,6 +250,60 @@ RetryPolicy implementation.
<xsd:union memberTypes="xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="ssl-options-ref" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Custom SSL Options. sslEnabled must be true for sslOptions to be used.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:assignable-to
type="com.datastax.driver.core.SSLOptions" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
<xsd:union memberTypes="xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="host-state-listener-ref" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Custom Host State Listener for the Cassandra Cluster.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:assignable-to
type="com.datastax.driver.core.Host.StateListener" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
<xsd:union memberTypes="xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="latency-tracker-ref" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Custom Latency Tracker for the Cassandra Cluster.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:assignable-to
type="com.datastax.driver.core.LatencyTracker" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
<xsd:union memberTypes="xsd:string" />
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>
<xsd:simpleType name="clusterRef" final="union">
<xsd:annotation>
Expand Down Expand Up @@ -479,7 +540,7 @@ Provides the ability to specify replication factors by data center.
default="SimpleStrategy">
<xsd:annotation>
<xsd:documentation><![CDATA[
The name of the replication class; default is "SimpleStrategy".
The name of the replication class; default is "SIMPLE_STRATEGY".
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class AbstractEmbeddedCassandraIntegrationTest {

static Logger log = LoggerFactory.getLogger(AbstractEmbeddedCassandraIntegrationTest.class);

protected static final String CASSANDRA_CONFIG = "spring-cassandra.yaml";
protected static String CASSANDRA_CONFIG = "spring-cassandra.yaml";
protected static final String CASSANDRA_HOST = "localhost";
protected static final int CASSANDRA_NATIVE_PORT = 9042;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2011-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cassandra.test.integration.config.xml;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Host;
import com.datastax.driver.core.Host.StateListener;

/**
* @author David Webb
*
*/
public class TestHostStateListener implements StateListener {

private final static Logger log = LoggerFactory.getLogger(TestHostStateListener.class);

@Override
public void onAdd(Host host) {
log.info("Host Added: " + host.getAddress());
}

@Override
public void onUp(Host host) {
log.info("Host Up: " + host.getAddress());
}

@Override
public void onDown(Host host) {
log.info("Host Down: " + host.getAddress());
}

@Override
public void onRemove(Host host) {
log.info("Host Removed: " + host.getAddress());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2011-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cassandra.test.integration.config.xml;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Host;
import com.datastax.driver.core.LatencyTracker;

/**
* @author David Webb
*
*/
public class TestLatencyTracker implements LatencyTracker {

private final static Logger log = LoggerFactory.getLogger(TestLatencyTracker.class);

@Override
public void update(Host host, long newLatencyNanos) {
log.info("Latency Tracker: " + host.getAddress() + ", " + newLatencyNanos + " nanoseconds.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
</bean>

<bean id="retryPolicy" class="com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy" />

<bean id="hostStateListener" class="org.springframework.cassandra.test.integration.config.xml.TestHostStateListener"/>

<bean id="latencyTracker" class="org.springframework.cassandra.test.integration.config.xml.TestLatencyTracker"/>

<cassandra:cluster id="cassandra-cluster"
contactPoints="${cluster.contactPoints}" port="${cluster.port}"
Expand All @@ -29,7 +33,9 @@
deferredInitialization="${cluster.deferredInit}" metricsEnabled="${cluster.metricsEnabled}"
jmxReportingEnabled="${cluster.jmxReportingEnabled}"
reconnection-policy-ref="reconnectionPolicy"
retry-policy-ref="retryPolicy">
retry-policy-ref="retryPolicy"
host-state-listener-ref="hostStateListener"
latency-tracker-ref="latencyTracker">
<cassandra:local-pooling-options
min-simultaneous-requests="${local.min.requests}"
max-simultaneous-requests="${local.max.requests}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ cluster.deferredInit=true
cluster.metricsEnabled=false
cluster.jmxReportingEnabled=false
cluster.reconnection.delayMillis=5000
cluster.sslEnabled= true
keyspace.name=ppncxct
keyspace.action=CREATE
dc1.name=DCJAX
Expand Down

0 comments on commit d348f35

Please sign in to comment.