Skip to content

Commit

Permalink
Pull some helper test stuff into a CassandraTestUtil class
Browse files Browse the repository at this point in the history
  • Loading branch information
pingles committed Oct 24, 2011
1 parent d7db16e commit 4b15b2e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 46 deletions.
31 changes: 29 additions & 2 deletions src/test/java/org/pingles/cascading/cassandra/CassandraClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.thrift.TException;
import org.apache.cassandra.thrift.TBinaryProtocol;
Expand All @@ -13,7 +14,6 @@
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -59,13 +59,40 @@ public void close() {

public boolean keyspaceExists(String keyspaceName) throws TException, InvalidRequestException {
List<KsDef> keyspaces = describeKeyspaces();

for (KsDef ksDef : keyspaces) {
if (ksDef.name.equals(keyspaceName)) {
return true;
}
}
return false;
}

public boolean columnFamilyExists(String keyspace, String columnFamily) throws TException, NotFoundException, InvalidRequestException {
List<CfDef> columnFamilies = listColumnFamilies(keyspace);
for (CfDef cfDef : columnFamilies) {
if (cfDef.name.equals(columnFamily)) {
return true;
}
}
return false;
}

private List<CfDef> listColumnFamilies(String keyspace) throws TException, NotFoundException, InvalidRequestException {
client.send_describe_keyspace(keyspace);
KsDef ksDef = client.recv_describe_keyspace();
return ksDef.cf_defs;
}

public String createColumnFamily(String keyspace, String name) throws TException, SchemaDisagreementException, InvalidRequestException {
CfDef cfDef = new CfDef();
cfDef.name = name;
cfDef.keyspace = keyspace;

client.send_set_keyspace(keyspace);
client.recv_set_keyspace();

client.send_system_add_column_family(cfDef);
String s = client.recv_system_add_column_family();
return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,28 @@
import cascading.tap.Lfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import junit.framework.TestCase;
import org.apache.cassandra.service.EmbeddedCassandraService;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.Cassandra;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@RunWith(JUnit4.class)
public class CassandraSchemeTest {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemeTest.class);
private static EmbeddedCassandraService cassandra;

private static final String CASSANDRA_YAML = "./src/test/resources/cassandra.yaml";
private String inputFile = "./src/test/data/small.txt";
private final String keyspaceName = "TestKeyspace";
private final String columnFamilyName = "TestColumnFamily";
private static EmbeddedCassandraService cassandra;
transient private static Map<Object, Object> properties = new HashMap<Object, Object>();

@BeforeClass
public static void startCassandra() {
Expand All @@ -48,26 +43,11 @@ public static void startCassandra() {
}

@Before
public void ensureTestKeyspace() throws Exception {
CassandraClient client = new CassandraClient(getRpcHost(), getRpcPort());
client.open();
StringBuilder sb = new StringBuilder();
for (KsDef ks : client.describeKeyspaces()) {
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(ks.name);
}
LOGGER.info("Current keyspaces: {}", sb.toString());
if (!client.keyspaceExists(keyspaceName)) {
LOGGER.info("Creating test keyspace {}", keyspaceName);
client.createKeyspace(keyspaceName);
}
client.close();
public void ensureCassandraTestStores() throws Exception {
CassandraTestUtil.ensureKeyspace(keyspaceName);
CassandraTestUtil.ensureColumnFamily(keyspaceName, columnFamilyName);
}

transient private static Map<Object, Object> properties = new HashMap<Object, Object>();

@Test
public void testSomething() {
// create flow to read from local file and insert into HBase
Expand All @@ -76,29 +56,12 @@ public void testSomething() {
Fields keyFields = new Fields("num");
Fields valueFields = new Fields("lower", "upper");

Tap sink = new CassandraTap(getRpcHost(), getRpcPort(), keyspaceName, columnFamilyName, new CassandraScheme(keyFields, valueFields));
Tap sink = new CassandraTap(CassandraTestUtil.getRpcHost(), CassandraTestUtil.getRpcPort(), keyspaceName, columnFamilyName, new CassandraScheme(keyFields, valueFields));

Flow parseFlow = new FlowConnector(properties).connect(source, sink, parsePipe);
parseFlow.complete();

assert(true);
}

protected Integer getRpcPort() {
return (Integer) getCassandraConfiguration("rpc_port");
}

protected String getRpcHost() {
return "127.0.0.1";
}

private Object getCassandraConfiguration(String fieldName) {
Yaml y = new Yaml();
try {
Map m = (Map) y.load(new FileReader(CASSANDRA_YAML));
return m.get(fieldName);
} catch (FileNotFoundException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.pingles.cascading.cassandra;

import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

public class CassandraTestUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemeTest.class);
private static final String CASSANDRA_YAML = "./src/test/resources/cassandra.yaml";

public static void ensureKeyspace(String name) throws TException, InterruptedException, InvalidRequestException, SchemaDisagreementException {
CassandraClient client = new CassandraClient(getRpcHost(), getRpcPort());
client.open();
if (!client.keyspaceExists(name)) {
LOGGER.info("Creating test keyspace {}", name);
client.createKeyspace(name);
}
client.close();
}

public static void ensureColumnFamily(String keyspace, String columnFamily) throws TException, InterruptedException, NotFoundException, InvalidRequestException, SchemaDisagreementException {
CassandraClient client = new CassandraClient(getRpcHost(), getRpcPort());
client.open();
if (!client.columnFamilyExists(keyspace, columnFamily)) {
LOGGER.info("Creating test column family {}", columnFamily);
client.createColumnFamily(keyspace, columnFamily);
}
client.close();
}

public static Integer getRpcPort() {
return (Integer) getCassandraConfiguration("rpc_port");
}

public static String getRpcHost() {
return "127.0.0.1";
}

public static Object getCassandraConfiguration(String fieldName) {
Yaml y = new Yaml();
try {
Map m = (Map) y.load(new FileReader(CASSANDRA_YAML));
return m.get(fieldName);
} catch (FileNotFoundException e) {
return null;
}
}
}

0 comments on commit 4b15b2e

Please sign in to comment.