From d3d913183e1f047e999146be3ffec1bf733485ce Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 13 Feb 2013 20:40:45 -0500 Subject: [PATCH] convert storm-cassandra, storm-jms, and storm-signals to git submodules --- .gitmodules | 9 + storm-cassandra | 1 + storm-cassandra/.gitignore | 4 - storm-cassandra/README.md | 130 -------- storm-cassandra/examples/pom.xml | 101 ------ .../examples/schema/cassandra_schema.txt | 98 ------ .../example/CassandraReachTopology.java | 181 ---------- .../example/PersistentWordCount.java | 53 --- .../cassandra/example/TestWordCounter.java | 43 --- .../cassandra/example/TestWordSpout.java | 62 ---- .../src/main/resources/log4j.properties | 13 - storm-cassandra/pom.xml | 96 ------ .../cassandra/bolt/AbstractBatchingBolt.java | 123 ------- .../cassandra/bolt/BaseCassandraBolt.java | 58 ---- .../cassandra/bolt/BatchingCassandraBolt.java | 111 ------- .../contrib/cassandra/bolt/CassandraBolt.java | 140 -------- .../cassandra/bolt/CassandraConstants.java | 7 - .../bolt/DefaultBatchingCassandraBolt.java | 80 ----- .../bolt/DelimitedColumnLookupBolt.java | 152 --------- .../bolt/ValueLessColumnLookupBolt.java | 139 -------- .../ColumnFamilyDeterminable.java | 17 - .../DefaultColumnFamilyDeterminable.java | 19 -- .../DefaultRowKeyDeterminable.java | 17 - .../bolt/determinable/RowKeyDeterminable.java | 16 - .../bolt/serialize/CassandraSerializer.java | 7 - .../src/main/resources/log4j.properties | 16 - storm-jms | 1 + storm-jms/LICENSE.html | 261 --------------- storm-jms/README.markdown | 40 --- storm-jms/examples/README.markdown | 23 -- storm-jms/examples/pom.xml | 135 -------- .../jms/example/ExampleJmsTopology.java | 114 ------- .../contrib/jms/example/GenericBolt.java | 99 ------ .../jms/example/JsonTupleProducer.java | 41 --- .../jms/example/SpringJmsProvider.java | 58 ---- .../src/main/resources/jms-activemq.xml | 37 --- .../src/main/resources/log4j.properties | 13 - storm-jms/pom.xml | 69 ---- .../storm/contrib/jms/JmsMessageProducer.java | 33 -- .../storm/contrib/jms/JmsProvider.java | 30 -- .../storm/contrib/jms/JmsTupleProducer.java | 41 --- .../storm/contrib/jms/bolt/JmsBolt.java | 197 ----------- .../storm/contrib/jms/spout/JmsSpout.java | 312 ------------------ storm-signals | 1 + storm-signals/README.md | 101 ------ storm-signals/pom.xml | 60 ---- .../contrib/signals/client/SignalClient.java | 57 ---- .../signals/spout/BaseSignalSpout.java | 120 ------- .../contrib/signals/test/SignalTopology.java | 31 -- .../contrib/signals/test/TestSignalSpout.java | 38 --- 50 files changed, 12 insertions(+), 3593 deletions(-) create mode 100644 .gitmodules create mode 160000 storm-cassandra delete mode 100644 storm-cassandra/.gitignore delete mode 100644 storm-cassandra/README.md delete mode 100644 storm-cassandra/examples/pom.xml delete mode 100644 storm-cassandra/examples/schema/cassandra_schema.txt delete mode 100644 storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/CassandraReachTopology.java delete mode 100644 storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/PersistentWordCount.java delete mode 100644 storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordCounter.java delete mode 100644 storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordSpout.java delete mode 100644 storm-cassandra/examples/src/main/resources/log4j.properties delete mode 100644 storm-cassandra/pom.xml delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/AbstractBatchingBolt.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BaseCassandraBolt.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BatchingCassandraBolt.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraBolt.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraConstants.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DefaultBatchingCassandraBolt.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DelimitedColumnLookupBolt.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/ValueLessColumnLookupBolt.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/ColumnFamilyDeterminable.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultColumnFamilyDeterminable.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultRowKeyDeterminable.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/RowKeyDeterminable.java delete mode 100644 storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/serialize/CassandraSerializer.java delete mode 100644 storm-cassandra/src/main/resources/log4j.properties create mode 160000 storm-jms delete mode 100644 storm-jms/LICENSE.html delete mode 100644 storm-jms/README.markdown delete mode 100644 storm-jms/examples/README.markdown delete mode 100644 storm-jms/examples/pom.xml delete mode 100644 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java delete mode 100644 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java delete mode 100644 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java delete mode 100644 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java delete mode 100644 storm-jms/examples/src/main/resources/jms-activemq.xml delete mode 100644 storm-jms/examples/src/main/resources/log4j.properties delete mode 100644 storm-jms/pom.xml delete mode 100644 storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java delete mode 100644 storm-jms/src/main/java/backtype/storm/contrib/jms/JmsProvider.java delete mode 100644 storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java delete mode 100644 storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java delete mode 100644 storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java create mode 160000 storm-signals delete mode 100644 storm-signals/README.md delete mode 100644 storm-signals/pom.xml delete mode 100644 storm-signals/src/main/java/backtype/storm/contrib/signals/client/SignalClient.java delete mode 100644 storm-signals/src/main/java/backtype/storm/contrib/signals/spout/BaseSignalSpout.java delete mode 100644 storm-signals/src/main/java/backtype/storm/contrib/signals/test/SignalTopology.java delete mode 100644 storm-signals/src/main/java/backtype/storm/contrib/signals/test/TestSignalSpout.java diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..6762dcf --- /dev/null +++ b/.gitmodules @@ -0,0 +1,9 @@ +[submodule "storm-cassandra"] + path = storm-cassandra + url = git@github.com:hmsonline/storm-cassandra.git +[submodule "storm-jms"] + path = storm-jms + url = git@github.com:ptgoetz/storm-jms.git +[submodule "storm-signals"] + path = storm-signals + url = git@github.com:ptgoetz/storm-signals.git diff --git a/storm-cassandra b/storm-cassandra new file mode 160000 index 0000000..aacaee0 --- /dev/null +++ b/storm-cassandra @@ -0,0 +1 @@ +Subproject commit aacaee05aee4bcabb08fe832240d9bb0b414a310 diff --git a/storm-cassandra/.gitignore b/storm-cassandra/.gitignore deleted file mode 100644 index 8a9736c..0000000 --- a/storm-cassandra/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -target -.settings -.classpath -.project \ No newline at end of file diff --git a/storm-cassandra/README.md b/storm-cassandra/README.md deleted file mode 100644 index 51a1f2d..0000000 --- a/storm-cassandra/README.md +++ /dev/null @@ -1,130 +0,0 @@ -Storm Cassandra Integration -=========================== - -Integrates Storm and Cassandra by providing a generic and configurable `backtype.storm.Bolt` -implementation that writes Storm `Tuple` objects to a Cassandra Column Family. - -How the Storm `Tuple` data is written to Cassandra is dynamically configurable -- you -provide classes that "determine" a column family, row key, and column name/values, and the -bolt will write the data to a Cassandra cluster. - -### Building from Source - - $ mvn install - -### Usage - -**Basic Usage** - -`CassandraBolt` expects that a Cassandra hostname, port, and keyspace be set in the Storm topology configuration: - - Config config = new Config(); - config.put(CassandraBolt.CASSANDRA_HOST, "localhost"); - config.put(CassandraBolt.CASSANDRA_PORT, 9160); - config.put(CassandraBolt.CASSANDRA_KEYSPACE, "testKeyspace"); - - -The `CassandraBolt` class provides a convenience constructor that takes a column family name, and row key field value as arguments: - - IRichBolt cassandraBolt = new CassandraBolt("columnFamily", "rowKey"); - -The above constructor will create a `CassandraBolt` that writes to the "`columnFamily`" column family, and will look for/use a field -named "`rowKey`" in the `backtype.storm.tuple.Tuple` objects it receives as the Cassandra row key. - -For each field in the `backtype.storm.Tuple` received, the `CassandraBolt` will write a column name/value pair. - -For example, given the constructor listed above, a tuple value of: - - {rowKey: 12345, field1: "foo", field2: "bar} - -Would yield the following Cassandra row (as seen from `cassandra-cli`): - - RowKey: 12345 - => (column=field1, value=foo, timestamp=1321938505071001) - => (column=field2, value=bar, timestamp=1321938505072000) - - -#### Dynamic ColumnFamily Determination -[TODO] - - - -#### Dynamic RowKey Generation -[TODO] - - -### Running the Persistent Word Count Example - -The "examples" directory contains a sample `PersistentWordCount` topology that illustrates -the basic usage of the Cassandra Bolt implementation. It reuses the [`TestWordSpout`](https://github.com/nathanmarz/storm/blob/master/src/jvm/backtype/storm/testing/TestWordSpout.java) -spout and [`TestWordCounter`](https://github.com/nathanmarz/storm/blob/master/src/jvm/backtype/storm/testing/TestWordCounter.java) -bolt from the Storm tutorial, and adds an instance of `CassandraBolt` to persist the results. - -The `PersistentWordCount` example build the following topology: - - TestWordSpout ==> TestWordCounter ==> CassandraBolt - -**Data Flow** - -1. `TestWordSpout` emits words at random from a pre-defined list. -2. `TestWordCounter` receives a word, updates a counter for that word, -and emits a tuple containing the word and corresponding count ("word", "count"). -3. The `CassandraBolt` receives the ("word", "count") tuple and writes it to the -Cassandra database using the word as the row key. - -#### Build the Example Source - - $ cd examples - $ mvn install - -#### Create Cassandra Schema -Install and run [Apache Cassandra](http://cassandra.apache.org/). - -Create the sample schema using `cassandra-cli`: - - $ cd schema - $ cassandra-cli -h localhost -f cassandra_schema.txt - -Run the example topology: - - $ mvn exec:java - -View the end result in `cassandra-cli`: - - $ cassandra-cli -h localhost - [default@unknown] use stormks; - [default@stromks] list stormcf; - -The output should resemble the following: - - Using default limit of 100 - ------------------- - RowKey: nathan - => (column=count, value=22, timestamp=1322332601951001) - => (column=word, value=nathan, timestamp=1322332601951000) - ------------------- - RowKey: mike - => (column=count, value=11, timestamp=1322332600330001) - => (column=word, value=mike, timestamp=1322332600330000) - ------------------- - RowKey: jackson - => (column=count, value=17, timestamp=1322332600633001) - => (column=word, value=jackson, timestamp=1322332600633000) - ------------------- - RowKey: golda - => (column=count, value=31, timestamp=1322332602155001) - => (column=word, value=golda, timestamp=1322332602155000) - ------------------- - RowKey: bertels - => (column=count, value=16, timestamp=1322332602257000) - => (column=word, value=bertels, timestamp=1322332602255000) - - 5 Rows Returned. - Elapsed time: 8 msec(s). - - - - - - - diff --git a/storm-cassandra/examples/pom.xml b/storm-cassandra/examples/pom.xml deleted file mode 100644 index c9458cc..0000000 --- a/storm-cassandra/examples/pom.xml +++ /dev/null @@ -1,101 +0,0 @@ - - 4.0.0 - backtype.storm.contrib - storm-cassandra-examples - 0.1.0-SNAPSHOT - Storm Cassandra Examples - Storm Cassandra Examples - - - 2.5.6 - 0.6.2 - - - - com.github.ptgoetz - storm-cassandra - 0.1.0-SNAPSHOT - - - storm - storm - ${storm.version} - - provided - - - - - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - - - - - - make-assembly - package - - single - - - - - - - - org.codehaus.mojo - exec-maven-plugin - 1.2.1 - - - - exec - - - - - java - true - true - backtype.storm.contrib.cassandra.example.PersistentWordCount - - - log4j.configuration - file:./src/main/resources/log4j.properties - - - - - - storm - storm - ${storm.version} - jar - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.6 - 1.6 - - - - - - - \ No newline at end of file diff --git a/storm-cassandra/examples/schema/cassandra_schema.txt b/storm-cassandra/examples/schema/cassandra_schema.txt deleted file mode 100644 index d44e8ec..0000000 --- a/storm-cassandra/examples/schema/cassandra_schema.txt +++ /dev/null @@ -1,98 +0,0 @@ -drop keyspace stormks; - -create keyspace stormks; - -use stormks; - - create column family stormcf - with column_type = 'Standard' - and comparator = 'UTF8Type' - and default_validation_class = 'UTF8Type' - and key_validation_class = 'UTF8Type'; - - create column family followers - with column_type = 'Standard' - and comparator = 'UTF8Type' - and default_validation_class = 'UTF8Type' - and key_validation_class = 'UTF8Type'; - set followers['sally']['bob']=''; - set followers['sally']['tim']=''; - set followers['sally']['alice']=''; - set followers['sally']['adam']=''; - set followers['sally']['jim']=''; - set followers['sally']['chris']=''; - set followers['sally']['jai']=''; - set followers['bob']['sally']=''; - set followers['bob']['nathan']=''; - set followers['bob']['jim']=''; - set followers['bob']['mary']=''; - set followers['bob']['david']=''; - set followers['bob']['vivian']=''; - set followers['tim']['alex']=''; - set followers['nathan']['sally']=''; - set followers['nathan']['bob']=''; - set followers['nathan']['adam']=''; - set followers['nathan']['harry']=''; - set followers['nathan']['chris']=''; - set followers['nathan']['vivian']=''; - set followers['nathan']['jordan']=''; - set followers['nathan']['emily']=''; - set followers['adam']['david']=''; - set followers['adam']['carissa']=''; - set followers['adam']['david']=''; - set followers['mike']['john']=''; - set followers['mike']['bob']=''; - set followers['john']['alice']=''; - set followers['john']['nathan']=''; - set followers['john']['jim']=''; - set followers['john']['mike']=''; - set followers['john']['bob']=''; - - - create column family tweeters - with column_type = 'Standard' - and comparator = 'UTF8Type' - and default_validation_class = 'UTF8Type' - and key_validation_class = 'UTF8Type'; - - set tweeters['http://github.com/ptgoetz']['sally']=''; - set tweeters['http://github.com/ptgoetz']['bob']=''; - set tweeters['http://github.com/ptgoetz']['tim']=''; - set tweeters['http://github.com/ptgoetz']['george']=''; - set tweeters['http://github.com/ptgoetz']['nathan']=''; - - set tweeters['http://github.com/hmsonline']['adam']=''; - set tweeters['http://github.com/hmsonline']['david']=''; - set tweeters['http://github.com/hmsonline']['sally']=''; - set tweeters['http://github.com/hmsonline']['nathan']=''; - - set tweeters['http://github.com/nathanmarz']['tim']=''; - set tweeters['http://github.com/nathanmarz']['mike']=''; - set tweeters['http://github.com/nathanmarz']['john']=''; - - - - create column family followers_delimited - with column_type = 'Standard' - and comparator = 'UTF8Type' - and default_validation_class = 'UTF8Type' - and key_validation_class = 'UTF8Type'; - - - set followers_delimited['sally']['followers']='bob:tim:alice:adam:jim:chris:jai'; - set followers_delimited['bob']['followers']='sally:nathan:jim:mary:david:vivian'; - set followers_delimited['tim']['followers']='alex'; - set followers_delimited['nathan']['followers']='sally:bob:adam:harry:chris:vivian:emily:jordan'; - set followers_delimited['adam']['followers']='david:carissa'; - set followers_delimited['mike']['followers']='john:bob'; - set followers_delimited['john']['followers']='alice:nathan:jim:mike:bob'; - - create column family tweeters_delimited - with column_type = 'Standard' - and comparator = 'UTF8Type' - and default_validation_class = 'UTF8Type' - and key_validation_class = 'UTF8Type'; - - set tweeters_delimited['http://github.com/ptgoetz']['tweeted_by']='sally:bob:tim:george:nathan'; - set tweeters_delimited['http://github.com/hmsonline']['tweeted_by']='adam:david:sally:nathan'; - set tweeters_delimited['http://github.com/nathanmarz']['tweeted_by']='tim:mike:john'; diff --git a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/CassandraReachTopology.java b/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/CassandraReachTopology.java deleted file mode 100644 index 4026341..0000000 --- a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/CassandraReachTopology.java +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright (c) 2012 P. Taylor Goetz - -package backtype.storm.contrib.cassandra.example; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.LocalDRPC; -import backtype.storm.StormSubmitter; -import backtype.storm.contrib.cassandra.bolt.CassandraConstants; -import backtype.storm.contrib.cassandra.bolt.DelimitedColumnLookupBolt; -import backtype.storm.contrib.cassandra.bolt.ValueLessColumnLookupBolt; -import backtype.storm.drpc.CoordinatedBolt.FinishedCallback; -import backtype.storm.drpc.LinearDRPCTopologyBuilder; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IBasicBolt; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -public class CassandraReachTopology implements CassandraConstants{ - - public static void main(String[] args) throws Exception{ - LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); - -// DelimitedColumnLookupBolt tweetersBolt = -// new DelimitedColumnLookupBolt("tweeters_delimited", "rowKey", "tweeted_by", ":", "rowKey", "tweeter", true); -// -// DelimitedColumnLookupBolt followersBolt = -// new DelimitedColumnLookupBolt("followers_delimited", "tweeter", "followers", ":", "rowKey", "follower", true); - - ValueLessColumnLookupBolt tweetersBolt = - new ValueLessColumnLookupBolt("tweeters", "rowKey","rowKey", "tweeter", true); - - ValueLessColumnLookupBolt followersBolt = - new ValueLessColumnLookupBolt("followers", "tweeter", "rowKey", "follower", true); - - builder.addBolt(new InitBolt()); - builder.addBolt(tweetersBolt).shuffleGrouping(); - builder.addBolt(followersBolt).shuffleGrouping(); - builder.addBolt(new PartialUniquer()).fieldsGrouping(new Fields("id", "follower")); - builder.addBolt(new CountAggregator()).fieldsGrouping(new Fields("id")); - - - Config config = new Config(); - config.put(CASSANDRA_HOST, "localhost"); - config.put(CASSANDRA_PORT, 9160); - config.put(CASSANDRA_KEYSPACE, "stormks"); - - if(args==null || args.length==0) { - config.setMaxTaskParallelism(3); - LocalDRPC drpc = new LocalDRPC(); - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("reach-drpc", config, builder.createLocalTopology(drpc)); - - String[] urlsToTry = new String[] {"http://github.com/hmsonline","http://github.com/nathanmarz", "http://github.com/ptgoetz", "http://github.com/boneill"}; - for(String url: urlsToTry) { - System.out.println("Reach of " + url + ": " + drpc.execute("reach", url)); - } - - cluster.shutdown(); - drpc.shutdown(); - } else { - config.setNumWorkers(6); - StormSubmitter.submitTopology(args[0], config, builder.createRemoteTopology()); - } - } - - public static class InitBolt implements IBasicBolt { - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "rowKey")); - } - - @Override - public void prepare(Map stormConf, TopologyContext context) { - } - - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - collector.emit(input.getValues()); - } - - @Override - public void cleanup() { - - } - - } - - public static class PartialUniquer implements IRichBolt, FinishedCallback { - OutputCollector collector; - Map> sets = new HashMap>(); - - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple tuple) { - Object id = tuple.getValue(0); - Set curr = this.sets.get(id); - if(curr==null) { - curr = new HashSet(); - this.sets.put(id, curr); - } - curr.add(tuple.getString(2)); - collector.ack(tuple); - } - - @Override - public void cleanup() { - } - - @Override - public void finishedId(Object id) { - Set curr = this.sets.remove(id); - int count; - if(curr!=null) { - count = curr.size(); - } else { - count = 0; - } - collector.emit(new Values(id, count)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "partial-count")); - } - } - - public static class CountAggregator implements IRichBolt, FinishedCallback { - Map counts = new HashMap(); - OutputCollector collector; - - @Override - public void prepare(Map conf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple tuple) { - Object id = tuple.getValue(0); - int partial = tuple.getInteger(1); - - Integer curr = counts.get(id); - if(curr==null) curr = 0; - this.counts.put(id, curr + partial); - this.collector.ack(tuple); - } - - @Override - public void cleanup() { - } - - @Override - public void finishedId(Object id) { - Integer reach = counts.get(id); - if(reach==null) reach = 0; - collector.emit(new Values(id, reach)); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("id", "reach")); - } - - } - -} diff --git a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/PersistentWordCount.java b/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/PersistentWordCount.java deleted file mode 100644 index ad280a5..0000000 --- a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/PersistentWordCount.java +++ /dev/null @@ -1,53 +0,0 @@ -package backtype.storm.contrib.cassandra.example; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.contrib.cassandra.bolt.BatchingCassandraBolt; -import backtype.storm.contrib.cassandra.bolt.CassandraBolt; -import backtype.storm.contrib.cassandra.bolt.DefaultBatchingCassandraBolt; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; - -public class PersistentWordCount { - private static final String WORD_SPOUT = "WORD_SPOUT"; - private static final String COUNT_BOLT ="COUNT_BOLT"; - private static final String CASSANDRA_BOLT = "WORD_COUNT_CASSANDRA_BOLT"; - - public static void main(String[] args) throws Exception{ - Config config = new Config(); - - config.put(CassandraBolt.CASSANDRA_HOST, "localhost"); - config.put(CassandraBolt.CASSANDRA_PORT, 9160); - config.put(CassandraBolt.CASSANDRA_KEYSPACE, "stormks"); - - TestWordSpout wordSpout = new TestWordSpout(); - - TestWordCounter countBolt = new TestWordCounter(); - - // create a CassandraBolt that writes to the "stormcf" column - // family and uses the Tuple field "word" as the row key - BatchingCassandraBolt cassandraBolt = new DefaultBatchingCassandraBolt("stormcf", "word"); - cassandraBolt.setAckStrategy(BatchingCassandraBolt.AckStrategy.ACK_ON_WRITE); - - - // setup topology: - // wordSpout ==> countBolt ==> cassandraBolt - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout(WORD_SPOUT, wordSpout, 3); - builder.setBolt(COUNT_BOLT, countBolt,3).fieldsGrouping(WORD_SPOUT, new Fields("word")); - builder.setBolt(CASSANDRA_BOLT, cassandraBolt,3).shuffleGrouping(COUNT_BOLT); - - if(args.length == 0){ - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", config, builder.createTopology()); - Thread.sleep(10000); - cluster.killTopology("test"); - cluster.shutdown(); - } else{ - config.setNumWorkers(3); - StormSubmitter.submitTopology(args[0], config, builder.createTopology()); - } - } -} diff --git a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordCounter.java b/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordCounter.java deleted file mode 100644 index f5ed7cd..0000000 --- a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordCounter.java +++ /dev/null @@ -1,43 +0,0 @@ -package backtype.storm.contrib.cassandra.example; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Fields; -import java.util.Map; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IBasicBolt; -import java.util.HashMap; -import org.apache.log4j.Logger; -import static backtype.storm.utils.Utils.tuple; - - -public class TestWordCounter implements IBasicBolt { - public static Logger LOG = Logger.getLogger(TestWordCounter.class); - - Map _counts; - - public void prepare(Map stormConf, TopologyContext context) { - _counts = new HashMap(); - } - - public void execute(Tuple input, BasicOutputCollector collector) { - String word = (String) input.getValues().get(0); - int count = 0; - if(_counts.containsKey(word)) { - count = _counts.get(word); - } - count++; - _counts.put(word, count); - collector.emit(tuple(word, count)); - } - - public void cleanup() { - - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - -} \ No newline at end of file diff --git a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordSpout.java b/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordSpout.java deleted file mode 100644 index 539a2e6..0000000 --- a/storm-cassandra/examples/src/main/java/backtype/storm/contrib/cassandra/example/TestWordSpout.java +++ /dev/null @@ -1,62 +0,0 @@ -package backtype.storm.contrib.cassandra.example; - -import backtype.storm.topology.OutputFieldsDeclarer; -import java.util.Map; -import java.util.UUID; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; -import java.util.Random; -import org.apache.log4j.Logger; - - -public class TestWordSpout implements IRichSpout { - public static Logger LOG = Logger.getLogger(TestWordSpout.class); - boolean _isDistributed; - SpoutOutputCollector _collector; - - public TestWordSpout() { - this(true); - } - - public TestWordSpout(boolean isDistributed) { - _isDistributed = isDistributed; - } - - public boolean isDistributed() { - return _isDistributed; - } - - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - } - - public void close() { - - } - - public void nextTuple() { -// Utils.sleep(1); - Thread.yield(); - final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; - final Random rand = new Random(); - final String word = words[rand.nextInt(words.length)]; - _collector.emit(new Values(word), UUID.randomUUID()); - } - - public void ack(Object msgId) { - - } - - public void fail(Object msgId) { - - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } -} diff --git a/storm-cassandra/examples/src/main/resources/log4j.properties b/storm-cassandra/examples/src/main/resources/log4j.properties deleted file mode 100644 index 31a50d6..0000000 --- a/storm-cassandra/examples/src/main/resources/log4j.properties +++ /dev/null @@ -1,13 +0,0 @@ -log4j.rootLogger=INFO, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout - -log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n - - -log4j.logger.backtype.storm.contrib=DEBUG -log4j.logger.clojure.contrib=WARN -log4j.logger.org.springframework=WARN -log4j.logger.org.apache.zookeeper=WARN - diff --git a/storm-cassandra/pom.xml b/storm-cassandra/pom.xml deleted file mode 100644 index 3e8a776..0000000 --- a/storm-cassandra/pom.xml +++ /dev/null @@ -1,96 +0,0 @@ - - - - org.sonatype.oss - oss-parent - 7 - - - - - 4.0.0 - com.github.ptgoetz - storm-cassandra - 0.1.2-SNAPSHOT - Storm Cassandra Support - Storm Cassandra Support - - - - Eclipse Public License - v 1.0 - http://www.eclipse.org/legal/epl-v10.html - repo - - - - scm:git:git@github.com:ptgoetz/storm-cassandra.git - scm:git:git@github.com:ptgoetz/storm-cassandra.git - :git@github.com:ptgoetz/storm-cassandra.git - - - - - ptgoetz - P. Taylor Goetz - ptgoetz@gmail.com - - - - - - - - - 2.5.6 - 0.6.2 - 1.0.2 - - - - me.prettyprint - hector-core - 1.0-1 - - - org.slf4j - slf4j-log4j12 - - - org.slf4j - slf4j-api - - - com.google.guava - guava - - - - - storm - storm - ${storm.version} - - provided - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.6 - 1.6 - - - - - - - \ No newline at end of file diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/AbstractBatchingBolt.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/AbstractBatchingBolt.java deleted file mode 100644 index f7c7c54..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/AbstractBatchingBolt.java +++ /dev/null @@ -1,123 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.tuple.Tuple; - -/** - * Abstract IRichBolt implementation that caches/batches - * backtype.storm.tuple.Tuple and processes them on a separate - * thread. - *

- *

- * Subclasses are obligated to implement the - * executeBatch(List inputs) method, called when a batch of - * tuples should be processed. - *

- * Subclasses that overide the prepare() and cleanup() - * methods must call the corresponding methods on the superclass - * (i.e. super.prepare() and super.cleanup()) to - * ensure proper initialization and termination. - * - * - * - * - * @author ptgoetz - * - */ -@SuppressWarnings("serial") -public abstract class AbstractBatchingBolt implements IRichBolt, - CassandraConstants { - - private static final Logger LOG = LoggerFactory - .getLogger(AbstractBatchingBolt.class); - - private boolean ackOnReceive = false; - - private OutputCollector collector; - - private LinkedBlockingQueue queue; - - private BatchThread batchThread; - - @Override - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { - this.collector = collector; - this.queue = new LinkedBlockingQueue(); - this.batchThread = new BatchThread(); - this.batchThread.start(); - } - - public void setAckOnReceive(boolean ackOnReceive){ - this.ackOnReceive = ackOnReceive; - } - - @Override - public final void execute(Tuple input) { - if(this.ackOnReceive){ - this.collector.ack(input); - } - this.queue.offer(input); - } - - @Override - public void cleanup() { - this.batchThread.stopRunning(); - } - - /** - * Process a java.util.List of - * backtype.storm.tuple.Tuple objects that have been - * cached/batched. - *

- * This method is analagous to the execute(Tuple input) method - * defined in the bolt interface. Subclasses are responsible for processing - * and/or ack'ing tuples as necessary. The only difference is that tuples - * are passed in as a list, as opposed to one at a time. - *

- * - * - * @param inputs - */ - public abstract void executeBatch(List inputs); - - private class BatchThread extends Thread { - - boolean stopRequested = false; - - BatchThread() { - super("batch-bolt-thread"); - super.setDaemon(true); - } - - @Override - public void run() { - while (!stopRequested) { - try { - ArrayList batch = new ArrayList(); - // drainTo() does not block, take() does. - Tuple t = queue.take(); - batch.add(t); - queue.drainTo(batch); - executeBatch(batch); - - } - catch (InterruptedException e) {} - } - } - - synchronized void stopRunning() { - this.stopRequested = true; - } - } -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BaseCassandraBolt.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BaseCassandraBolt.java deleted file mode 100644 index 845fe2f..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BaseCassandraBolt.java +++ /dev/null @@ -1,58 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt; - -import java.util.Map; - -import me.prettyprint.cassandra.service.CassandraHostConfigurator; -import me.prettyprint.hector.api.Cluster; -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.factory.HFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IBasicBolt; - -@SuppressWarnings("serial") -public abstract class BaseCassandraBolt implements IBasicBolt, - CassandraConstants { - - private static final Logger LOG = LoggerFactory - .getLogger(BaseCassandraBolt.class); - - private String cassandraHost; - private String cassandraPort; - private String cassandraKeyspace; - - protected Cluster cluster; - protected Keyspace keyspace; - -// protected OutputCollector collector; - - @Override - public void prepare(Map stormConf, TopologyContext context) { -// LOG.debug("Preparing..."); - this.cassandraHost = (String) stormConf.get(CASSANDRA_HOST); - this.cassandraKeyspace = (String) stormConf.get(CASSANDRA_KEYSPACE); - this.cassandraPort = String.valueOf(stormConf.get(CASSANDRA_PORT)); - initCassandraConnection(); - -// this.collector = collector; - } - - private void initCassandraConnection() { - try { - this.cluster = HFactory.getOrCreateCluster("cassandra-bolt", - new CassandraHostConfigurator(this.cassandraHost - + ":" + this.cassandraPort)); - this.keyspace = HFactory.createKeyspace(this.cassandraKeyspace, - this.cluster); - } - catch (Throwable e) { - LOG.warn("Preparation failed.", e); - throw new IllegalStateException("Failed to prepare CassandraBolt", - e); - } - } - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BatchingCassandraBolt.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BatchingCassandraBolt.java deleted file mode 100644 index a1d8f16..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/BatchingCassandraBolt.java +++ /dev/null @@ -1,111 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt; - -import java.util.Map; - -import me.prettyprint.cassandra.service.CassandraHostConfigurator; -import me.prettyprint.hector.api.Cluster; -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.factory.HFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.contrib.cassandra.bolt.determinable.ColumnFamilyDeterminable; -import backtype.storm.contrib.cassandra.bolt.determinable.DefaultColumnFamilyDeterminable; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; - -@SuppressWarnings("serial") -public abstract class BatchingCassandraBolt extends AbstractBatchingBolt implements - CassandraConstants { - private static final Logger LOG = LoggerFactory - .getLogger(BatchingCassandraBolt.class); - - public static enum AckStrategy { - ACK_IGNORE, ACK_ON_RECEIVE, ACK_ON_WRITE; - } - - protected AckStrategy ackStrategy = AckStrategy.ACK_IGNORE; - - protected OutputCollector collector; - - private Fields declaredFields; - - private String cassandraHost; - private String cassandraPort; - private String cassandraKeyspace; - - protected Cluster cluster; - protected Keyspace keyspace; - - protected ColumnFamilyDeterminable cfDeterminable; -// protected RowKeyDeterminable rkDeterminable; - - public BatchingCassandraBolt(String columnFamily) { - this(new DefaultColumnFamilyDeterminable(columnFamily)); - } - - public BatchingCassandraBolt(ColumnFamilyDeterminable cfDeterminable) { - this.cfDeterminable = cfDeterminable; -// this.rkDeterminable = rkDeterminable; - } - - public void setAckStrategy(AckStrategy strategy) { - this.ackStrategy = strategy; - } - - /* - * IRichBolt Implementation - */ - @Override - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { - super.prepare(stormConf, context, collector); - LOG.debug("Preparing..."); - this.cassandraHost = (String) stormConf.get(CASSANDRA_HOST); - this.cassandraKeyspace = (String) stormConf.get(CASSANDRA_KEYSPACE); - this.cassandraPort = String.valueOf(stormConf.get(CASSANDRA_PORT)); - - this.collector = collector; - - initCassandraConnection(); - - if (this.ackStrategy == AckStrategy.ACK_ON_RECEIVE) { - super.setAckOnReceive(true); - } - - } - - private void initCassandraConnection() { - // setup Cassandra connection - try { - this.cluster = HFactory.getOrCreateCluster("cassandra-bolt", - new CassandraHostConfigurator(this.cassandraHost + ":" - + this.cassandraPort)); - this.keyspace = HFactory.createKeyspace(this.cassandraKeyspace, - this.cluster); - } catch (Throwable e) { - LOG.warn("Preparation failed.", e); - throw new IllegalStateException("Failed to prepare CassandraBolt", - e); - } - } - - - - @Override - public void cleanup() { - super.cleanup(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (this.declaredFields != null) { - declarer.declare(this.declaredFields); - } - - } - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraBolt.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraBolt.java deleted file mode 100644 index 601fe37..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraBolt.java +++ /dev/null @@ -1,140 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt; - -import java.util.Map; - -import me.prettyprint.cassandra.serializers.StringSerializer; -import me.prettyprint.cassandra.service.CassandraHostConfigurator; -import me.prettyprint.hector.api.Cluster; -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.factory.HFactory; -import me.prettyprint.hector.api.mutation.Mutator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.contrib.cassandra.bolt.determinable.ColumnFamilyDeterminable; -import backtype.storm.contrib.cassandra.bolt.determinable.DefaultColumnFamilyDeterminable; -import backtype.storm.contrib.cassandra.bolt.determinable.DefaultRowKeyDeterminable; -import backtype.storm.contrib.cassandra.bolt.determinable.RowKeyDeterminable; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; - -@SuppressWarnings("serial") -public class CassandraBolt implements IRichBolt, CassandraConstants { - private static final Logger LOG = LoggerFactory - .getLogger(CassandraBolt.class); - - private OutputCollector collector; - private boolean autoAck = true; - - private Fields declaredFields; - - private String cassandraHost; - private String cassandraPort; - private String cassandraKeyspace; - - private Cluster cluster; - private Keyspace keyspace; - - private ColumnFamilyDeterminable cfDeterminable; - private RowKeyDeterminable rkDeterminable; - - public CassandraBolt(String columnFamily, String rowkeyField) { - - this(new DefaultColumnFamilyDeterminable(columnFamily), - new DefaultRowKeyDeterminable(rowkeyField)); - - } - - public CassandraBolt(ColumnFamilyDeterminable cfDeterminable, - RowKeyDeterminable rkDeterminable) { - this.cfDeterminable = cfDeterminable; - this.rkDeterminable = rkDeterminable; - } - - /* - * IRichBolt Implementation - */ - @Override - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { - LOG.debug("Preparing..."); - this.cassandraHost = (String) stormConf.get(CASSANDRA_HOST); - this.cassandraKeyspace = (String) stormConf.get(CASSANDRA_KEYSPACE); - this.cassandraPort = String.valueOf(stormConf.get(CASSANDRA_PORT)); - - this.collector = collector; - - initCassandraConnection(); - - } - - private void initCassandraConnection() { - // setup Cassandra connection - try { - this.cluster = HFactory.getOrCreateCluster("cassandra-bolt", - new CassandraHostConfigurator(this.cassandraHost + ":" - + this.cassandraPort)); - this.keyspace = HFactory.createKeyspace(this.cassandraKeyspace, - this.cluster); - } catch (Throwable e) { - LOG.warn("Preparation failed.", e); - throw new IllegalStateException("Failed to prepare CassandraBolt", - e); - } - } - - @Override - public void execute(Tuple input) { - LOG.debug("Tuple received: " + input); - try { - String columnFamily = this.cfDeterminable - .determineColumnFamily(input); - Object rowKey = this.rkDeterminable.determineRowKey(input); - - Mutator mutator = HFactory.createMutator(this.keyspace, - new StringSerializer()); - Fields fields = input.getFields(); - - for (int i = 0; i < fields.size(); i++) { - // LOG.debug("Name: " + fields.get(i) + ", Value: " - // + input.getValue(i)); - mutator.addInsertion(rowKey.toString(), columnFamily, HFactory - .createStringColumn(fields.get(i), input.getValue(i) - .toString())); - } - mutator.execute(); - - if (this.autoAck) { - this.collector.ack(input); - } - } catch (Throwable e) { - LOG.warn("Caught throwable.", e); - } - } - - @Override - public void cleanup() { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (this.declaredFields != null) { - declarer.declare(this.declaredFields); - } - - } - - public boolean isAutoAck() { - return autoAck; - } - - public void setAutoAck(boolean autoAck) { - this.autoAck = autoAck; - } - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraConstants.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraConstants.java deleted file mode 100644 index f83c202..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/CassandraConstants.java +++ /dev/null @@ -1,7 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt; - -public interface CassandraConstants { - public static String CASSANDRA_HOST = "cassandra.host"; - public static final String CASSANDRA_PORT = "cassandra.port"; - public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace"; -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DefaultBatchingCassandraBolt.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DefaultBatchingCassandraBolt.java deleted file mode 100644 index 815d607..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DefaultBatchingCassandraBolt.java +++ /dev/null @@ -1,80 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import me.prettyprint.cassandra.serializers.StringSerializer; -import me.prettyprint.cassandra.service.CassandraHostConfigurator; -import me.prettyprint.hector.api.Cluster; -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.factory.HFactory; -import me.prettyprint.hector.api.mutation.Mutator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.contrib.cassandra.bolt.BatchingCassandraBolt.AckStrategy; -import backtype.storm.contrib.cassandra.bolt.determinable.ColumnFamilyDeterminable; -import backtype.storm.contrib.cassandra.bolt.determinable.DefaultColumnFamilyDeterminable; -import backtype.storm.contrib.cassandra.bolt.determinable.DefaultRowKeyDeterminable; -import backtype.storm.contrib.cassandra.bolt.determinable.RowKeyDeterminable; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; - -@SuppressWarnings("serial") -public class DefaultBatchingCassandraBolt extends BatchingCassandraBolt implements - CassandraConstants { - private RowKeyDeterminable rkDeterminable; - public DefaultBatchingCassandraBolt( - ColumnFamilyDeterminable cfDeterminable, - RowKeyDeterminable rkDeterminable) { - super(cfDeterminable); - this.rkDeterminable = rkDeterminable; - } - - public DefaultBatchingCassandraBolt(String columnFamily, String rowKey) { - this(new DefaultColumnFamilyDeterminable(columnFamily), new DefaultRowKeyDeterminable(rowKey)); - } - - private static final Logger LOG = LoggerFactory - .getLogger(DefaultBatchingCassandraBolt.class); - - - @Override - public void executeBatch(List inputs) { - ArrayList tuplesToAck = new ArrayList(); - try { - Mutator mutator = HFactory.createMutator(this.keyspace, - new StringSerializer()); - for (Tuple input : inputs) { - String columnFamily = this.cfDeterminable - .determineColumnFamily(input); - Object rowKey = this.rkDeterminable.determineRowKey(input); - Fields fields = input.getFields(); - for (int i = 0; i < fields.size(); i++) { - mutator.addInsertion(rowKey.toString(), columnFamily, - HFactory.createStringColumn(fields.get(i), input - .getValue(i).toString())); - tuplesToAck.add(input); - } - } - mutator.execute(); - - } catch (Throwable e) { - LOG.warn("Unable to write batch.", e); - } finally { - if (this.ackStrategy == AckStrategy.ACK_ON_WRITE) { - for (Tuple tupleToAck : tuplesToAck) { - this.collector.ack(tupleToAck); - } - } - } - - } - - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DelimitedColumnLookupBolt.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DelimitedColumnLookupBolt.java deleted file mode 100644 index 5de82ad..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/DelimitedColumnLookupBolt.java +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright (c) 2012 P. Taylor Goetz - -package backtype.storm.contrib.cassandra.bolt; - -import java.util.Map; - -import me.prettyprint.cassandra.serializers.StringSerializer; -import me.prettyprint.cassandra.service.template.ColumnFamilyResult; -import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate; -import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -/** - * A bolt implementation that emits tuples based on a combination of cassandra - * rowkey, collumnkey, and delimiter. - *

- * When this bolt received a tuple, it will attempt the following: - *

    - *
  1. Look up a value in the tuple using rowKeyField
  2. - *
  3. Fetch the corresponding row from cassandra
  4. - *
  5. Fetch the column columnKeyField value from the row.
  6. - *
  7. Split the column value into an array based on delimiter
  8. - *
  9. For each value, emit a tuple with {emitIdFieldName}={value}
  10. - *
- * For example, given the following cassandra row:
- * - *
- * RowKey: mike
- * => (column=followers, value=john:bob, timestamp=1328848653055000)
- * 
- * - * and the following bolt setup: - * - *
- * rowKeyField = "rowKey"
- * columnKeyField = "followers"
- * delimiter = ":"
- * emitIdFieldName = "rowKey"
- * emitValueFieldName = "follower"
- * 
- * - * if the following tuple were received by the bolt: - * - *
- * {rowKey:mike}
- * 
- * - * The following tuples would be emitted: - * - *
- * {rowKey:mike, follower:john}
- * {rowKey:mike, follower:bob}
- * 
- * - * @author tgoetz - */ -@SuppressWarnings("serial") -public class DelimitedColumnLookupBolt extends BaseCassandraBolt { - - private static final Logger LOG = LoggerFactory - .getLogger(DelimitedColumnLookupBolt.class); - private String columnFamily; - private String rowKeyField; - private String columnKeyField; - private String delimiter; - - private String emitIdFieldName; - private String emitValueFieldName; - - private boolean isDrpc = false; - - public DelimitedColumnLookupBolt(String columnFamily, String rowKeyField, - String columnKeyField, String delimiter, - String emitIdFieldName, String emitValueFieldName, - boolean isDrpc) { - super(); - this.columnFamily = columnFamily; - this.rowKeyField = rowKeyField; - this.columnKeyField = columnKeyField; - this.delimiter = delimiter; - this.emitIdFieldName = emitIdFieldName; - this.emitValueFieldName = emitValueFieldName; - this.isDrpc = isDrpc; - } - - public DelimitedColumnLookupBolt(String columnFamily, String rowKeyField, - String columnKeyField, String delimiter, - String emitIdFieldName, String emitValueFieldName) { - this(columnFamily, rowKeyField, columnKeyField, delimiter, - emitIdFieldName, emitValueFieldName, false); - } - - @Override - public void prepare(Map stormConf, TopologyContext context) { - super.prepare(stormConf, context); - } - - @Override - public void execute(Tuple input, BasicOutputCollector collector) { -// LOG.debug("Tuple: " + input); - String rowKey = input.getStringByField(this.rowKeyField); -// LOG.debug("Row Key: " + rowKey); - - ColumnFamilyTemplate template = new ThriftColumnFamilyTemplate( - this.keyspace, this.columnFamily, - new StringSerializer(), new StringSerializer()); - - ColumnFamilyResult result = template - .queryColumns(rowKey); -// LOG.debug("looking for column: " + this.columnKeyField); - String delimVal = result.getString(this.columnKeyField); - if (delimVal != null) { - String[] vals = delimVal.split(this.delimiter); - for (String val : vals) { -// LOG.debug("Emitting: {" + rowKey + ":" + val + "}"); - if (this.isDrpc) { - collector.emit(new Values(input.getValue(0), rowKey, val)); - } - else { - collector.emit(new Values(rowKey, val)); - } - } - } - } - - @Override - public void cleanup() { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (this.isDrpc) { - declarer.declare(new Fields("id", this.emitIdFieldName, - this.emitValueFieldName)); - } - else { - declarer.declare(new Fields(this.emitIdFieldName, - this.emitValueFieldName)); - } - - } - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/ValueLessColumnLookupBolt.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/ValueLessColumnLookupBolt.java deleted file mode 100644 index cd0f6b3..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/ValueLessColumnLookupBolt.java +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright (c) 2012 P. Taylor Goetz - -package backtype.storm.contrib.cassandra.bolt; - -import java.util.Map; - -import me.prettyprint.cassandra.serializers.StringSerializer; -import me.prettyprint.cassandra.service.template.ColumnFamilyResult; -import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate; -import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; - -/** - * A bolt implementation that emits tuples based on a combination of cassandra - * rowkey, collumnkey, and delimiter. - *

- * When this bolt received a tuple, it will attempt the following: - *

    - *
  1. Look up a value in the tuple using rowKeyField
  2. - *
  3. Fetch the corresponding row from cassandra
  4. - *
  5. Fetch the column columnKeyField value from the row.
  6. - *
  7. Split the column value into an array based on delimiter
  8. - *
  9. For each value, emit a tuple with {emitIdFieldName}={value}
  10. - *
- * For example, given the following cassandra row:
- * - *
- * RowKey: mike
- * => (column=followers, value=john:bob, timestamp=1328848653055000)
- * 
- * - * and the following bolt setup: - * - *
- * rowKeyField = "rowKey"
- * columnKeyField = "followers"
- * delimiter = ":"
- * emitIdFieldName = "rowKey"
- * emitValueFieldName = "follower"
- * 
- * - * if the following tuple were received by the bolt: - * - *
- * {rowKey:mike}
- * 
- * - * The following tuples would be emitted: - * - *
- * {rowKey:mike, follower:john}
- * {rowKey:mike, follower:bob}
- * 
- * - * @author tgoetz - */ -@SuppressWarnings("serial") -public class ValueLessColumnLookupBolt extends BaseCassandraBolt { - - private static final Logger LOG = LoggerFactory - .getLogger(ValueLessColumnLookupBolt.class); - private String columnFamily; - private String rowKeyField; - - private String emitIdFieldName; - private String emitValueFieldName; - - private boolean isDrpc = false; - - public ValueLessColumnLookupBolt(String columnFamily, String rowKeyField, - String emitIdFieldName, String emitValueFieldName, - boolean isDrpc) { - super(); - this.columnFamily = columnFamily; - this.rowKeyField = rowKeyField; - this.emitIdFieldName = emitIdFieldName; - this.emitValueFieldName = emitValueFieldName; - this.isDrpc = isDrpc; - } - - public ValueLessColumnLookupBolt(String columnFamily, String rowKeyField, - String columnKeyField, String delimiter, - String emitIdFieldName, String emitValueFieldName) { - this(columnFamily, rowKeyField, - emitIdFieldName, emitValueFieldName, false); - } - - @Override - public void prepare(Map stormConf, TopologyContext context) { - super.prepare(stormConf, context); - } - - @Override - public void execute(Tuple input, BasicOutputCollector collector) { - String rowKey = input.getStringByField(this.rowKeyField); - - ColumnFamilyTemplate template = new ThriftColumnFamilyTemplate( - this.keyspace, this.columnFamily, - new StringSerializer(), new StringSerializer()); - - ColumnFamilyResult result = template - .queryColumns(rowKey); - for(String val : result.getColumnNames()){ - if (this.isDrpc) { - collector.emit(new Values(input.getValue(0), rowKey, val)); - } - else { - collector.emit(new Values(rowKey, val)); - } - } - } - - @Override - public void cleanup() { - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (this.isDrpc) { - declarer.declare(new Fields("id", this.emitIdFieldName, - this.emitValueFieldName)); - } - else { - declarer.declare(new Fields(this.emitIdFieldName, - this.emitValueFieldName)); - } - - } - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/ColumnFamilyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/ColumnFamilyDeterminable.java deleted file mode 100644 index d4c6442..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/ColumnFamilyDeterminable.java +++ /dev/null @@ -1,17 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt.determinable; - -import java.io.Serializable; - -import backtype.storm.tuple.Tuple; - -public interface ColumnFamilyDeterminable extends Serializable{ - - /** - * Given a backtype.storm.tuple.Tuple object, - * determine the column family to write to. - * - * @param tuple - * @return - */ - public String determineColumnFamily(Tuple tuple); -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultColumnFamilyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultColumnFamilyDeterminable.java deleted file mode 100644 index ff3ec4a..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultColumnFamilyDeterminable.java +++ /dev/null @@ -1,19 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt.determinable; - -import backtype.storm.tuple.Tuple; - -public class DefaultColumnFamilyDeterminable implements - ColumnFamilyDeterminable { - - private String columnFamily; - - public DefaultColumnFamilyDeterminable(String columnFamily){ - this.columnFamily = columnFamily; - } - - @Override - public String determineColumnFamily(Tuple tuple) { - return this.columnFamily; - } - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultRowKeyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultRowKeyDeterminable.java deleted file mode 100644 index 18eafd6..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultRowKeyDeterminable.java +++ /dev/null @@ -1,17 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt.determinable; - -import backtype.storm.tuple.Tuple; - -public class DefaultRowKeyDeterminable implements RowKeyDeterminable { - private String keyField; - - public DefaultRowKeyDeterminable(String keyField){ - this.keyField = keyField; - } - - @Override - public Object determineRowKey(Tuple tuple) { - return tuple.getValueByField(this.keyField); - } - -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/RowKeyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/RowKeyDeterminable.java deleted file mode 100644 index 09ab3eb..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/RowKeyDeterminable.java +++ /dev/null @@ -1,16 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt.determinable; - -import java.io.Serializable; - -import backtype.storm.tuple.Tuple; - -public interface RowKeyDeterminable extends Serializable { - /** - * Given a backtype.storm.tuple.Tuple generate a Cassandra - * row key. - * - * @param tuple - * @return - */ - Object determineRowKey(Tuple tuple); -} diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/serialize/CassandraSerializer.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/serialize/CassandraSerializer.java deleted file mode 100644 index d2db399..0000000 --- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/serialize/CassandraSerializer.java +++ /dev/null @@ -1,7 +0,0 @@ -package backtype.storm.contrib.cassandra.bolt.serialize; - -import backtype.storm.tuple.Tuple; - -public interface CassandraSerializer { - public void writeToCassandra(Tuple tuple); -} diff --git a/storm-cassandra/src/main/resources/log4j.properties b/storm-cassandra/src/main/resources/log4j.properties deleted file mode 100644 index 4fe3dac..0000000 --- a/storm-cassandra/src/main/resources/log4j.properties +++ /dev/null @@ -1,16 +0,0 @@ -log4j.rootLogger=INFO, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout - -# Pattern to output the caller's file name and line number. -log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n - - - -# Print only messages of level WARN or above in the package com.foo. -log4j.logger.backtype.storm=DEBUG -log4j.logger.clojure.contrib=WARN -log4j.logger.org.springframework=WARN -log4j.logger.org.apache.zookeeper=WARN - diff --git a/storm-jms b/storm-jms new file mode 160000 index 0000000..dd4e5e6 --- /dev/null +++ b/storm-jms @@ -0,0 +1 @@ +Subproject commit dd4e5e694b814a638d4c510abf5cb50952030cd5 diff --git a/storm-jms/LICENSE.html b/storm-jms/LICENSE.html deleted file mode 100644 index fd39122..0000000 --- a/storm-jms/LICENSE.html +++ /dev/null @@ -1,261 +0,0 @@ - - - - - - -Eclipse Public License - Version 1.0 - - - - - - -

Eclipse Public License - v 1.0

- -

THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE -PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR -DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS -AGREEMENT.

- -

1. DEFINITIONS

- -

"Contribution" means:

- -

a) in the case of the initial Contributor, the initial -code and documentation distributed under this Agreement, and

-

b) in the case of each subsequent Contributor:

-

i) changes to the Program, and

-

ii) additions to the Program;

-

where such changes and/or additions to the Program -originate from and are distributed by that particular Contributor. A -Contribution 'originates' from a Contributor if it was added to the -Program by such Contributor itself or anyone acting on such -Contributor's behalf. Contributions do not include additions to the -Program which: (i) are separate modules of software distributed in -conjunction with the Program under their own license agreement, and (ii) -are not derivative works of the Program.

- -

"Contributor" means any person or entity that distributes -the Program.

- -

"Licensed Patents" mean patent claims licensable by a -Contributor which are necessarily infringed by the use or sale of its -Contribution alone or when combined with the Program.

- -

"Program" means the Contributions distributed in accordance -with this Agreement.

- -

"Recipient" means anyone who receives the Program under -this Agreement, including all Contributors.

- -

2. GRANT OF RIGHTS

- -

a) Subject to the terms of this Agreement, each -Contributor hereby grants Recipient a non-exclusive, worldwide, -royalty-free copyright license to reproduce, prepare derivative works -of, publicly display, publicly perform, distribute and sublicense the -Contribution of such Contributor, if any, and such derivative works, in -source code and object code form.

- -

b) Subject to the terms of this Agreement, each -Contributor hereby grants Recipient a non-exclusive, worldwide, -royalty-free patent license under Licensed Patents to make, use, sell, -offer to sell, import and otherwise transfer the Contribution of such -Contributor, if any, in source code and object code form. This patent -license shall apply to the combination of the Contribution and the -Program if, at the time the Contribution is added by the Contributor, -such addition of the Contribution causes such combination to be covered -by the Licensed Patents. The patent license shall not apply to any other -combinations which include the Contribution. No hardware per se is -licensed hereunder.

- -

c) Recipient understands that although each Contributor -grants the licenses to its Contributions set forth herein, no assurances -are provided by any Contributor that the Program does not infringe the -patent or other intellectual property rights of any other entity. Each -Contributor disclaims any liability to Recipient for claims brought by -any other entity based on infringement of intellectual property rights -or otherwise. As a condition to exercising the rights and licenses -granted hereunder, each Recipient hereby assumes sole responsibility to -secure any other intellectual property rights needed, if any. For -example, if a third party patent license is required to allow Recipient -to distribute the Program, it is Recipient's responsibility to acquire -that license before distributing the Program.

- -

d) Each Contributor represents that to its knowledge it -has sufficient copyright rights in its Contribution, if any, to grant -the copyright license set forth in this Agreement.

- -

3. REQUIREMENTS

- -

A Contributor may choose to distribute the Program in object code -form under its own license agreement, provided that:

- -

a) it complies with the terms and conditions of this -Agreement; and

- -

b) its license agreement:

- -

i) effectively disclaims on behalf of all Contributors -all warranties and conditions, express and implied, including warranties -or conditions of title and non-infringement, and implied warranties or -conditions of merchantability and fitness for a particular purpose;

- -

ii) effectively excludes on behalf of all Contributors -all liability for damages, including direct, indirect, special, -incidental and consequential damages, such as lost profits;

- -

iii) states that any provisions which differ from this -Agreement are offered by that Contributor alone and not by any other -party; and

- -

iv) states that source code for the Program is available -from such Contributor, and informs licensees how to obtain it in a -reasonable manner on or through a medium customarily used for software -exchange.

- -

When the Program is made available in source code form:

- -

a) it must be made available under this Agreement; and

- -

b) a copy of this Agreement must be included with each -copy of the Program.

- -

Contributors may not remove or alter any copyright notices contained -within the Program.

- -

Each Contributor must identify itself as the originator of its -Contribution, if any, in a manner that reasonably allows subsequent -Recipients to identify the originator of the Contribution.

- -

4. COMMERCIAL DISTRIBUTION

- -

Commercial distributors of software may accept certain -responsibilities with respect to end users, business partners and the -like. While this license is intended to facilitate the commercial use of -the Program, the Contributor who includes the Program in a commercial -product offering should do so in a manner which does not create -potential liability for other Contributors. Therefore, if a Contributor -includes the Program in a commercial product offering, such Contributor -("Commercial Contributor") hereby agrees to defend and -indemnify every other Contributor ("Indemnified Contributor") -against any losses, damages and costs (collectively "Losses") -arising from claims, lawsuits and other legal actions brought by a third -party against the Indemnified Contributor to the extent caused by the -acts or omissions of such Commercial Contributor in connection with its -distribution of the Program in a commercial product offering. The -obligations in this section do not apply to any claims or Losses -relating to any actual or alleged intellectual property infringement. In -order to qualify, an Indemnified Contributor must: a) promptly notify -the Commercial Contributor in writing of such claim, and b) allow the -Commercial Contributor to control, and cooperate with the Commercial -Contributor in, the defense and any related settlement negotiations. The -Indemnified Contributor may participate in any such claim at its own -expense.

- -

For example, a Contributor might include the Program in a commercial -product offering, Product X. That Contributor is then a Commercial -Contributor. If that Commercial Contributor then makes performance -claims, or offers warranties related to Product X, those performance -claims and warranties are such Commercial Contributor's responsibility -alone. Under this section, the Commercial Contributor would have to -defend claims against the other Contributors related to those -performance claims and warranties, and if a court requires any other -Contributor to pay any damages as a result, the Commercial Contributor -must pay those damages.

- -

5. NO WARRANTY

- -

EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS -PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS -OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, -ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY -OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely -responsible for determining the appropriateness of using and -distributing the Program and assumes all risks associated with its -exercise of rights under this Agreement , including but not limited to -the risks and costs of program errors, compliance with applicable laws, -damage to or loss of data, programs or equipment, and unavailability or -interruption of operations.

- -

6. DISCLAIMER OF LIABILITY

- -

EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT -NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING -WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR -DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED -HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.

- -

7. GENERAL

- -

If any provision of this Agreement is invalid or unenforceable under -applicable law, it shall not affect the validity or enforceability of -the remainder of the terms of this Agreement, and without further action -by the parties hereto, such provision shall be reformed to the minimum -extent necessary to make such provision valid and enforceable.

- -

If Recipient institutes patent litigation against any entity -(including a cross-claim or counterclaim in a lawsuit) alleging that the -Program itself (excluding combinations of the Program with other -software or hardware) infringes such Recipient's patent(s), then such -Recipient's rights granted under Section 2(b) shall terminate as of the -date such litigation is filed.

- -

All Recipient's rights under this Agreement shall terminate if it -fails to comply with any of the material terms or conditions of this -Agreement and does not cure such failure in a reasonable period of time -after becoming aware of such noncompliance. If all Recipient's rights -under this Agreement terminate, Recipient agrees to cease use and -distribution of the Program as soon as reasonably practicable. However, -Recipient's obligations under this Agreement and any licenses granted by -Recipient relating to the Program shall continue and survive.

- -

Everyone is permitted to copy and distribute copies of this -Agreement, but in order to avoid inconsistency the Agreement is -copyrighted and may only be modified in the following manner. The -Agreement Steward reserves the right to publish new versions (including -revisions) of this Agreement from time to time. No one other than the -Agreement Steward has the right to modify this Agreement. The Eclipse -Foundation is the initial Agreement Steward. The Eclipse Foundation may -assign the responsibility to serve as the Agreement Steward to a -suitable separate entity. Each new version of the Agreement will be -given a distinguishing version number. The Program (including -Contributions) may always be distributed subject to the version of the -Agreement under which it was received. In addition, after a new version -of the Agreement is published, Contributor may elect to distribute the -Program (including its Contributions) under the new version. Except as -expressly stated in Sections 2(a) and 2(b) above, Recipient receives no -rights or licenses to the intellectual property of any Contributor under -this Agreement, whether expressly, by implication, estoppel or -otherwise. All rights in the Program not expressly granted under this -Agreement are reserved.

- -

This Agreement is governed by the laws of the State of New York and -the intellectual property laws of the United States of America. No party -to this Agreement will bring a legal action under this Agreement more -than one year after the cause of action arose. Each party waives its -rights to a jury trial in any resulting litigation.

- - - - diff --git a/storm-jms/README.markdown b/storm-jms/README.markdown deleted file mode 100644 index 3f091a6..0000000 --- a/storm-jms/README.markdown +++ /dev/null @@ -1,40 +0,0 @@ -## About Storm JMS -Storm JMS is a generic framework for integrating JMS messaging within the Storm framework. - -The [Storm Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) explains what storm is and why it was built. - -Storm-JMS allows you to inject data into Storm via a generic JMS spout, as well as consume data from Storm via a generic JMS bolt. - -Both the JMS Spout and JMS Bolt are data agnostic. To use them, you provide a simple Java class that bridges the JMS and Storm APIs and encapsulates and domain-specific logic. - -## Components - -### JMS Spout -The JMS Spout component allows for data published to a JMS topic or queue to be consumed by a Storm topology. - -A JMS Spout connects to a JMS Destination (topic or queue), and emits Storm "Tuple" objects based on the content of the JMS message received. - - -### JMS Bolt -The JMS Bolt component allows for data within a Storm topology to be published to a JMS destination (topic or queue). - -A JMS Bolt connects to a JMS Destination, and publishes JMS Messages based on the Storm "Tuple" objects it receives. - - -## Documentation - -Documentation and tutorials can be found on the [Storm-JMS wiki](http://github.com/ptgoetz/storm-jms/wiki). - - -## License - -The use and distribution terms for this software are covered by the -Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) -which can be found in the file LICENSE.html at the root of this distribution. -By using this software in any fashion, you are agreeing to be bound by -the terms of this license. -You must not remove this notice, or any other, from this software. - -## Contributors - -* P. Taylor Goetz ([@ptgoetz](http://twitter.com/ptgoetz)) diff --git a/storm-jms/examples/README.markdown b/storm-jms/examples/README.markdown deleted file mode 100644 index 7846b99..0000000 --- a/storm-jms/examples/README.markdown +++ /dev/null @@ -1,23 +0,0 @@ -## About Storm JMS Examples -This project contains a simple storm topology that illustrates the usage of "storm-jms". - -To build: - -`mvn clean install` - -The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory: - -`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar` - -## License - -The use and distribution terms for this software are covered by the -Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) -which can be found in the file LICENSE.html at the root of this distribution. -By using this software in any fashion, you are agreeing to be bound by -the terms of this license. -You must not remove this notice, or any other, from this software. - -## Contributors - -* P. Taylor Goetz ([@ptgoetz](http://twitter.com/ptgoetz)) diff --git a/storm-jms/examples/pom.xml b/storm-jms/examples/pom.xml deleted file mode 100644 index 99fdb19..0000000 --- a/storm-jms/examples/pom.xml +++ /dev/null @@ -1,135 +0,0 @@ - - 4.0.0 - backtype.storm.contrib - storm-jms-examples - 0.1-SNAPSHOT - Storm JMS Examples - Storm JMS Examples - - - clojars.org - http://clojars.org/repo - - - - 2.5.6 - 0.6.2 - - - - org.springframework - spring-beans - ${spring.version} - - - org.springframework - spring-core - ${spring.version} - - - org.springframework - spring-context - ${spring.version} - - - org.springframework - spring-jms - ${spring.version} - - - org.apache.xbean - xbean-spring - 3.7 - - - storm - storm - ${storm.version} - - provided - - - com.github.ptgoetz - storm-jms - 0.1.0-SNAPSHOT - - - org.apache.activemq - activemq-core - 5.4.0 - - - - - - - maven-assembly-plugin - - - jar-with-dependencies - - - - - - - - - - make-assembly - package - - single - - - - - - - - org.codehaus.mojo - exec-maven-plugin - 1.2.1 - - - - exec - - - - - java - true - true - backtype.storm.contrib.jms.example.ExampleJmsTopology - - - log4j.configuration - file:./src/main/resources/log4j.properties - - - - - - storm - storm - ${storm.version} - jar - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.6 - 1.6 - - - - - - - \ No newline at end of file diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java deleted file mode 100644 index 4c6ccad..0000000 --- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java +++ /dev/null @@ -1,114 +0,0 @@ -package backtype.storm.contrib.jms.example; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; -import javax.jms.TextMessage; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.contrib.jms.JmsMessageProducer; -import backtype.storm.contrib.jms.JmsProvider; -import backtype.storm.contrib.jms.JmsTupleProducer; -import backtype.storm.contrib.jms.bolt.JmsBolt; -import backtype.storm.contrib.jms.spout.JmsSpout; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.Utils; - -public class ExampleJmsTopology { - public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT"; - public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT"; - public static final String FINAL_BOLT = "FINAL_BOLT"; - public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT"; - public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT"; - public static final String ANOTHER_BOLT = "ANOTHER_BOLT"; - - @SuppressWarnings("serial") - public static void main(String[] args) throws Exception { - - // JMS Queue Provider - JmsProvider jmsQueueProvider = new SpringJmsProvider( - "jms-activemq.xml", "jmsConnectionFactory", - "notificationQueue"); - - // JMS Topic provider - JmsProvider jmsTopicProvider = new SpringJmsProvider( - "jms-activemq.xml", "jmsConnectionFactory", - "notificationTopic"); - - // JMS Producer - JmsTupleProducer producer = new JsonTupleProducer(); - - // JMS Queue Spout - JmsSpout queueSpout = new JmsSpout(); - queueSpout.setJmsProvider(jmsQueueProvider); - queueSpout.setJmsTupleProducer(producer); - queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); - queueSpout.setDistributed(true); // allow multiple instances - - TopologyBuilder builder = new TopologyBuilder(); - - // spout with 5 parallel instances - builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5); - - // intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks - builder.setBolt(INTERMEDIATE_BOLT, - new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping( - JMS_QUEUE_SPOUT); - - // bolt that subscribes to the intermediate bolt, and auto-acks - // messages. - builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping( - INTERMEDIATE_BOLT); - - // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic - JmsBolt jmsBolt = new JmsBolt(); - jmsBolt.setJmsProvider(jmsTopicProvider); - - // anonymous message producer just calls toString() on the tuple to create a jms message - jmsBolt.setJmsMessageProducer(new JmsMessageProducer() { - @Override - public Message toMessage(Session session, Tuple input) throws JMSException{ - System.out.println("Sending JMS Message:" + input.toString()); - TextMessage tm = session.createTextMessage(input.toString()); - return tm; - } - }); - - builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT); - - // JMS Topic spout - JmsSpout topicSpout = new JmsSpout(); - topicSpout.setJmsProvider(jmsTopicProvider); - topicSpout.setJmsTupleProducer(producer); - topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); - topicSpout.setDistributed(false); - - builder.setSpout(JMS_TOPIC_SPOUT, topicSpout); - - builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping( - JMS_TOPIC_SPOUT); - - Config conf = new Config(); - - if (args.length > 0) { - conf.setNumWorkers(3); - - StormSubmitter.submitTopology(args[0], conf, - builder.createTopology()); - } else { - - conf.setDebug(true); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("storm-jms-example", conf, builder.createTopology()); - Utils.sleep(60000); - cluster.killTopology("storm-jms-example"); - cluster.shutdown(); - } - } - -} diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java deleted file mode 100644 index bd0dada..0000000 --- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java +++ /dev/null @@ -1,99 +0,0 @@ -package backtype.storm.contrib.jms.example; - -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -/** - * A generic backtype.storm.topology.IRichBolt implementation - * for testing/debugging the Storm JMS Spout and example topologies. - *

- * For debugging purposes, set the log level of the - * backtype.storm.contrib.jms package to DEBUG for debugging - * output. - * @author tgoetz - * - */ -@SuppressWarnings("serial") -public class GenericBolt implements IRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class); - private OutputCollector collector; - private boolean autoAck = false; - private boolean autoAnchor = false; - private Fields declaredFields; - private String name; - - /** - * Constructs a new GenericBolt instance. - * - * @param name The name of the bolt (used in DEBUG logging) - * @param autoAck Whether or not this bolt should automatically acknowledge received tuples. - * @param autoAnchor Whether or not this bolt should automatically anchor to received tuples. - * @param declaredFields The fields this bolt declares as output. - */ - public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields){ - this.name = name; - this.autoAck = autoAck; - this.autoAnchor = autoAnchor; - this.declaredFields = declaredFields; - } - - public GenericBolt(String name, boolean autoAck, boolean autoAnchor){ - this(name, autoAck, autoAnchor, null); - } - - @SuppressWarnings("rawtypes") - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { - this.collector = collector; - - } - - public void execute(Tuple input) { - LOG.debug("[" + this.name + "] Received message: " + input); - - - - // only emit if we have declared fields. - if(this.declaredFields != null){ - LOG.debug("[" + this.name + "] emitting: " + input); - if(this.autoAnchor){ - this.collector.emit(input, input.getValues()); - } else{ - this.collector.emit(input.getValues()); - } - } - - if(this.autoAck){ - LOG.debug("[" + this.name + "] ACKing tuple: " + input); - this.collector.ack(input); - } - - } - - public void cleanup() { - - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if(this.declaredFields != null){ - declarer.declare(this.declaredFields); - } - } - - public boolean isAutoAck(){ - return this.autoAck; - } - - public void setAutoAck(boolean autoAck){ - this.autoAck = autoAck; - } - -} diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java deleted file mode 100644 index 35dc9f8..0000000 --- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java +++ /dev/null @@ -1,41 +0,0 @@ -package backtype.storm.contrib.jms.example; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; - -import backtype.storm.contrib.jms.JmsTupleProducer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; - -/** - * A simple JmsTupleProducer that expects to receive - * JMS TextMessage objects with a body in JSON format. - *

- * Ouputs a tuple with field name "json" and a string value - * containing the raw json. - *

- * NOTE: Currently this implementation assumes the text is valid - * JSON and does not attempt to parse or validate it. - * - * @author tgoetz - * - */ -@SuppressWarnings("serial") -public class JsonTupleProducer implements JmsTupleProducer { - - public Values toTuple(Message msg) throws JMSException { - if(msg instanceof TextMessage){ - String json = ((TextMessage) msg).getText(); - return new Values(json); - } else { - return null; - } - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("json")); - } - -} diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java deleted file mode 100644 index ba2dfce..0000000 --- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java +++ /dev/null @@ -1,58 +0,0 @@ -package backtype.storm.contrib.jms.example; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; - -import org.springframework.context.ApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; - -import backtype.storm.contrib.jms.JmsProvider; - - -/** - * A JmsProvider that uses the spring framework - * to obtain a JMS ConnectionFactory and - * Desitnation objects. - *

- * The constructor takes three arguments: - *

    - *
  1. A string pointing to the the spring application context file contining the JMS configuration - * (must be on the classpath) - *
  2. - *
  3. The name of the connection factory bean
  4. - *
  5. The name of the destination bean
  6. - *
- * - * - * @author tgoetz - * - */ -@SuppressWarnings("serial") -public class SpringJmsProvider implements JmsProvider { - private ConnectionFactory connectionFactory; - private Destination destination; - - /** - * Constructs a SpringJmsProvider object given the name of a - * classpath resource (the spring application context file), and the bean - * names of a JMS connection factory and destination. - * - * @param appContextClasspathResource - the spring configuration file (classpath resource) - * @param connectionFactoryBean - the JMS connection factory bean name - * @param destinationBean - the JMS destination bean name - */ - public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){ - ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource); - this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean); - this.destination = (Destination)context.getBean(destinationBean); - } - - public ConnectionFactory connectionFactory() throws Exception { - return this.connectionFactory; - } - - public Destination destination() throws Exception { - return this.destination; - } - -} diff --git a/storm-jms/examples/src/main/resources/jms-activemq.xml b/storm-jms/examples/src/main/resources/jms-activemq.xml deleted file mode 100644 index db50fcf..0000000 --- a/storm-jms/examples/src/main/resources/jms-activemq.xml +++ /dev/null @@ -1,37 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - diff --git a/storm-jms/examples/src/main/resources/log4j.properties b/storm-jms/examples/src/main/resources/log4j.properties deleted file mode 100644 index 31a50d6..0000000 --- a/storm-jms/examples/src/main/resources/log4j.properties +++ /dev/null @@ -1,13 +0,0 @@ -log4j.rootLogger=INFO, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout - -log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n - - -log4j.logger.backtype.storm.contrib=DEBUG -log4j.logger.clojure.contrib=WARN -log4j.logger.org.springframework=WARN -log4j.logger.org.apache.zookeeper=WARN - diff --git a/storm-jms/pom.xml b/storm-jms/pom.xml deleted file mode 100644 index cb11e86..0000000 --- a/storm-jms/pom.xml +++ /dev/null @@ -1,69 +0,0 @@ - - - - org.sonatype.oss - oss-parent - 7 - - - - 4.0.0 - com.github.ptgoetz - storm-jms - 0.1.1-SNAPSHOT - Storm JMS - Storm JMS Components - - - - - Eclipse Public License - v 1.0 - http://www.eclipse.org/legal/epl-v10.html - repo - - - - scm:git:git@github.com:ptgoetz/storm-jms.git - scm:git:git@github.com:ptgoetz/storm-jms.git - :git@github.com:ptgoetz/storm-jms.git - - - - - ptgoetz - P. Taylor Goetz - ptgoetz@gmail.com - - - - - - 0.6.2 - - - - storm - storm - ${storm.version} - - provided - - - org.apache.geronimo.specs - geronimo-jms_1.1_spec - 1.1.1 - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.6 - 1.6 - - - - - \ No newline at end of file diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java deleted file mode 100644 index bd07ccb..0000000 --- a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java +++ /dev/null @@ -1,33 +0,0 @@ -package backtype.storm.contrib.jms; - -import java.io.Serializable; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; - -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -/** - * JmsMessageProducer implementations are responsible for translating - * a backtype.storm.tuple.Values instance into a - * javax.jms.Message object. - *

- * - * - * @author P. Taylor Goetz - * - */ -public interface JmsMessageProducer extends Serializable{ - - /** - * Translate a backtype.storm.tuple.Tuple object - * to a javax.jms.MessageJmsProvider object encapsulates the ConnectionFactory - * and Destination JMS objects the JmsSpout needs to manage - * a topic/queue connection over the course of it's lifecycle. - * - * @author P. Taylor Goetz - * - */ -public interface JmsProvider extends Serializable{ - /** - * Provides the JMS ConnectionFactory - * @return the connection factory - * @throws Exception - */ - public ConnectionFactory connectionFactory() throws Exception; - - /** - * Provides the Destination (topic or queue) from which the - * JmsSpout will receive messages. - * @return - * @throws Exception - */ - public Destination destination() throws Exception; -} diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java deleted file mode 100644 index a76837d..0000000 --- a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java +++ /dev/null @@ -1,41 +0,0 @@ -package backtype.storm.contrib.jms; - -import java.io.Serializable; - -import javax.jms.JMSException; -import javax.jms.Message; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Values; - -/** - * Interface to define classes that can produce a Storm Values objects - * from a javax.jms.Message object>. - *

- * Implementations are also responsible for declaring the output - * fields they produce. - *

- * If for some reason the implementation can't process a message - * (for example if it received a javax.jms.ObjectMessage - * when it was expecting a javax.jms.TextMessage it should - * return null to indicate to the JmsSpout that - * the message could not be processed. - * - * @author P. Taylor Goetz - * - */ -public interface JmsTupleProducer extends Serializable{ - /** - * Process a JMS message object to create a Values object. - * @param msg - the JMS message - * @return the Values tuple, or null if the message couldn't be processed. - * @throws JMSException - */ - Values toTuple(Message msg) throws JMSException; - - /** - * Declare the output fields produced by this JmsTupleProducer. - * @param declarer The OuputFieldsDeclarer for the spout. - */ - void declareOutputFields(OutputFieldsDeclarer declarer); -} diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java deleted file mode 100644 index 487aa02..0000000 --- a/storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java +++ /dev/null @@ -1,197 +0,0 @@ -package backtype.storm.contrib.jms.bolt; - -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.contrib.jms.JmsMessageProducer; -import backtype.storm.contrib.jms.JmsProvider; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; - -/** - * A JmsBolt receives backtype.storm.tuple.Tuple objects from a Storm - * topology and publishes JMS Messages to a destination (topic or queue). - *

- * To use a JmsBolt in a topology, the following must be supplied: - *

    - *
  1. A JmsProvider implementation.
  2. - *
  3. A JmsMessageProducer implementation.
  4. - *
- * The JmsProvider provides the JMS javax.jms.ConnectionFactory - * and javax.jms.Destination objects requied to publish JMS messages. - *

- * The JmsBolt uses a JmsMessageProducer to translate - * backtype.storm.tuple.Tuple objects into - * javax.jms.Message objects for publishing. - *

- * Both JmsProvider and JmsMessageProducer must be set, or the bolt will - * fail upon deployment to a cluster. - *

- * The JmsBolt is typically an endpoint in a topology -- in other words - * it does not emit any tuples. - * - * - * @author P. Taylor Goetz - * - */ -public class JmsBolt implements IRichBolt { - private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class); - - private boolean autoAck = true; - - // javax.jms objects - private Connection connection; - private Session session; - private MessageProducer messageProducer; - - // JMS options - private boolean jmsTransactional = false; - private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - - - private JmsProvider jmsProvider; - private JmsMessageProducer producer; - - - private OutputCollector collector; - - /** - * Set the JmsProvider used to connect to the JMS destination topic/queue - * @param provider - */ - public void setJmsProvider(JmsProvider provider){ - this.jmsProvider = provider; - } - - /** - * Set the JmsMessageProducer used to convert tuples - * into JMS messages. - * - * @param producer - */ - public void setJmsMessageProducer(JmsMessageProducer producer){ - this.producer = producer; - } - - /** - * Sets the JMS acknowledgement mode for JMS messages sent - * by this bolt. - *

- * Possible values: - *

    - *
  • javax.jms.Session.AUTO_ACKNOWLEDGE
  • - *
  • javax.jms.Session.CLIENT_ACKNOWLEDGE
  • - *
  • javax.jms.Session.DUPS_OK_ACKNOWLEDGE
  • - *
- * @param acknowledgeMode (constant defined in javax.jms.Session) - */ - public void setJmsAcknowledgeMode(int acknowledgeMode){ - this.jmsAcknowledgeMode = acknowledgeMode; - } - - /** - * Set the JMS transactional setting for the JMS session. - * - * @param transactional - */ -// public void setJmsTransactional(boolean transactional){ -// this.jmsTransactional = transactional; -// } - - /** - * Sets whether or not tuples should be acknowledged by this - * bolt. - *

- * @param autoAck - */ - public void setAutoAck(boolean autoAck){ - this.autoAck = autoAck; - } - - - /** - * Consumes a tuple and sends a JMS message. - *

- * If autoAck is true, the tuple will be acknowledged - * after the message is sent. - *

- * If JMS sending fails, the tuple will be failed. - */ - @Override - public void execute(Tuple input) { - // write the tuple to a JMS destination... - LOG.debug("Tuple received. Sending JMS message."); - - try { - Message msg = this.producer.toMessage(this.session, input); - if(msg != null){ - this.messageProducer.send(msg); - } - if(this.autoAck){ - LOG.debug("ACKing tuple: " + input); - this.collector.ack(input); - } - } catch (JMSException e) { - // failed to send the JMS message, fail the tuple fast - LOG.warn("Failing tuple: " + input); - LOG.warn("Exception: ", e); - this.collector.fail(input); - } - } - - /** - * Releases JMS resources. - */ - @Override - public void cleanup() { - try { - LOG.debug("Closing JMS connection."); - this.session.close(); - this.connection.close(); - } catch (JMSException e) { - LOG.warn("Error closing JMS connection.", e); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - /** - * Initializes JMS resources. - */ - @Override - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { - if(this.jmsProvider == null || this.producer == null){ - throw new IllegalStateException("JMS Provider and MessageProducer not set."); - } - this.collector = collector; - LOG.debug("Connecting JMS.."); - try { - ConnectionFactory cf = this.jmsProvider.connectionFactory(); - Destination dest = this.jmsProvider.destination(); - this.connection = cf.createConnection(); - this.session = connection.createSession(this.jmsTransactional, - this.jmsAcknowledgeMode); - this.messageProducer = session.createProducer(dest); - - connection.start(); - } catch (Exception e) { - LOG.warn("Error creating JMS connection.", e); - } - } -} diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java deleted file mode 100644 index 75500d5..0000000 --- a/storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java +++ /dev/null @@ -1,312 +0,0 @@ -package backtype.storm.contrib.jms.spout; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.contrib.jms.JmsProvider; -import backtype.storm.contrib.jms.JmsTupleProducer; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -/** - * A Storm Spout implementation that listens to a JMS topic or queue - * and outputs tuples based on the messages it receives. - *

- * JmsSpout instances rely on JmsProducer implementations - * to obtain the JMS ConnectionFactory and Destination objects - * necessary to connect to a JMS topic/queue. - *

- * When a JmsSpout receives a JMS message, it delegates to an - * internal JmsTupleProducer instance to create a Storm tuple from the - * incoming message. - *

- * Typically, developers will supply a custom JmsTupleProducer implementation - * appropriate for the expected message content. - * - * @author P. Taylor Goetz - * - */ -@SuppressWarnings("serial") -public class JmsSpout implements IRichSpout, MessageListener { - private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); - - // JMS options - private boolean jmsTransactional = false; - private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - - private boolean distributed = true; - - private JmsTupleProducer tupleProducer; - - private JmsProvider jmsProvider; - - private LinkedBlockingQueue queue; - private ConcurrentHashMap pendingMessages; - - private SpoutOutputCollector collector; - - private transient Connection connection; - private transient Session session; - - /** - * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout. - *

- * Possible values: - *

    - *
  • javax.jms.Session.AUTO_ACKNOWLEDGE
  • - *
  • javax.jms.Session.CLIENT_ACKNOWLEDGE
  • - *
  • javax.jms.Session.DUPS_OK_ACKNOWLEDGE
  • - *
- * @param mode JMS Session Acknowledgement mode - * @throws IllegalArgumentException if the mode is not recognized. - */ - public void setJmsAcknowledgeMode(int mode){ - switch (mode) { - case Session.AUTO_ACKNOWLEDGE: - case Session.CLIENT_ACKNOWLEDGE: - case Session.DUPS_OK_ACKNOWLEDGE: - break; - default: - throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)"); - - } - this.jmsAcknowledgeMode = mode; - } - - /** - * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout. - * @return - */ - public int getJmsAcknowledgeMode(){ - return this.jmsAcknowledgeMode; - } - - /** - * Set whether this Spout uses the JMS transactional model by defualt. - *

- * If true the spout will always request acks from downstream - * bolts, using the incoming JMS message ID as the Storm message ID. - *

- * If false the spout will request acks from downstream bolts - * only if the spout's JmsAcknowledgeMode is not AUTO_ACKNOWLEDGE - * and the JMS message DeliveryMode is not AUTO_ACKNOWLEDGE. - *

- * If the spout determines that a JMS message should be handled transactionally - * (i.e. acknowledged in JMS terms), it will be JMS-acknowledged in the spout's - * ack() method. - *

- * Otherwise, if a downstream spout that has anchored on one of this spouts tuples - * fails to acknowledge an emitted tuple, the JMS message will not be not be acknowledged, - * and potentially be set for retransmission, depending on the underlying JMS implementation - * and configuration. - * - * @param transactional - */ - public void setJmsTransactional(boolean transactional){ - this.jmsTransactional = transactional; - } - public boolean isJmsTransaction(){ - return this.jmsTransactional; - } - /** - * Set the backtype.storm.contrib.jms.JmsProvider - * implementation that this Spout will use to connect to - * a JMS javax.jms.Desination - * - * @param provider - */ - public void setJmsProvider(JmsProvider provider){ - this.jmsProvider = provider; - } - /** - * Set the backtype.storm.contrib.jms.JmsTupleProducer - * implementation that will convert javax.jms.Message - * object to backtype.storm.tuple.Values objects - * to be emitted. - * - * @param producer - */ - public void setJmsTupleProducer(JmsTupleProducer producer){ - this.tupleProducer = producer; - } - - /** - * javax.jms.MessageListener implementation. - *

- * Stored the JMS message in an internal queue for processing - * by the nextTuple() method. - */ - public void onMessage(Message msg) { - this.queue.offer(msg); - - } - - /** - * ISpout implementation. - *

- * Connects the JMS spout to the configured JMS destination - * topic/queue. - * - */ - @SuppressWarnings("rawtypes") - public void open(Map conf, TopologyContext context, - SpoutOutputCollector collector) { - if(this.jmsProvider == null){ - throw new IllegalStateException("JMS provider has not been set."); - } - if(this.tupleProducer == null){ - throw new IllegalStateException("JMS Tuple Producer has not been set."); - } - queue = new LinkedBlockingQueue(); - this.pendingMessages = new ConcurrentHashMap(); - this.collector = collector; - try { - ConnectionFactory cf = this.jmsProvider.connectionFactory(); - Destination dest = this.jmsProvider.destination(); - this.connection = cf.createConnection(); - this.session = connection.createSession(this.jmsTransactional, - this.jmsAcknowledgeMode); - MessageConsumer consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - connection.start(); - - } catch (Exception e) { - LOG.warn("Error creating JMS connection.", e); - } - - } - - public void close() { - try { - LOG.debug("Closing JMS connection."); - this.session.close(); - this.connection.close(); - } catch (JMSException e) { - LOG.warn("Error closing JMS connection.", e); - } - - } - - public void nextTuple() { - Message msg = this.queue.poll(); - if (msg == null) { - Utils.sleep(50); - } else { - - LOG.debug("sending tuple: " + msg); - // get the tuple from the handler - try { - Values vals = this.tupleProducer.toTuple(msg); - // if we're transactional, always ack, otherwise - // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE - LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode())); - LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode)); - if (this.jmsTransactional - || (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) - || (msg.getJMSDeliveryMode() != Session.AUTO_ACKNOWLEDGE)) { - LOG.debug("Requesting acks."); - this.collector.emit(vals, msg.getJMSMessageID()); - - // at this point we successfully emitted. Store - // the message and message ID so we can do a - // JMS acknowledge later - this.pendingMessages.put(msg.getJMSMessageID(), msg); - } else { - this.collector.emit(vals); - } - } catch (JMSException e) { - LOG.warn("Unable to convert JMS message: " + msg); - } - - } - - } - - /* - * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE - */ - public void ack(Object msgId) { - - Message msg = this.pendingMessages.remove(msgId); - if (msg != null) { - try { - msg.acknowledge(); - LOG.debug("JMS Message acked: " + msgId); - } catch (JMSException e) { - LOG.warn("Error acknowldging JMS message: " + msgId, e); - } - } else { - LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId); - } - - } - - /* - * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE - */ - public void fail(Object msgId) { - LOG.debug("Message failed: " + msgId); - this.pendingMessages.remove(msgId); - - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - this.tupleProducer.declareOutputFields(declarer); - - } - - public boolean isDistributed() { - return this.distributed; - } - - /** - * Sets the "distributed" mode of this spout. - *

- * If true multiple instances of this spout may be - * created across the cluster (depending on the "parallelism_hint" in the topology configuration). - *

- * Setting this value to false essentially means this spout will run as a singleton - * within the cluster ("parallelism_hint" will be ignored). - *

- * In general, this should be set to false if the underlying JMS destination is a - * topic, and true if it is a JMS queue. - * - * @param distributed - */ - public void setDistributed(boolean distributed){ - this.distributed = distributed; - } - - - private static final String toDeliveryModeString(int deliveryMode) { - switch (deliveryMode) { - case Session.AUTO_ACKNOWLEDGE: - return "AUTO_ACKNOWLEDGE"; - case Session.CLIENT_ACKNOWLEDGE: - return "CLIENT_ACKNOWLEDGE"; - case Session.DUPS_OK_ACKNOWLEDGE: - return "DUPS_OK_ACKNOWLEDGE"; - default: - return "UNKNOWN"; - - } - } - -} diff --git a/storm-signals b/storm-signals new file mode 160000 index 0000000..c639a03 --- /dev/null +++ b/storm-signals @@ -0,0 +1 @@ +Subproject commit c639a034ee28c930f46b8a7e44208e7c5ae36b72 diff --git a/storm-signals/README.md b/storm-signals/README.md deleted file mode 100644 index ab58fc7..0000000 --- a/storm-signals/README.md +++ /dev/null @@ -1,101 +0,0 @@ -## Storm-Signals -Storm-Signals aims to provide a way to send messages ("signals") to components (spouts/bolts) in a storm topology that are otherwise not addressable. - -Storm topologies can be considered static in that modifications to a topology's behavior require redeployment. Storm-Signals provides a simple way to modify a topology's behavior at runtime, without redeployment. - - -### Project Location -Primary development of storm-signals will take place at: -https://github.com/ptgoetz/storm-signals - -Point/stable (non-SNAPSHOT) release souce code will be pushed to: -https://github.com/nathanmarz/storm-contrib - -Maven artifacts for releases will be available on maven central. - - - - -### Use Cases -Typical storm spouts run forever (until undeployed), emitting tuples based on an underlying, presumably event-driven, data source/stream. - -Some storm users have expressed an interest in having more control over that pattern, for instance in situations where the data stream is not open-ended, or where the situation requires that data streams be controllable (i.e. the ability to start/stop/pause/resume processing). - -Storm-Signals provides a very simple mechanism for communicating with spouts deployed within a storm topology. The communication mechanism resides outside of storm's basic stream processing paradigm (i.e. calls to `nextTuple()` and the tuple ack/fail mechanism). - -Signals (messages) - -#### Sample Use Cases - -* Ability to start/stop/pause/resume a spout from a process _external to_ the storm topology. -* Ability to change the source of a spout's stream without redeploying the topology. -* Initiating processing of a set/batch of data based on a schedule (such as a Quartz or cron job) -* Periodically sending a dynamic SQL query to a spout that emits tuples for processing. -* Any other use case you can think of. :) - -## Usage - -### Spout Implementation -Currently (Version 0.1.0) provides a basic abstract `BaseRichSpout` implementation that must be subclassed: - -`backtype.storm.contrib.signals.spout.BaseSignalSpout` - -Subclasses _must_ override the `onSignal()` method: - - protected abstract void onSignal(byte[] data); - -This method is called when a signal is sent to a spout. The signal payload is a `byte[]` that can contain anything (string, data, seriliazed object(s), etc.). - -Subclasses _must_ override the superclass constructor: - - public TestSignalSpout(String name) { - super(name); - } - -The `name` parameter provides a unique ID for the spout that allows `SingalClient`s to address the bolt and send it messages. - -Subclasses _must_ call `super.open()` if they override the `open()` method: - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - super.open(conf, context, collector); - } - -Failure to do so will prevent the spout from receiving signals (i.e. `onSignal()` will never be called). - -### Signal Client - -The `SignalClient` constructor requires two arguments: - -1. a zookeeper connect string ("host1:port1,host2:port2,hostN:portN") that should match the storm zookeeper configuration -2. a name string (this should match the name used to construct the `BaseSignalSpout` subclass) - -Example: - - public static void main(String[] args) throws Exception { - SignalClient sc = new SignalClient("localhost:2181", "test-signal-spout"); - sc.start(); - try { - sc.send("Hello Signal Spout!".getBytes()); - } finally { - sc.close(); - } - } - - -## Maven Usage - -### Maven Dependency - -Point (non-SNAPSHOT) releases will be available on maven central. - - - com.github.ptgoetz - storm-signals - 0.1.0 - - - - - - diff --git a/storm-signals/pom.xml b/storm-signals/pom.xml deleted file mode 100644 index 0f2a9ac..0000000 --- a/storm-signals/pom.xml +++ /dev/null @@ -1,60 +0,0 @@ - - - - org.sonatype.oss - oss-parent - 7 - - - 4.0.0 - com.github.ptgoetz - storm-signals - 0.1.0 - Storm Signals - Storm Signals - - - - Eclipse Public License - v 1.0 - http://www.eclipse.org/legal/epl-v10.html - repo - - - - - scm:git:git@github.com:ptgoetz/storm-signals.git - scm:git:git@github.com:ptgoetz/storm-signals.git - :git@github.com:ptgoetz/storm-signals.git - - - - ptgoetz - P. Taylor Goetz - ptgoetz@gmail.com - - - - 0.7.0 - - - - storm - storm - ${storm.version} - - provided - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.6 - 1.6 - - - - - \ No newline at end of file diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/client/SignalClient.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/client/SignalClient.java deleted file mode 100644 index 1a167fe..0000000 --- a/storm-signals/src/main/java/backtype/storm/contrib/signals/client/SignalClient.java +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com) - -package backtype.storm.contrib.signals.client; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.curator.framework.CuratorFramework; -import com.netflix.curator.framework.CuratorFrameworkFactory; -import com.netflix.curator.retry.RetryOneTime; - -public class SignalClient { - - private static final Logger LOG = LoggerFactory.getLogger(SignalClient.class); - - private CuratorFramework client = null; - private String name; - - public SignalClient(String zkConnectString, String name) { - this.name = name; - try { - this.client = CuratorFrameworkFactory.builder().namespace("storm-signals").connectString(zkConnectString) - .retryPolicy(new RetryOneTime(500)).build(); - } catch (IOException e) { - LOG.error("Error creating zookeeper client.", e); - } - LOG.debug("created Curator client"); - } - - public void start() { - this.client.start(); - } - - public void close() { - this.client.close(); - } - - public void send(byte[] signal) throws Exception { - this.client.setData().forPath(this.name, signal); - } - - /** - * @param args - */ - public static void main(String[] args) throws Exception { - SignalClient sc = new SignalClient("localhost:2181", "test-signal-spout1"); - sc.start(); - try { - sc.send("Hello Signal Spout!".getBytes()); - } finally { - sc.close(); - } - } - -} diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/spout/BaseSignalSpout.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/spout/BaseSignalSpout.java deleted file mode 100644 index fa94cc6..0000000 --- a/storm-signals/src/main/java/backtype/storm/contrib/signals/spout/BaseSignalSpout.java +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com) - -package backtype.storm.contrib.signals.spout; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.base.BaseRichSpout; - -import com.netflix.curator.framework.CuratorFramework; -import com.netflix.curator.framework.CuratorFrameworkFactory; -import com.netflix.curator.retry.RetryNTimes; - -@SuppressWarnings("serial") -public abstract class BaseSignalSpout extends BaseRichSpout implements Watcher { - - private static final Logger LOG = LoggerFactory.getLogger(BaseSignalSpout.class); - private static final String namespace = "storm-signals"; - private String name; - private CuratorFramework client; - - public BaseSignalSpout(String name) { - this.name = name; - } - - @SuppressWarnings("rawtypes") - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - try { - initZookeeper(conf); - } catch (Exception e) { - LOG.error("Error creating zookeeper client.", e); - } - } - - @SuppressWarnings("rawtypes") - private void initZookeeper(Map conf) throws Exception { - String connectString = zkHosts(conf); - int retryCount = (Integer) conf.get("storm.zookeeper.retry.times"); - int retryInterval = (Integer) conf.get("storm.zookeeper.retry.interval"); - - this.client = CuratorFrameworkFactory.builder().namespace(namespace).connectString(connectString) - .retryPolicy(new RetryNTimes(retryCount, retryInterval)).build(); - this.client.start(); - - // create base path if necessary - Stat stat = this.client.checkExists().usingWatcher(this).forPath(this.name); - if (stat == null) { - String path = this.client.create().creatingParentsIfNeeded().forPath(this.name); - LOG.info("Created: " + path); - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private String zkHosts(Map conf) { - int zkPort = (Integer) conf.get("storm.zookeeper.port"); - List zkServers = (List) conf.get("storm.zookeeper.servers"); - - Iterator it = zkServers.iterator(); - StringBuffer sb = new StringBuffer(); - while (it.hasNext()) { - sb.append(it.next()); - sb.append(":"); - sb.append(zkPort); - if (it.hasNext()) { - sb.append(","); - } - } - return sb.toString(); - } - - /** - * Releases the zookeeper connection. - */ - @Override - public void close() { - super.close(); - this.client.close(); - } - - @Override - public void process(WatchedEvent we) { - try { - this.client.checkExists().usingWatcher(this).forPath(this.name); - LOG.debug("Renewed watch for path %s", this.name); - } catch (Exception ex) { - LOG.error("Error renewing watch.", ex); - } - - switch (we.getType()) { - case NodeCreated: - LOG.debug("Node created."); - break; - case NodeDataChanged: - LOG.debug("Received signal."); - try { - this.onSignal(this.client.getData().forPath(we.getPath())); - } catch (Exception e) { - LOG.warn("Unable to process signal.", e); - } - break; - case NodeDeleted: - LOG.debug("NodeDeleted"); - break; - } - - } - - protected abstract void onSignal(byte[] data); - -} diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/SignalTopology.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/test/SignalTopology.java deleted file mode 100644 index 461741c..0000000 --- a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/SignalTopology.java +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com) - -package backtype.storm.contrib.signals.test; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.utils.Utils; - -public class SignalTopology { - - /** - * @param args - */ - public static void main(String[] args) { - // TODO Auto-generated method stub - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("signal-spout", new TestSignalSpout("test-signal-spout")); - - Config conf = new Config(); - conf.setDebug(true); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(120000); - cluster.killTopology("test"); - cluster.shutdown(); - } - -} diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/TestSignalSpout.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/test/TestSignalSpout.java deleted file mode 100644 index cc18041..0000000 --- a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/TestSignalSpout.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com) - -package backtype.storm.contrib.signals.test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.contrib.signals.spout.BaseSignalSpout; -import backtype.storm.topology.OutputFieldsDeclarer; - -public class TestSignalSpout extends BaseSignalSpout { - - - private static final Logger LOG = LoggerFactory.getLogger(TestSignalSpout.class); - - public TestSignalSpout(String name) { - super(name); - } - - @Override - public void onSignal(byte[] data) { - LOG.info("Received signal: " + new String(data)); - - } - - @Override - public void nextTuple() { - // TODO Auto-generated method stub - - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // TODO Auto-generated method stub - - } - -}