Skip to content

Commit

Permalink
#110 Create a single jvm wide cluster and session object
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Conaway authored and jsevellec committed Dec 2, 2016
1 parent da43d18 commit 2b231f4
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 94 deletions.
Expand Up @@ -44,26 +44,19 @@ protected void startServer(TestContext testContext) throws Exception {
initialized = true;
}

String clusterName = EmbeddedCassandraServerHelper.getClusterName();
String host = EmbeddedCassandraServerHelper.getHost();
int rpcPort = EmbeddedCassandraServerHelper.getRpcPort();
int nativeTransportPort = EmbeddedCassandraServerHelper.getNativeTransportPort();

CassandraDataSet cassandraDataSet = AnnotationUtils.findAnnotation(testContext.getTestClass(), CassandraDataSet.class);
if (cassandraDataSet != null) {
List<String> dataset = null;
ListIterator<String> datasetIterator = null;
String keyspace = cassandraDataSet.keyspace();

// TODO : find a way to hide them and avoid switch, need some refactoring cassandra-unit
switch (cassandraDataSet.type()) {
case cql:
dataset = dataSetLocations(testContext, cassandraDataSet);
datasetIterator = dataset.listIterator();

Cluster cluster = new Cluster.Builder().addContactPoints(host).withPort(nativeTransportPort).build();
Session session = cluster.connect();

CQLDataLoader cqlDataLoader = new CQLDataLoader(session);
CQLDataLoader cqlDataLoader = new CQLDataLoader(EmbeddedCassandraServerHelper.getSession());
while (datasetIterator.hasNext()) {
String next = datasetIterator.next();
boolean dropAndCreateKeyspace = datasetIterator.previousIndex() == 0;
Expand All @@ -73,6 +66,11 @@ protected void startServer(TestContext testContext) throws Exception {
default:
dataset = dataSetLocations(testContext, cassandraDataSet);
datasetIterator = dataset.listIterator();

String clusterName = EmbeddedCassandraServerHelper.getClusterName();
String host = EmbeddedCassandraServerHelper.getHost();
int rpcPort = EmbeddedCassandraServerHelper.getRpcPort();

DataLoader dataLoader = new DataLoader(clusterName, host + ":" + rpcPort);
while (datasetIterator.hasNext()) {
String next = datasetIterator.next();
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -32,11 +33,7 @@ public void should_work_twice() {
}

private void test() {
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
Session session = cluster.connect("cassandra_unit_keyspace");
Session session = EmbeddedCassandraServerHelper.getSession();
ResultSet result = session.execute("select * from testCQLTable1 WHERE id=1690e8da-5bf8-49e8-9583-4dff8a570717");
String val = result.iterator().next().getString("value");
assertEquals("1- Cql loaded string", val);
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -32,11 +33,7 @@ public void should_work_twice() {
}

private void test() {
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
Session session = cluster.connect("cassandra_unit_keyspace");
Session session = EmbeddedCassandraServerHelper.getSession();
ResultSet result = session.execute("select * from testCQLTableRootLocation WHERE id=1690e8da-5bf8-49e8-9583-4dff8a570797");
String val = result.iterator().next().getString("value");
assertEquals("Root- Cql loaded string", val);
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -32,11 +33,7 @@ public void should_work_twice() {
}

private void test() {
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
Session session = cluster.connect("cassandra_unit_keyspace");
Session session = EmbeddedCassandraServerHelper.getSession();

ResultSet result = session.execute("select * from testCQLTable1 WHERE id=1690e8da-5bf8-49e8-9583-4dff8a570717");
String val = result.iterator().next().getString("value");
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -31,11 +32,7 @@ public void should_work_twice() {
}

private void test() {
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
Session session = cluster.connect("cassandra_unit_keyspace");
Session session = EmbeddedCassandraServerHelper.getSession();
ResultSet result = session.execute("select * from testCQLTable WHERE id=1690e8da-5bf8-49e8-9583-4dff8a570737");
String val = result.iterator().next().getString("value");
assertEquals("Cql loaded string", val);
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -32,11 +33,7 @@ public void should_work_twice() {
}

private void test() {
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
Session session = cluster.connect("myownkeyspace");
Session session = EmbeddedCassandraServerHelper.getSession();
ResultSet result = session.execute("select * from testCQLTableKS WHERE id=1690e8da-5bf8-49e8-9583-4dff8a570787");
String val = result.iterator().next().getString("value");
assertEquals("KS- Cql loaded string", val);
Expand Down
Expand Up @@ -4,6 +4,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -31,11 +32,7 @@ public class CassandraStartAndLoadWithSpringDependencyInjectionTest {

@Test
public void should_work() {
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
Session session = cluster.connect("cassandra_unit_keyspace");
Session session = EmbeddedCassandraServerHelper.getSession();
ResultSet result = session.execute("select * from testCQLTable WHERE id=1690e8da-5bf8-49e8-9583-4dff8a570737");
String val = result.iterator().next().getString("value");
assertEquals("Cql loaded string", val);
Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -32,11 +33,7 @@ public void should_work_twice() {
}

private void test() {
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
Session session = cluster.connect("cassandra_unit_keyspace");
Session session = EmbeddedCassandraServerHelper.getSession();
ResultSet result = session.execute("select * from testCQLTable WHERE id=1690e8da-5bf8-49e8-9583-4dff8a570737");
String val = result.iterator().next().getString("value");
assertEquals("Cql loaded string", val);
Expand Down
Expand Up @@ -5,6 +5,7 @@

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;

/**
* @author Gaëtan Le Brun
Expand All @@ -27,22 +28,8 @@ public static int getInstancesCounter() {
return instancesCounter;
}

@PostConstruct
public void init() {
cluster = Cluster.builder()
.addContactPoints("127.0.0.1")
.withPort(9142)
.build();
session = cluster.connect("cassandra_unit_keyspace");
}

@PreDestroy
public void preDestroy() {
session.close();
cluster.close();
}

public Session getSession() {
return session;
return EmbeddedCassandraServerHelper.getSession();
}
}
Expand Up @@ -57,11 +57,8 @@ public CassandraCQLUnit(CQLDataSet dataSet, String configurationFileName, long s

@Override
protected void load() {
String hostIp = EmbeddedCassandraServerHelper.getHost();
int port = EmbeddedCassandraServerHelper.getNativeTransportPort();
cluster = new Cluster.Builder().addContactPoints(hostIp).withPort(port).withSocketOptions(getSocketOptions())
.build();
session = cluster.connect();
cluster = EmbeddedCassandraServerHelper.getCluster();
session = EmbeddedCassandraServerHelper.getSession();
CQLDataLoader dataLoader = new CQLDataLoader(session);
dataLoader.load(dataSet);
session = dataLoader.getSession();
Expand All @@ -70,10 +67,6 @@ protected void load() {
@Override
protected void after() {
super.after();
try (Cluster c = cluster; Session s = session) {
session = null;
cluster = null;
}
}

// Getters for those who do not like to directly access fields
Expand Down
@@ -1,6 +1,7 @@
package org.cassandraunit.utils;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
Expand Down Expand Up @@ -54,6 +55,8 @@ public class EmbeddedCassandraServerHelper {

private static CassandraDaemon cassandraDaemon = null;
private static String launchedYamlFile;
private static com.datastax.driver.core.Cluster cluster;
private static Session session;

public static void startEmbeddedCassandra() throws TTransportException, IOException, InterruptedException, ConfigurationException {
startEmbeddedCassandra(DEFAULT_STARTUP_TIMEOUT);
Expand Down Expand Up @@ -136,6 +139,21 @@ public void run() {
log.error("Cassandra daemon did not start after " + timeout + " ms. Consider increasing the timeout");
throw new AssertionError("Cassandra daemon did not start within timeout");
}

cluster = com.datastax.driver.core.Cluster.builder()
.addContactPoints(EmbeddedCassandraServerHelper.getHost())
.withPort(EmbeddedCassandraServerHelper.getNativeTransportPort())
.build();

session = cluster.connect();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
session.close();
cluster.close();
}
}));
} catch (InterruptedException e) {
log.error("Interrupted waiting for Cassandra daemon to start:", e);
throw new AssertionError(e);
Expand Down Expand Up @@ -180,6 +198,14 @@ public static void cleanDataEmbeddedCassandra(String keyspace, String... exclude
}
}

public static com.datastax.driver.core.Cluster getCluster() {
return cluster;
}

public static Session getSession() {
return session;
}

/**
* Get the embedded cassandra cluster name
*
Expand Down Expand Up @@ -217,19 +243,13 @@ public static int getNativeTransportPort() {
}

private static void cleanDataWithNativeDriver(String keyspace, String... excludedTables) {
String host = DatabaseDescriptor.getRpcAddress().getHostName();
int port = DatabaseDescriptor.getNativeTransportPort();
try (com.datastax.driver.core.Cluster cluster =
com.datastax.driver.core.Cluster.builder().addContactPoint(host).withPort(port).build();
com.datastax.driver.core.Session session = cluster.connect()) {
final KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keyspace);
final Collection<TableMetadata> tables = keyspaceMetadata.getTables();
List<String> excludeTableList = Arrays.asList(excludedTables);
for (TableMetadata table : tables) {
final String tableName = table.getName();
if (!excludeTableList.contains(tableName)) {
session.execute("truncate table " + tableName);
}
final KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keyspace);
final Collection<TableMetadata> tables = keyspaceMetadata.getTables();
List<String> excludeTableList = Arrays.asList(excludedTables);
for (TableMetadata table : tables) {
final String tableName = table.getName();
if (!excludeTableList.contains(tableName)) {
session.execute("truncate table " + tableName);
}
}
}
Expand Down Expand Up @@ -288,21 +308,15 @@ private static void dropKeyspacesWithHector() {
}

private static void dropKeyspacesWithNativeDriver() {
String host = DatabaseDescriptor.getRpcAddress().getHostName();
int port = DatabaseDescriptor.getNativeTransportPort();
try (com.datastax.driver.core.Cluster cluster =
com.datastax.driver.core.Cluster.builder().addContactPoint(host).withPort(port).build();
com.datastax.driver.core.Session session = cluster.connect()) {
List<String> keyspaces = new ArrayList<String>();
for (com.datastax.driver.core.KeyspaceMetadata keyspace : cluster.getMetadata().getKeyspaces()) {
if (!isSystemKeyspaceName(keyspace.getName())) {
keyspaces.add(keyspace.getName());
}
}
for (String keyspace : keyspaces) {
session.execute("DROP KEYSPACE " + keyspace);
List<String> keyspaces = new ArrayList<String>();
for (com.datastax.driver.core.KeyspaceMetadata keyspace : cluster.getMetadata().getKeyspaces()) {
if (!isSystemKeyspaceName(keyspace.getName())) {
keyspaces.add(keyspace.getName());
}
}
for (String keyspace : keyspaces) {
session.execute("DROP KEYSPACE " + keyspace);
}
}

private static boolean isSystemKeyspaceName(String keyspaceName) {
Expand Down

0 comments on commit 2b231f4

Please sign in to comment.