Permalink
Browse files

added composite testing examples

  • Loading branch information...
1 parent 9196632 commit 8cadfa2f717fd7cdbb7317b15a5284274c3bd3f3 zznate committed Jan 16, 2012
@@ -0,0 +1,67 @@
+package com.datastax.tutorial;
+
+import java.util.List;
+
+import me.prettyprint.cassandra.serializers.CompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.ResultStatus;
+import me.prettyprint.hector.api.beans.AbstractComposite;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.Composite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.SliceQuery;
+
+/**
+ *
+ * To run this example from maven:
+ * mvn -e exec:java -Dexec.args="comp_ranges_with_long" -Dexec.mainClass="com.datastax.tutorial.TutorialRunner"
+ */
+public class CompositeRangesWithLong extends TutorialCommand {
+
+ private static final CompositeSerializer cs = new CompositeSerializer();
+
+ public CompositeRangesWithLong(Keyspace keyspace) {
+ super(keyspace);
+ }
+
+ @Override
+ public ResultStatus execute() {
+ SliceQuery<String, Composite, String> sliceQuery =
+ HFactory.createSliceQuery(keyspace, stringSerializer, cs, stringSerializer);
+ sliceQuery.setColumnFamily("CompositeRangesWithLong");
+ sliceQuery.setKey("x");
+
+ Composite startRange = new Composite();
+ startRange.addComponent(0,new Long(0), AbstractComposite.ComponentEquality.EQUAL);
+ //startRange.addComponent(new Long(0), LongSerializer.get());
+
+ Composite endRange = new Composite();
+ endRange.addComponent(0, new Long(0), AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL);
+
+ sliceQuery.setRange(startRange, endRange, false, 100);
+
+ QueryResult<ColumnSlice<Composite,String>> r = sliceQuery.execute();
+ ColumnSlice<Composite, String> slice = r.get();
+
+ List<HColumn<Composite, String>> l = slice.getColumns();
+
+ System.out.println("Querying Component1 as <= " + "1" + ". Result: # columns in slice: " + l.size() + ", Columns:");
+ for (int i=0;i<l.size();i++) {
+ System.out.println(" Component1: " + l.get(i).getName().get(0, LongSerializer.get()) +
+ ", Component2: " + l.get(i).getName().get(1, LongSerializer.get()));
+ }
+
+
+
+ return null;
+ }
+
+}
+/*
+create column family CompositeRangesWithLong
+with column_type = 'Standard'
+and comparator = 'CompositeType(org.apache.cassandra.db.marshal.LongType,org.apache.cassandra.db.marshal.LongType)';
+*/
@@ -2,10 +2,12 @@
import me.prettyprint.cassandra.model.HColumnImpl;
import me.prettyprint.cassandra.serializers.CompositeSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ResultStatus;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.Composite;
+import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.QueryResult;
@@ -67,6 +69,16 @@ public ResultStatus execute() {
column.setName(dc);
column.setValue("TIME WARNER COMMUNICATIONS AXS OF AUSTIN, TX");
mutator.addInsertion("TX:512", "StateNpaIndexStatic", column);
+
+ column = new HColumnImpl<Composite, String>(cs, stringSerializer);
+ column.setClock(keyspace.createClock());
+ dc = new Composite();
+ dc.add(0, "Aardvark");
+ dc.add(1, 5830L);
+ dc.add(2, 215L);
+ column.setName(dc);
+ column.setValue("Aardvark telco");
+ mutator.addInsertion("TX:512", "StateNpaIndexStatic", column);
column = new HColumnImpl<Composite, String>(cs, stringSerializer);
column.setClock(keyspace.createClock());
@@ -86,22 +98,28 @@ public ResultStatus execute() {
sliceQuery.setKey("TX:512");
Composite startRange = new Composite();
- startRange.add(0, "Austin");
+ startRange.add(0, "A");
//startRange.add(1, 7516L);
//startRange.addComponent(new Long(0), LongSerializer.get(), "LongType", AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL);
//startRange.addComponent(439L, LongSerializer.get(), "LongType", AbstractComposite.ComponentEquality.EQUAL);
//startRange.add(1, Long.MIN_VALUE);
//startRange.add(2, 439L);
- Composite endRange = new Composite();
- endRange.add(0, "Austin" + Character.MAX_VALUE);
+ Composite endRange = new Composite();
+ //endRange.add(0, "Austin" + Character.MAX_VALUE);
+ endRange.add(0, "B");
// the following statement is effectively identical for the purposes of restricting to 'Austin'
//endRange.addComponent("Austin", StringSerializer.get(), "UTF8Type", AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL);
sliceQuery.setRange(startRange, endRange, false, 10);
QueryResult<ColumnSlice<Composite, String>> result = sliceQuery.execute();
+ ColumnSlice<Composite, String> cs = result.get();
+ for ( HColumn<Composite, String> col: cs.getColumns() ) {
+ System.out.println(col.getName().getComponents());
+ System.out.println(col.getName().get(0, StringSerializer.get()));
+ }
return result;
@@ -10,9 +10,9 @@
import me.prettyprint.hector.api.factory.HFactory;
public class TutorialBase {
- static Cluster tutorialCluster;
- static Keyspace tutorialKeyspace;
- static Properties properties;
+ protected static Cluster tutorialCluster;
+ protected static Keyspace tutorialKeyspace;
+ protected static Properties properties;
protected static void init() {
properties = new Properties();
@@ -2,17 +2,11 @@
import java.util.Iterator;
-import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
-import me.prettyprint.hector.api.Cluster;
-import me.prettyprint.hector.api.HConsistencyLevel;
-import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ResultStatus;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
-import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +32,8 @@
* Creates a Cluster with a bunch of defaults, generally matching the
* default configuration of Apache Cassandra.
*
- * For customization, see {@link CassandraHostConfigurator} and the
- * corresponding {@link HFactory#createCluster(String, CassandraHostConfigurator)}
+ * For customization, see {@link me.prettyprint.cassandra.service.CassandraHostConfigurator} and the
+ * corresponding HFactory#createCluster(String, CassandraHostConfigurator)
* in Hector.
*
* @param args
@@ -131,6 +125,8 @@ private static TutorialCommand loadCommand(String cmd) {
return new StaticCompositeIndex(tutorialKeyspace);
} else if ( cmd.equalsIgnoreCase("dynamic_comp_index")) {
return new DynamicCompositeIndex(tutorialKeyspace);
+ } else if ( cmd.equalsIgnoreCase("comp_ranges_with_long")) {
+ return new CompositeRangesWithLong(tutorialKeyspace);
}
log.error(" ***OOPS! No match found for {}.", cmd);
return null;
@@ -0,0 +1,149 @@
+package com.datastax.tutorial.composite;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import com.datastax.tutorial.TutorialBase;
+import me.prettyprint.cassandra.serializers.CompositeSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.beans.Composite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * mvn -e exec:java -Dexec.mainClass="com.datastax.tutorial.composite.CompositeDataLoader"
+ * @author zznate
+ */
+public class CompositeDataLoader extends TutorialBase {
+ private static Logger log = LoggerFactory.getLogger(CompositeDataLoader.class);
+
+ private static ExecutorService exec;
+ // key for static composite, First row of dynamic composite
+ public static final String COMPOSITE_KEY = "ALL";
+
+ public static void main(String[] args) {
+ long startTime = System.currentTimeMillis();
+ init();
+ String fileLocation = properties.getProperty("composites.geodata.file.location","data/geodata.txt");
+ BufferedReader reader;
+ exec = Executors.newFixedThreadPool(5);
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(fileLocation)));
+ // read 1000 and hand off to worker
+
+ List<String> lines = new ArrayList<String>(1000);
+ String line = reader.readLine();
+
+ List<Future<Integer>> sums = new ArrayList<Future<Integer>>();
+ while(line != null) {
+
+ lines.add(line);
+ if ( lines.size() % 250 == 0 ) {
+ doParse(lines, sums);
+ }
+ line = reader.readLine();
+ }
+ doParse(lines, sums);
+
+ int total = 0;
+ for (Future<Integer> future : sums) {
+ // naive wait for completion
+ total = total + future.get().intValue();
+ }
+
+ log.info("Inserted a total of {} over duration ms: {}", total, System.currentTimeMillis() - startTime);
+ } catch (Exception e) {
+ log.error("Could not locate file",e);
+ } finally {
+ exec.shutdown();
+ }
+ tutorialCluster.getConnectionManager().shutdown();
+ }
+
+ private static void doParse(List<String> lines, List<Future<Integer>> sums) {
+ Future<Integer> f = exec.submit(new CompositeDataLoader().new LineParser(new ArrayList(lines)));
+ sums.add(f);
+ lines.clear();
+ }
+
+
+ class LineParser implements Callable<Integer> {
+
+ List<String> lines;
+ LineParser(List<String> lines) {
+ this.lines = lines;
+ }
+
+ public Integer call() throws Exception {
+ int count = 0;
+ GeoDataLine geoDataLine;
+ Mutator<String> mutator = HFactory.createMutator(tutorialKeyspace, StringSerializer.get());
+
+ for (String row : lines) {
+ // parse
+ geoDataLine = new GeoDataLine(row);
+ // assemble the insertions
+ // first, the static composite
+ mutator.addInsertion(COMPOSITE_KEY, "CountryStateCity", geoDataLine.staticColumnFrom());
+
+ count++;
+ }
+ mutator.execute();
+ log.info("found count {}", count);
+ return Integer.valueOf(count);
+ }
+
+ }
+
+ static class GeoDataLine {
+ private String[] vals = new String[10];
+
+ GeoDataLine(String line) {
+ vals = StringUtils.split(StringEscapeUtils.unescapeCsv(line), ',');
+ log.debug("array size: {} for row: {}", vals.length, line);
+ }
+
+ /**
+ * Creates an HColumn with a column name composite of the form:
+ * ['country_code']:['admin1_code']:['asciiname'])
+ * and a value of ['name']
+ * @return
+ */
+ HColumn<Composite,String> staticColumnFrom() {
+
+ Composite composite = new Composite();
+ composite.addComponent(getCountryCode(), StringSerializer.get());
+ composite.addComponent(getAdmin1Code(), StringSerializer.get());
+ composite.addComponent(getAsciiName(), StringSerializer.get());
+ HColumn<Composite,String> col =
+ HFactory.createColumn(composite, getTimezone(), new CompositeSerializer(), StringSerializer.get());
+ return col;
+ }
+
+
+ String getCountryCode() {
+ return vals[0];
+ }
+ String getAdmin1Code() {
+ return vals[1];
+ }
+ String getAsciiName() {
+ return vals[2];
+ }
+ String getTimezone() {
+ return vals[3];
+ }
+ }
+}
Oops, something went wrong.

0 comments on commit 8cadfa2

Please sign in to comment.