Skip to content

Commit

Permalink
Merge internal changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ptgoetz committed Jun 10, 2013
2 parents aacaee0 + 018e2b4 commit 8315230
Show file tree
Hide file tree
Showing 26 changed files with 458 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package com.hmsonline.storm.cassandra.example;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -39,7 +40,7 @@ public static void main(String[] args) throws Exception {
String configKey = "cassandra-config";
HashMap<String, Object> clientConfig = new HashMap<String, Object>();
clientConfig.put(StormCassandraConstants.CASSANDRA_HOST, "localhost:9160");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, "stormks");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, Arrays.asList(new String [] {"stormks"}));
config.put(configKey, clientConfig);

// DelimitedColumnLookupBolt tweetersBolt =
Expand All @@ -51,14 +52,14 @@ public static void main(String[] args) throws Exception {
// "followers", ":", "rowKey", "follower", true);

// cf = "tweeters", rowkey = tuple["url"]
TupleMapper<String, String, String> tweetersTupleMapper = new DefaultTupleMapper("tweeters", "url");
TupleMapper<String, String, String> tweetersTupleMapper = new DefaultTupleMapper("stormks", "tweeters", "url");
// cf (url -> tweeters) -> emit(url, follower)
ColumnMapper<String, String, String> tweetersColumnsMapper = new ValuelessColumnsMapper("url", "tweeter", true);
CassandraLookupBolt<String, String, String> tweetersBolt = new CassandraLookupBolt<String, String, String>(configKey,
tweetersTupleMapper, tweetersColumnsMapper);

// cf = "followers", rowkey = tuple["tweeter"]
TupleMapper<String, String, String> followersTupleMapper = new DefaultTupleMapper("followers", "tweeter");
TupleMapper<String, String, String> followersTupleMapper = new DefaultTupleMapper("stormks", "followers", "tweeter");
// cf (tweeter -> followers) ==> emit(url, follower)
ValuelessColumnsMapper followersColumnsMapper = new ValuelessColumnsMapper("url", "follower", true);
CassandraLookupBolt<String, String, String> followersBolt = new CassandraLookupBolt<String, String, String>(configKey,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hmsonline.storm.cassandra.example;

import java.util.Arrays;
import java.util.HashMap;

import backtype.storm.Config;
Expand All @@ -23,7 +24,7 @@ public static void main(String[] args) throws Exception {
String configKey = "cassandra-config";
HashMap<String, Object> clientConfig = new HashMap<String, Object>();
clientConfig.put(StormCassandraConstants.CASSANDRA_HOST, "localhost:9160");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, "stormks");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, Arrays.asList(new String [] {"stormks"}));
config.put(configKey, clientConfig);

TestWordSpout wordSpout = new TestWordSpout();
Expand All @@ -33,7 +34,7 @@ public static void main(String[] args) throws Exception {
// create a CassandraBolt that writes to the "stormcf" column
// family and uses the Tuple field "word" as the row key
CassandraBatchingBolt<String, String, String> cassandraBolt = new CassandraBatchingBolt<String, String, String>(configKey,
new DefaultTupleMapper("stormcf", "word"));
new DefaultTupleMapper("stormks", "stormcf", "word"));
cassandraBolt.setAckStrategy(AckStrategy.ACK_ON_WRITE);

// setup topology:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void nextTuple() {
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
this.collector.emit(new Values(word), UUID.randomUUID());
Thread.yield();
}

public void ack(Object msgId) {
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
</license>
</licenses>
<scm>
<connection>scm:git:git@github.com:ptgoetz/storm-cassandra.git</connection>
<developerConnection>scm:git:git@github.com:ptgoetz/storm-cassandra.git</developerConnection>
<url>:git@github.com:ptgoetz/storm-cassandra.git</url>
<connection>scm:git:git@github.com:hmsonline/storm-cassandra.git</connection>
<developerConnection>scm:git:git@github.com:hmsonline/storm-cassandra.git</developerConnection>
<url>:git@github.com:hmsonline/storm-cassandra.git</url>
</scm>

<developers>
Expand All @@ -43,7 +43,7 @@

<properties>
<storm.version>0.8.2</storm.version>
<cassandra.version>1.1.2</cassandra.version>
<cassandra.version>1.2.5</cassandra.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

public class StormCassandraConstants {
public static final String CASSANDRA_HOST = "cassandra.host";
public static final String CASSANDRA_PORT = "cassandra.port";
public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace";
public static final String CASSANDRA_STATE_KEYSPACE = "cassandra.state.keyspace";
public static final String CASSANDRA_BATCH_MAX_SIZE = "cassandra.batch.max_size";
public static final String CASSANDRA_CLIENT_CLASS = "cassandra.client.class";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* This is a batching bolt that can be used outside of a transactional topology.
* It does *not* implement IBatchBolt for that reason. If you want to use the
* batching inside a transactional topology, use
* <code>TransactionBatchingCassandraBolt</code>.
* <code>TransactionalBatchingCassandraBolt</code>.
*
* @author boneill42
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public CassandraCounterBatchingBolt(String clientConfigKey, TupleCounterMapper t
this.tupleMapper = tupleMapper;
}

public CassandraCounterBatchingBolt(String clientConfigKey, String columnFamily, String rowKeyField, String incrementAmountField) {
this(clientConfigKey, new DefaultTupleCounterMapper(columnFamily, rowKeyField, incrementAmountField));
public CassandraCounterBatchingBolt(String keyspace, String clientConfigKey, String columnFamily, String rowKeyField, String incrementAmountField) {
this(clientConfigKey, new DefaultTupleCounterMapper(keyspace, columnFamily, rowKeyField, incrementAmountField));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@ public class DefaultTupleCounterMapper implements TupleCounterMapper {
private String rowKeyField;
private String columnFamily;
private String incrementAmountField;
private String keyspace;

public DefaultTupleCounterMapper(String columnFamily, String rowKeyField, String incrementAmountField) {
public DefaultTupleCounterMapper(String keyspace, String columnFamily, String rowKeyField, String incrementAmountField) {
this.columnFamily = columnFamily;
this.rowKeyField = rowKeyField;
this.incrementAmountField = incrementAmountField;
this.keyspace = keyspace;
}

@Override
public String mapToColumnFamily(Tuple tuple) {
return this.columnFamily;
}

@Override
public String mapToKeyspace(Tuple tuple) {
return this.keyspace;
}

@Override
public String mapToRowKey(Tuple tuple) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class DefaultTupleMapper implements TupleMapper<String, String, String> {
private static final long serialVersionUID = 1L;
private String rowKeyField;
private String columnFamily;
private String keyspace;

/**
* Construct default mapper.
Expand All @@ -19,15 +20,21 @@ public class DefaultTupleMapper implements TupleMapper<String, String, String> {
* @param rowKeyField
* tuple field to use as the row key.
*/
public DefaultTupleMapper(String columnFamily, String rowKeyField) {
public DefaultTupleMapper(String keyspace, String columnFamily, String rowKeyField) {
this.rowKeyField = rowKeyField;
this.columnFamily = columnFamily;
this.keyspace = keyspace;
}

@Override
public String mapToRowKey(Tuple tuple) {
return tuple.getValueByField(this.rowKeyField).toString();
}

@Override
public String mapToKeyspace(Tuple tuple) {
return this.keyspace;
}

/**
* Default behavior is to write each value in the tuple as a key:value pair
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ public interface TridentTupleMapper<K, C, V> extends Serializable {
* @return the column family name
*/
String mapToColumnFamily(TridentTuple tuple) throws TupleMappingException;


/**
* Given a <code>backtype.storm.tuple.Tuple</code> object, map the keyspace to write to.
*
* @param tuple
* @return
*/
String mapToKeyspace(TridentTuple tuple);

/**
* Given a <code>storm.trident.tuple.TridentTuple</code> object,
* return an object representing the Cassandra row key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public interface TupleCounterMapper extends Serializable {
* @return
*/
String mapToColumnFamily(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> object, map the keyspace to write to.
*
* @param tuple
* @return
*/
String mapToKeyspace(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> generate a Cassandra row
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ public interface TupleMapper<K, C, V> extends Serializable {
* @return
*/
String mapToColumnFamily(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> object, map the keyspace to write to.
*
* @param tuple
* @return
*/
String mapToKeyspace(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> generate a Cassandra row
Expand Down
Loading

0 comments on commit 8315230

Please sign in to comment.