Skip to content

Commit

Permalink
Add support for multiple keyspaces
Browse files Browse the repository at this point in the history
Making AstyanaxClient lowercase the keyspace.  It seems to have trouble with an uppercase keysapce.  Might be a Cassandra 1.2 thing.

Making only the lookup lowercase and not the actual keyspace.  This is to support Cassandra connections before 1.2.

Updating examples to handle more than one keyspace.

Updating examples to handle more than one keyspace. (Missed a Topology config)
  • Loading branch information
Isaac Rieksts authored and ptgoetz committed Jun 10, 2013
1 parent c9cabe0 commit fd6261c
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 54 deletions.
Expand Up @@ -2,6 +2,7 @@

package com.hmsonline.storm.cassandra.example;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -39,7 +40,7 @@ public static void main(String[] args) throws Exception {
String configKey = "cassandra-config";
HashMap<String, Object> clientConfig = new HashMap<String, Object>();
clientConfig.put(StormCassandraConstants.CASSANDRA_HOST, "localhost:9160");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, "stormks");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, Arrays.asList(new String [] {"stormks"}));
config.put(configKey, clientConfig);

// DelimitedColumnLookupBolt tweetersBolt =
Expand All @@ -51,14 +52,14 @@ public static void main(String[] args) throws Exception {
// "followers", ":", "rowKey", "follower", true);

// cf = "tweeters", rowkey = tuple["url"]
TupleMapper<String, String, String> tweetersTupleMapper = new DefaultTupleMapper("tweeters", "url");
TupleMapper<String, String, String> tweetersTupleMapper = new DefaultTupleMapper("stormks", "tweeters", "url");
// cf (url -> tweeters) -> emit(url, follower)
ColumnMapper<String, String, String> tweetersColumnsMapper = new ValuelessColumnsMapper("url", "tweeter", true);
CassandraLookupBolt<String, String, String> tweetersBolt = new CassandraLookupBolt<String, String, String>(configKey,
tweetersTupleMapper, tweetersColumnsMapper);

// cf = "followers", rowkey = tuple["tweeter"]
TupleMapper<String, String, String> followersTupleMapper = new DefaultTupleMapper("followers", "tweeter");
TupleMapper<String, String, String> followersTupleMapper = new DefaultTupleMapper("stormks", "followers", "tweeter");
// cf (tweeter -> followers) ==> emit(url, follower)
ValuelessColumnsMapper followersColumnsMapper = new ValuelessColumnsMapper("url", "follower", true);
CassandraLookupBolt<String, String, String> followersBolt = new CassandraLookupBolt<String, String, String>(configKey,
Expand Down
@@ -1,5 +1,6 @@
package com.hmsonline.storm.cassandra.example;

import java.util.Arrays;
import java.util.HashMap;

import backtype.storm.Config;
Expand All @@ -23,7 +24,7 @@ public static void main(String[] args) throws Exception {
String configKey = "cassandra-config";
HashMap<String, Object> clientConfig = new HashMap<String, Object>();
clientConfig.put(StormCassandraConstants.CASSANDRA_HOST, "localhost:9160");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, "stormks");
clientConfig.put(StormCassandraConstants.CASSANDRA_KEYSPACE, Arrays.asList(new String [] {"stormks"}));
config.put(configKey, clientConfig);

TestWordSpout wordSpout = new TestWordSpout();
Expand All @@ -33,7 +34,7 @@ public static void main(String[] args) throws Exception {
// create a CassandraBolt that writes to the "stormcf" column
// family and uses the Tuple field "word" as the row key
CassandraBatchingBolt<String, String, String> cassandraBolt = new CassandraBatchingBolt<String, String, String>(configKey,
new DefaultTupleMapper("stormcf", "word"));
new DefaultTupleMapper("stormks", "stormcf", "word"));
cassandraBolt.setAckStrategy(AckStrategy.ACK_ON_WRITE);

// setup topology:
Expand Down
Expand Up @@ -22,8 +22,8 @@ public CassandraCounterBatchingBolt(String clientConfigKey, TupleCounterMapper t
this.tupleMapper = tupleMapper;
}

public CassandraCounterBatchingBolt(String clientConfigKey, String columnFamily, String rowKeyField, String incrementAmountField) {
this(clientConfigKey, new DefaultTupleCounterMapper(columnFamily, rowKeyField, incrementAmountField));
public CassandraCounterBatchingBolt(String keyspace, String clientConfigKey, String columnFamily, String rowKeyField, String incrementAmountField) {
this(clientConfigKey, new DefaultTupleCounterMapper(keyspace, columnFamily, rowKeyField, incrementAmountField));
}

@Override
Expand Down
Expand Up @@ -18,17 +18,24 @@ public class DefaultTupleCounterMapper implements TupleCounterMapper {
private String rowKeyField;
private String columnFamily;
private String incrementAmountField;
private String keyspace;

public DefaultTupleCounterMapper(String columnFamily, String rowKeyField, String incrementAmountField) {
public DefaultTupleCounterMapper(String keyspace, String columnFamily, String rowKeyField, String incrementAmountField) {
this.columnFamily = columnFamily;
this.rowKeyField = rowKeyField;
this.incrementAmountField = incrementAmountField;
this.keyspace = keyspace;
}

@Override
public String mapToColumnFamily(Tuple tuple) {
return this.columnFamily;
}

@Override
public String mapToKeyspace(Tuple tuple) {
return this.keyspace;
}

@Override
public String mapToRowKey(Tuple tuple) {
Expand Down
Expand Up @@ -10,6 +10,7 @@ public class DefaultTupleMapper implements TupleMapper<String, String, String> {
private static final long serialVersionUID = 1L;
private String rowKeyField;
private String columnFamily;
private String keyspace;

/**
* Construct default mapper.
Expand All @@ -19,15 +20,21 @@ public class DefaultTupleMapper implements TupleMapper<String, String, String> {
* @param rowKeyField
* tuple field to use as the row key.
*/
public DefaultTupleMapper(String columnFamily, String rowKeyField) {
public DefaultTupleMapper(String keyspace, String columnFamily, String rowKeyField) {
this.rowKeyField = rowKeyField;
this.columnFamily = columnFamily;
this.keyspace = keyspace;
}

@Override
public String mapToRowKey(Tuple tuple) {
return tuple.getValueByField(this.rowKeyField).toString();
}

@Override
public String mapToKeyspace(Tuple tuple) {
return this.keyspace;
}

/**
* Default behavior is to write each value in the tuple as a key:value pair
Expand Down
Expand Up @@ -31,7 +31,15 @@ public interface TridentTupleMapper<K, C, V> extends Serializable {
* @return the column family name
*/
String mapToColumnFamily(TridentTuple tuple) throws TupleMappingException;


/**
* Given a <code>backtype.storm.tuple.Tuple</code> object, map the keyspace to write to.
*
* @param tuple
* @return
*/
String mapToKeyspace(TridentTuple tuple);

/**
* Given a <code>storm.trident.tuple.TridentTuple</code> object,
* return an object representing the Cassandra row key.
Expand Down
Expand Up @@ -21,6 +21,14 @@ public interface TupleCounterMapper extends Serializable {
* @return
*/
String mapToColumnFamily(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> object, map the keyspace to write to.
*
* @param tuple
* @return
*/
String mapToKeyspace(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> generate a Cassandra row
Expand Down
Expand Up @@ -15,6 +15,14 @@ public interface TupleMapper<K, C, V> extends Serializable {
* @return
*/
String mapToColumnFamily(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> object, map the keyspace to write to.
*
* @param tuple
* @return
*/
String mapToKeyspace(Tuple tuple);

/**
* Given a <code>backtype.storm.tuple.Tuple</code> generate a Cassandra row
Expand Down

0 comments on commit fd6261c

Please sign in to comment.