Skip to content

Commit

Permalink
added base class, example data loader
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Feb 22, 2011
1 parent 9ec0bed commit 4741a5f
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 10 deletions.
139 changes: 139 additions & 0 deletions src/main/java/com/datastax/tutorial/NpanxxDatasetLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.datastax.tutorial;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Load the npanxx data from a file. An example of a multi-threaded file loader
* using batch_mutate via Hector's Mutator
*
* @author zznate
*/
public class NpanxxDatasetLoader extends TutorialBase {
  private static final Logger log = LoggerFactory.getLogger(NpanxxDatasetLoader.class);

  /** Number of worker threads parsing and inserting batches of lines. */
  private static final int WORKER_THREADS = 10;

  /** Number of file lines handed to a single {@link LineParser} task. */
  private static final int LINES_PER_TASK = 1000;

  /** Number of rows accumulated in a Mutator before it is executed. */
  private static final int ROWS_PER_BATCH = 250;

  /** Column family every row is written to. */
  private static final String COLUMN_FAMILY = "Npanxx";

  private static ExecutorService exec;

  /**
   * Reads the file named by the {@code npanxx.file.location} property, hands the
   * lines to a pool of workers in chunks of {@link #LINES_PER_TASK}, waits for
   * all workers and logs the total row count and the elapsed time.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    init();
    String fileLocation = properties.getProperty("npanxx.file.location");
    exec = Executors.newFixedThreadPool(WORKER_THREADS);
    try {
      if (fileLocation == null) {
        throw new IllegalStateException("Property npanxx.file.location is not set");
      }

      List<Future<Integer>> sums = new ArrayList<Future<Integer>>();
      BufferedReader reader =
          new BufferedReader(new InputStreamReader(new FileInputStream(fileLocation)));
      try {
        List<String> lines = new ArrayList<String>(LINES_PER_TASK);
        String line;
        while ((line = reader.readLine()) != null) {
          // A blank line carries no fields and would only make the parser fail.
          if (line.trim().length() == 0) {
            continue;
          }
          lines.add(line);
          if (lines.size() >= LINES_PER_TASK) {
            doParse(lines, sums);
          }
        }
        // Hand off the remainder, but do not schedule a task for nothing.
        if (!lines.isEmpty()) {
          doParse(lines, sums);
        }
      } finally {
        reader.close();
      }

      int total = 0;
      for (Future<Integer> future : sums) {
        total = total + future.get().intValue();
      }

      log.info("Found total: {}", total);
      log.info("duration in ms: {}", System.currentTimeMillis() - startTime);
    } catch (Exception e) {
      // Covers a missing/unreadable file as well as failures raised by the workers.
      log.error("Could not load npanxx data from " + fileLocation, e);
    } finally {
      // The pool threads are non-daemon: without this the JVM would never exit
      // after a failure.
      exec.shutdown();
      tutorialCluster.getConnectionManager().shutdown();
    }
  }

  /**
   * Submits a snapshot of {@code lines} to the executor and then clears
   * {@code lines} so the caller can reuse the list for the next chunk.
   *
   * @param lines the lines collected so far; emptied by this call
   * @param sums  receives the future holding the worker's row count
   */
  private static void doParse(List<String> lines, List<Future<Integer>> sums) {
    // The worker gets its own copy because the caller's list is cleared right away.
    Future<Integer> f = exec.submit(new LineParser(new ArrayList<String>(lines)));
    sums.add(f);
    lines.clear();
  }

  /**
   * Worker task: parses each line into an {@link NpanxxLine} and inserts its
   * city, lat, lng and state columns under the row key {@code npa + nxx}.
   * Static because it needs no state from an enclosing loader instance.
   */
  static class LineParser implements Callable<Integer> {

    private final List<String> lines;

    LineParser(List<String> lines) {
      this.lines = lines;
    }

    /**
     * @return the number of rows parsed and inserted
     * @throws Exception if a line is malformed or the mutation fails
     */
    public Integer call() throws Exception {
      int count = 0;
      Mutator<String> mutator = HFactory.createMutator(tutorialKeyspace, StringSerializer.get());
      for (String row : lines) {
        NpanxxLine npanxxLine = new NpanxxLine(row);
        String key = npanxxLine.getNpa() + npanxxLine.getNxx();

        mutator.addInsertion(key, COLUMN_FAMILY,
            HFactory.createStringColumn("city", npanxxLine.getCity()));
        mutator.addInsertion(key, COLUMN_FAMILY,
            HFactory.createStringColumn("lat", Double.toString(npanxxLine.getLat())));
        mutator.addInsertion(key, COLUMN_FAMILY,
            HFactory.createStringColumn("lng", Double.toString(npanxxLine.getLng())));
        mutator.addInsertion(key, COLUMN_FAMILY,
            HFactory.createStringColumn("state", npanxxLine.getState()));

        // Count first, then test: testing before the increment flushed the very
        // first row as a batch of one.
        count++;
        if (count % ROWS_PER_BATCH == 0) {
          mutator.execute();
          mutator.discardPendingMutations();
        }
      }
      // Flush whatever is left over from the last partial batch.
      mutator.execute();
      log.info("found count {}", count);
      return Integer.valueOf(count);
    }

  }

  /**
   * One whitespace-separated line of the npanxx file. Field positions as read by
   * the accessors: 0 = npa, 1 = nxx, 2 = latitude, 3 = longitude, 5 = state,
   * 6 and up = the words of the city name. Field 4 is not read.
   */
  static class NpanxxLine {
    /** Fields 0..5 are indexed directly, so at least this many must exist. */
    private static final int MIN_FIELDS = 6;
    private static final int CITY_START = 6;

    private final String[] vals;

    /**
     * @param line a raw line from the data file
     * @throws IllegalArgumentException if the line has fewer than
     *         {@link #MIN_FIELDS} fields
     */
    NpanxxLine(String line) {
      vals = line.split("\\s");
      if (vals.length < MIN_FIELDS) {
        throw new IllegalArgumentException("Malformed npanxx line (expected at least "
            + MIN_FIELDS + " whitespace-separated fields): " + line);
      }
    }

    String getNpa() {
      return vals[0];
    }

    String getNxx() {
      return vals[1];
    }

    /** @throws NumberFormatException if field 2 is not a parsable double */
    double getLat() {
      return Double.parseDouble(vals[2]);
    }

    /** @throws NumberFormatException if field 3 is not a parsable double */
    double getLng() {
      return Double.parseDouble(vals[3]);
    }

    String getState() {
      return vals[5];
    }

    /**
     * @return the remaining fields joined by single spaces, without a trailing
     *         space; empty when the line has no city fields
     */
    String getCity() {
      StringBuilder cityName = new StringBuilder(56);
      for (int i = CITY_START; i < vals.length; i++) {
        if (i > CITY_START) {
          cityName.append(' ');
        }
        cityName.append(vals[i]);
      }
      return cityName.toString();
    }
  }
}
35 changes: 35 additions & 0 deletions src/main/java/com/datastax/tutorial/TutorialBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.datastax.tutorial;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;

/**
 * Shared bootstrap for the tutorial entry points: loads
 * {@code /tutorial.properties} from the classpath and creates the Cluster and
 * Keyspace handles the subclasses use.
 */
public class TutorialBase {
  static Cluster tutorialCluster;
  static Keyspace tutorialKeyspace;
  static Properties properties;

  /** Classpath location of the tutorial configuration. */
  private static final String PROPERTIES_RESOURCE = "/tutorial.properties";

  /**
   * Populates {@link #properties}, {@link #tutorialCluster} and
   * {@link #tutorialKeyspace}. Every cluster/keyspace setting has a built-in
   * default, so a missing or unreadable properties file is reported on stderr
   * and does not abort initialization.
   */
  protected static void init() {
    properties = loadProperties();

    tutorialCluster = HFactory.getOrCreateCluster(
        properties.getProperty("cluster.name", "TutorialCluster"),
        properties.getProperty("cluster.hosts", "127.0.0.1:9160"));

    // The keyspace is created with an explicit consistency-level policy whose
    // default read level is ONE. See
    // me.prettyprint.cassandra.model.ConfigurableConsistencyLevelPolicy[Test]
    // for further details on customizing the policy.
    ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
    ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
    tutorialKeyspace = HFactory.createKeyspace(
        properties.getProperty("tutorial.keyspace", "Tutorial"), tutorialCluster, ccl);
  }

  /**
   * Reads {@link #PROPERTIES_RESOURCE} from the classpath.
   *
   * @return the loaded properties; empty when the resource is absent, possibly
   *         partial when reading it fails
   */
  private static Properties loadProperties() {
    Properties loaded = new Properties();
    InputStream in = TutorialBase.class.getResourceAsStream(PROPERTIES_RESOURCE);
    if (in == null) {
      // getResourceAsStream signals "not found" with null; passing that on to
      // Properties.load would throw a NullPointerException.
      System.err.println(PROPERTIES_RESOURCE + " not found on classpath, using defaults");
      return loaded;
    }
    try {
      loaded.load(in);
    } catch (IOException ioe) {
      ioe.printStackTrace();
    } finally {
      try {
        in.close();
      } catch (IOException ignored) {
        // Nothing useful can be done about a failed close of a read-only stream.
      }
    }
    return loaded;
  }
}
15 changes: 5 additions & 10 deletions src/main/java/com/datastax/tutorial/TutorialRunner.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.datastax.tutorial;

import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
Expand All @@ -23,11 +25,10 @@
*
* @author zznate
*/
public class TutorialRunner {
public class TutorialRunner extends TutorialBase {
private static Logger log = LoggerFactory.getLogger(TutorialRunner.class);

static Cluster tutorialCluster;
static Keyspace tutorialKeyspace;


/**
* Creates a Cluster with a bunch of defaults, generally matching the
Expand All @@ -40,14 +41,8 @@ public class TutorialRunner {
* @param args
*/
public static void main(String[] args) {
tutorialCluster = HFactory.getOrCreateCluster("TestCluster", "localhost:9160");
init();

tutorialKeyspace = HFactory.createKeyspace("Tutorial", tutorialCluster);
// To modify the default ConsistencyLevel of QUORUM, create a
// me.prettyprint.hector.api.ConsistencyLevelPolicy and use the overloaded form:
// tutorialKeyspace = HFactory.createKeyspace("Tutorial", tutorialCluster, consistencyLevelPolicy);
// see also me.prettyprint.cassandra.model.ConfigurableConsistencyLevelPolicy[Test] for details

TutorialCommand command = loadCommand(args[0]);
if ( command != null ) {
try {
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/tutorial.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# general cluster identity
cluster.name=TutorialCluster
cluster.hosts=127.0.0.1:9161,127.0.0.1:9162,127.0.0.1:9163
tutorial.keyspace=Tutorial
# File loading
npanxx.file.location=/tmp/npanxx99.txt

0 comments on commit 4741a5f

Please sign in to comment.