result = template
- .queryColumns(rowKey);
- for(String val : result.getColumnNames()){
- if (this.isDrpc) {
- collector.emit(new Values(input.getValue(0), rowKey, val));
- }
- else {
- collector.emit(new Values(rowKey, val));
- }
- }
- }
-
- @Override
- public void cleanup() {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- if (this.isDrpc) {
- declarer.declare(new Fields("id", this.emitIdFieldName,
- this.emitValueFieldName));
- }
- else {
- declarer.declare(new Fields(this.emitIdFieldName,
- this.emitValueFieldName));
- }
-
- }
-
-}
diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/ColumnFamilyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/ColumnFamilyDeterminable.java
deleted file mode 100644
index d4c6442..0000000
--- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/ColumnFamilyDeterminable.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package backtype.storm.contrib.cassandra.bolt.determinable;
-
-import java.io.Serializable;
-
-import backtype.storm.tuple.Tuple;
-
-public interface ColumnFamilyDeterminable extends Serializable{
-
- /**
- * Given a backtype.storm.tuple.Tuple
object,
- * determine the column family to write to.
- *
- * @param tuple
- * @return
- */
- public String determineColumnFamily(Tuple tuple);
-}
diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultColumnFamilyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultColumnFamilyDeterminable.java
deleted file mode 100644
index ff3ec4a..0000000
--- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultColumnFamilyDeterminable.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package backtype.storm.contrib.cassandra.bolt.determinable;
-
-import backtype.storm.tuple.Tuple;
-
-public class DefaultColumnFamilyDeterminable implements
- ColumnFamilyDeterminable {
-
- private String columnFamily;
-
- public DefaultColumnFamilyDeterminable(String columnFamily){
- this.columnFamily = columnFamily;
- }
-
- @Override
- public String determineColumnFamily(Tuple tuple) {
- return this.columnFamily;
- }
-
-}
diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultRowKeyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultRowKeyDeterminable.java
deleted file mode 100644
index 18eafd6..0000000
--- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/DefaultRowKeyDeterminable.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package backtype.storm.contrib.cassandra.bolt.determinable;
-
-import backtype.storm.tuple.Tuple;
-
-public class DefaultRowKeyDeterminable implements RowKeyDeterminable {
- private String keyField;
-
- public DefaultRowKeyDeterminable(String keyField){
- this.keyField = keyField;
- }
-
- @Override
- public Object determineRowKey(Tuple tuple) {
- return tuple.getValueByField(this.keyField);
- }
-
-}
diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/RowKeyDeterminable.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/RowKeyDeterminable.java
deleted file mode 100644
index 09ab3eb..0000000
--- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/determinable/RowKeyDeterminable.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.contrib.cassandra.bolt.determinable;
-
-import java.io.Serializable;
-
-import backtype.storm.tuple.Tuple;
-
-public interface RowKeyDeterminable extends Serializable {
- /**
- * Given a backtype.storm.tuple.Tuple
generate a Cassandra
- * row key.
- *
- * @param tuple
- * @return
- */
- Object determineRowKey(Tuple tuple);
-}
diff --git a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/serialize/CassandraSerializer.java b/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/serialize/CassandraSerializer.java
deleted file mode 100644
index d2db399..0000000
--- a/storm-cassandra/src/main/java/backtype/storm/contrib/cassandra/bolt/serialize/CassandraSerializer.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.contrib.cassandra.bolt.serialize;
-
-import backtype.storm.tuple.Tuple;
-
-public interface CassandraSerializer {
- public void writeToCassandra(Tuple tuple);
-}
diff --git a/storm-cassandra/src/main/resources/log4j.properties b/storm-cassandra/src/main/resources/log4j.properties
deleted file mode 100644
index 4fe3dac..0000000
--- a/storm-cassandra/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,16 +0,0 @@
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-
-# Pattern to output the caller's file name and line number.
-log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
-
-
-
-# Print only messages of level WARN or above in the package com.foo.
-log4j.logger.backtype.storm=DEBUG
-log4j.logger.clojure.contrib=WARN
-log4j.logger.org.springframework=WARN
-log4j.logger.org.apache.zookeeper=WARN
-
diff --git a/storm-jms b/storm-jms
new file mode 160000
index 0000000..dd4e5e6
--- /dev/null
+++ b/storm-jms
@@ -0,0 +1 @@
+Subproject commit dd4e5e694b814a638d4c510abf5cb50952030cd5
diff --git a/storm-jms/LICENSE.html b/storm-jms/LICENSE.html
deleted file mode 100644
index fd39122..0000000
--- a/storm-jms/LICENSE.html
+++ /dev/null
@@ -1,261 +0,0 @@
-
-
-
-
-
-
-Eclipse Public License - Version 1.0
-
-
-
-
-
-
-Eclipse Public License - v 1.0
-
-THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
-PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR
-DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS
-AGREEMENT.
-
-1. DEFINITIONS
-
-"Contribution" means:
-
-a) in the case of the initial Contributor, the initial
-code and documentation distributed under this Agreement, and
-b) in the case of each subsequent Contributor:
-i) changes to the Program, and
-ii) additions to the Program;
-where such changes and/or additions to the Program
-originate from and are distributed by that particular Contributor. A
-Contribution 'originates' from a Contributor if it was added to the
-Program by such Contributor itself or anyone acting on such
-Contributor's behalf. Contributions do not include additions to the
-Program which: (i) are separate modules of software distributed in
-conjunction with the Program under their own license agreement, and (ii)
-are not derivative works of the Program.
-
-"Contributor" means any person or entity that distributes
-the Program.
-
-"Licensed Patents" mean patent claims licensable by a
-Contributor which are necessarily infringed by the use or sale of its
-Contribution alone or when combined with the Program.
-
-"Program" means the Contributions distributed in accordance
-with this Agreement.
-
-"Recipient" means anyone who receives the Program under
-this Agreement, including all Contributors.
-
-2. GRANT OF RIGHTS
-
-a) Subject to the terms of this Agreement, each
-Contributor hereby grants Recipient a non-exclusive, worldwide,
-royalty-free copyright license to reproduce, prepare derivative works
-of, publicly display, publicly perform, distribute and sublicense the
-Contribution of such Contributor, if any, and such derivative works, in
-source code and object code form.
-
-b) Subject to the terms of this Agreement, each
-Contributor hereby grants Recipient a non-exclusive, worldwide,
-royalty-free patent license under Licensed Patents to make, use, sell,
-offer to sell, import and otherwise transfer the Contribution of such
-Contributor, if any, in source code and object code form. This patent
-license shall apply to the combination of the Contribution and the
-Program if, at the time the Contribution is added by the Contributor,
-such addition of the Contribution causes such combination to be covered
-by the Licensed Patents. The patent license shall not apply to any other
-combinations which include the Contribution. No hardware per se is
-licensed hereunder.
-
-c) Recipient understands that although each Contributor
-grants the licenses to its Contributions set forth herein, no assurances
-are provided by any Contributor that the Program does not infringe the
-patent or other intellectual property rights of any other entity. Each
-Contributor disclaims any liability to Recipient for claims brought by
-any other entity based on infringement of intellectual property rights
-or otherwise. As a condition to exercising the rights and licenses
-granted hereunder, each Recipient hereby assumes sole responsibility to
-secure any other intellectual property rights needed, if any. For
-example, if a third party patent license is required to allow Recipient
-to distribute the Program, it is Recipient's responsibility to acquire
-that license before distributing the Program.
-
-d) Each Contributor represents that to its knowledge it
-has sufficient copyright rights in its Contribution, if any, to grant
-the copyright license set forth in this Agreement.
-
-3. REQUIREMENTS
-
-A Contributor may choose to distribute the Program in object code
-form under its own license agreement, provided that:
-
-a) it complies with the terms and conditions of this
-Agreement; and
-
-b) its license agreement:
-
-i) effectively disclaims on behalf of all Contributors
-all warranties and conditions, express and implied, including warranties
-or conditions of title and non-infringement, and implied warranties or
-conditions of merchantability and fitness for a particular purpose;
-
-ii) effectively excludes on behalf of all Contributors
-all liability for damages, including direct, indirect, special,
-incidental and consequential damages, such as lost profits;
-
-iii) states that any provisions which differ from this
-Agreement are offered by that Contributor alone and not by any other
-party; and
-
-iv) states that source code for the Program is available
-from such Contributor, and informs licensees how to obtain it in a
-reasonable manner on or through a medium customarily used for software
-exchange.
-
-When the Program is made available in source code form:
-
-a) it must be made available under this Agreement; and
-
-b) a copy of this Agreement must be included with each
-copy of the Program.
-
-Contributors may not remove or alter any copyright notices contained
-within the Program.
-
-Each Contributor must identify itself as the originator of its
-Contribution, if any, in a manner that reasonably allows subsequent
-Recipients to identify the originator of the Contribution.
-
-4. COMMERCIAL DISTRIBUTION
-
-Commercial distributors of software may accept certain
-responsibilities with respect to end users, business partners and the
-like. While this license is intended to facilitate the commercial use of
-the Program, the Contributor who includes the Program in a commercial
-product offering should do so in a manner which does not create
-potential liability for other Contributors. Therefore, if a Contributor
-includes the Program in a commercial product offering, such Contributor
-("Commercial Contributor") hereby agrees to defend and
-indemnify every other Contributor ("Indemnified Contributor")
-against any losses, damages and costs (collectively "Losses")
-arising from claims, lawsuits and other legal actions brought by a third
-party against the Indemnified Contributor to the extent caused by the
-acts or omissions of such Commercial Contributor in connection with its
-distribution of the Program in a commercial product offering. The
-obligations in this section do not apply to any claims or Losses
-relating to any actual or alleged intellectual property infringement. In
-order to qualify, an Indemnified Contributor must: a) promptly notify
-the Commercial Contributor in writing of such claim, and b) allow the
-Commercial Contributor to control, and cooperate with the Commercial
-Contributor in, the defense and any related settlement negotiations. The
-Indemnified Contributor may participate in any such claim at its own
-expense.
-
-For example, a Contributor might include the Program in a commercial
-product offering, Product X. That Contributor is then a Commercial
-Contributor. If that Commercial Contributor then makes performance
-claims, or offers warranties related to Product X, those performance
-claims and warranties are such Commercial Contributor's responsibility
-alone. Under this section, the Commercial Contributor would have to
-defend claims against the other Contributors related to those
-performance claims and warranties, and if a court requires any other
-Contributor to pay any damages as a result, the Commercial Contributor
-must pay those damages.
-
-5. NO WARRANTY
-
-EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS
-PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION,
-ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY
-OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely
-responsible for determining the appropriateness of using and
-distributing the Program and assumes all risks associated with its
-exercise of rights under this Agreement , including but not limited to
-the risks and costs of program errors, compliance with applicable laws,
-damage to or loss of data, programs or equipment, and unavailability or
-interruption of operations.
-
-6. DISCLAIMER OF LIABILITY
-
-EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT
-NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT,
-INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING
-WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR
-DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED
-HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
-
-7. GENERAL
-
-If any provision of this Agreement is invalid or unenforceable under
-applicable law, it shall not affect the validity or enforceability of
-the remainder of the terms of this Agreement, and without further action
-by the parties hereto, such provision shall be reformed to the minimum
-extent necessary to make such provision valid and enforceable.
-
-If Recipient institutes patent litigation against any entity
-(including a cross-claim or counterclaim in a lawsuit) alleging that the
-Program itself (excluding combinations of the Program with other
-software or hardware) infringes such Recipient's patent(s), then such
-Recipient's rights granted under Section 2(b) shall terminate as of the
-date such litigation is filed.
-
-All Recipient's rights under this Agreement shall terminate if it
-fails to comply with any of the material terms or conditions of this
-Agreement and does not cure such failure in a reasonable period of time
-after becoming aware of such noncompliance. If all Recipient's rights
-under this Agreement terminate, Recipient agrees to cease use and
-distribution of the Program as soon as reasonably practicable. However,
-Recipient's obligations under this Agreement and any licenses granted by
-Recipient relating to the Program shall continue and survive.
-
-Everyone is permitted to copy and distribute copies of this
-Agreement, but in order to avoid inconsistency the Agreement is
-copyrighted and may only be modified in the following manner. The
-Agreement Steward reserves the right to publish new versions (including
-revisions) of this Agreement from time to time. No one other than the
-Agreement Steward has the right to modify this Agreement. The Eclipse
-Foundation is the initial Agreement Steward. The Eclipse Foundation may
-assign the responsibility to serve as the Agreement Steward to a
-suitable separate entity. Each new version of the Agreement will be
-given a distinguishing version number. The Program (including
-Contributions) may always be distributed subject to the version of the
-Agreement under which it was received. In addition, after a new version
-of the Agreement is published, Contributor may elect to distribute the
-Program (including its Contributions) under the new version. Except as
-expressly stated in Sections 2(a) and 2(b) above, Recipient receives no
-rights or licenses to the intellectual property of any Contributor under
-this Agreement, whether expressly, by implication, estoppel or
-otherwise. All rights in the Program not expressly granted under this
-Agreement are reserved.
-
-This Agreement is governed by the laws of the State of New York and
-the intellectual property laws of the United States of America. No party
-to this Agreement will bring a legal action under this Agreement more
-than one year after the cause of action arose. Each party waives its
-rights to a jury trial in any resulting litigation.
-
-
-
-
diff --git a/storm-jms/README.markdown b/storm-jms/README.markdown
deleted file mode 100644
index 3f091a6..0000000
--- a/storm-jms/README.markdown
+++ /dev/null
@@ -1,40 +0,0 @@
-## About Storm JMS
-Storm JMS is a generic framework for integrating JMS messaging within the Storm framework.
-
-The [Storm Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) explains what storm is and why it was built.
-
-Storm-JMS allows you to inject data into Storm via a generic JMS spout, as well as consume data from Storm via a generic JMS bolt.
-
-Both the JMS Spout and JMS Bolt are data agnostic. To use them, you provide a simple Java class that bridges the JMS and Storm APIs and encapsulates and domain-specific logic.
-
-## Components
-
-### JMS Spout
-The JMS Spout component allows for data published to a JMS topic or queue to be consumed by a Storm topology.
-
-A JMS Spout connects to a JMS Destination (topic or queue), and emits Storm "Tuple" objects based on the content of the JMS message received.
-
-
-### JMS Bolt
-The JMS Bolt component allows for data within a Storm topology to be published to a JMS destination (topic or queue).
-
-A JMS Bolt connects to a JMS Destination, and publishes JMS Messages based on the Storm "Tuple" objects it receives.
-
-
-## Documentation
-
-Documentation and tutorials can be found on the [Storm-JMS wiki](http://github.com/ptgoetz/storm-jms/wiki).
-
-
-## License
-
-The use and distribution terms for this software are covered by the
-Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
-which can be found in the file LICENSE.html at the root of this distribution.
-By using this software in any fashion, you are agreeing to be bound by
-the terms of this license.
-You must not remove this notice, or any other, from this software.
-
-## Contributors
-
-* P. Taylor Goetz ([@ptgoetz](http://twitter.com/ptgoetz))
diff --git a/storm-jms/examples/README.markdown b/storm-jms/examples/README.markdown
deleted file mode 100644
index 7846b99..0000000
--- a/storm-jms/examples/README.markdown
+++ /dev/null
@@ -1,23 +0,0 @@
-## About Storm JMS Examples
-This project contains a simple storm topology that illustrates the usage of "storm-jms".
-
-To build:
-
-`mvn clean install`
-
-The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory:
-
-`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar`
-
-## License
-
-The use and distribution terms for this software are covered by the
-Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
-which can be found in the file LICENSE.html at the root of this distribution.
-By using this software in any fashion, you are agreeing to be bound by
-the terms of this license.
-You must not remove this notice, or any other, from this software.
-
-## Contributors
-
-* P. Taylor Goetz ([@ptgoetz](http://twitter.com/ptgoetz))
diff --git a/storm-jms/examples/pom.xml b/storm-jms/examples/pom.xml
deleted file mode 100644
index 99fdb19..0000000
--- a/storm-jms/examples/pom.xml
+++ /dev/null
@@ -1,135 +0,0 @@
-
- 4.0.0
- backtype.storm.contrib
- storm-jms-examples
- 0.1-SNAPSHOT
- Storm JMS Examples
- Storm JMS Examples
-
-
- clojars.org
- http://clojars.org/repo
-
-
-
- 2.5.6
- 0.6.2
-
-
-
- org.springframework
- spring-beans
- ${spring.version}
-
-
- org.springframework
- spring-core
- ${spring.version}
-
-
- org.springframework
- spring-context
- ${spring.version}
-
-
- org.springframework
- spring-jms
- ${spring.version}
-
-
- org.apache.xbean
- xbean-spring
- 3.7
-
-
- storm
- storm
- ${storm.version}
-
- provided
-
-
- com.github.ptgoetz
- storm-jms
- 0.1.0-SNAPSHOT
-
-
- org.apache.activemq
- activemq-core
- 5.4.0
-
-
-
-
-
-
- maven-assembly-plugin
-
-
- jar-with-dependencies
-
-
-
-
-
-
-
-
-
- make-assembly
- package
-
- single
-
-
-
-
-
-
-
- org.codehaus.mojo
- exec-maven-plugin
- 1.2.1
-
-
-
- exec
-
-
-
-
- java
- true
- true
- backtype.storm.contrib.jms.example.ExampleJmsTopology
-
-
- log4j.configuration
- file:./src/main/resources/log4j.properties
-
-
-
-
-
- storm
- storm
- ${storm.version}
- jar
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
- 1.6
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java
deleted file mode 100644
index 4c6ccad..0000000
--- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package backtype.storm.contrib.jms.example;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.contrib.jms.JmsMessageProducer;
-import backtype.storm.contrib.jms.JmsProvider;
-import backtype.storm.contrib.jms.JmsTupleProducer;
-import backtype.storm.contrib.jms.bolt.JmsBolt;
-import backtype.storm.contrib.jms.spout.JmsSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-
-public class ExampleJmsTopology {
- public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT";
- public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT";
- public static final String FINAL_BOLT = "FINAL_BOLT";
- public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT";
- public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT";
- public static final String ANOTHER_BOLT = "ANOTHER_BOLT";
-
- @SuppressWarnings("serial")
- public static void main(String[] args) throws Exception {
-
- // JMS Queue Provider
- JmsProvider jmsQueueProvider = new SpringJmsProvider(
- "jms-activemq.xml", "jmsConnectionFactory",
- "notificationQueue");
-
- // JMS Topic provider
- JmsProvider jmsTopicProvider = new SpringJmsProvider(
- "jms-activemq.xml", "jmsConnectionFactory",
- "notificationTopic");
-
- // JMS Producer
- JmsTupleProducer producer = new JsonTupleProducer();
-
- // JMS Queue Spout
- JmsSpout queueSpout = new JmsSpout();
- queueSpout.setJmsProvider(jmsQueueProvider);
- queueSpout.setJmsTupleProducer(producer);
- queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- queueSpout.setDistributed(true); // allow multiple instances
-
- TopologyBuilder builder = new TopologyBuilder();
-
- // spout with 5 parallel instances
- builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5);
-
- // intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks
- builder.setBolt(INTERMEDIATE_BOLT,
- new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping(
- JMS_QUEUE_SPOUT);
-
- // bolt that subscribes to the intermediate bolt, and auto-acks
- // messages.
- builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping(
- INTERMEDIATE_BOLT);
-
- // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic
- JmsBolt jmsBolt = new JmsBolt();
- jmsBolt.setJmsProvider(jmsTopicProvider);
-
- // anonymous message producer just calls toString() on the tuple to create a jms message
- jmsBolt.setJmsMessageProducer(new JmsMessageProducer() {
- @Override
- public Message toMessage(Session session, Tuple input) throws JMSException{
- System.out.println("Sending JMS Message:" + input.toString());
- TextMessage tm = session.createTextMessage(input.toString());
- return tm;
- }
- });
-
- builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT);
-
- // JMS Topic spout
- JmsSpout topicSpout = new JmsSpout();
- topicSpout.setJmsProvider(jmsTopicProvider);
- topicSpout.setJmsTupleProducer(producer);
- topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
- topicSpout.setDistributed(false);
-
- builder.setSpout(JMS_TOPIC_SPOUT, topicSpout);
-
- builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping(
- JMS_TOPIC_SPOUT);
-
- Config conf = new Config();
-
- if (args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopology(args[0], conf,
- builder.createTopology());
- } else {
-
- conf.setDebug(true);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("storm-jms-example", conf, builder.createTopology());
- Utils.sleep(60000);
- cluster.killTopology("storm-jms-example");
- cluster.shutdown();
- }
- }
-
-}
diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java
deleted file mode 100644
index bd0dada..0000000
--- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package backtype.storm.contrib.jms.example;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-/**
- * A generic backtype.storm.topology.IRichBolt
implementation
- * for testing/debugging the Storm JMS Spout and example topologies.
- *
- * For debugging purposes, set the log level of the
- * backtype.storm.contrib.jms
package to DEBUG for debugging
- * output.
- * @author tgoetz
- *
- */
-@SuppressWarnings("serial")
-public class GenericBolt implements IRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class);
- private OutputCollector collector;
- private boolean autoAck = false;
- private boolean autoAnchor = false;
- private Fields declaredFields;
- private String name;
-
- /**
- * Constructs a new GenericBolt
instance.
- *
- * @param name The name of the bolt (used in DEBUG logging)
- * @param autoAck Whether or not this bolt should automatically acknowledge received tuples.
- * @param autoAnchor Whether or not this bolt should automatically anchor to received tuples.
- * @param declaredFields The fields this bolt declares as output.
- */
- public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields){
- this.name = name;
- this.autoAck = autoAck;
- this.autoAnchor = autoAnchor;
- this.declaredFields = declaredFields;
- }
-
- public GenericBolt(String name, boolean autoAck, boolean autoAnchor){
- this(name, autoAck, autoAnchor, null);
- }
-
- @SuppressWarnings("rawtypes")
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
-
- }
-
- public void execute(Tuple input) {
- LOG.debug("[" + this.name + "] Received message: " + input);
-
-
-
- // only emit if we have declared fields.
- if(this.declaredFields != null){
- LOG.debug("[" + this.name + "] emitting: " + input);
- if(this.autoAnchor){
- this.collector.emit(input, input.getValues());
- } else{
- this.collector.emit(input.getValues());
- }
- }
-
- if(this.autoAck){
- LOG.debug("[" + this.name + "] ACKing tuple: " + input);
- this.collector.ack(input);
- }
-
- }
-
- public void cleanup() {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- if(this.declaredFields != null){
- declarer.declare(this.declaredFields);
- }
- }
-
- public boolean isAutoAck(){
- return this.autoAck;
- }
-
- public void setAutoAck(boolean autoAck){
- this.autoAck = autoAck;
- }
-
-}
diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java
deleted file mode 100644
index 35dc9f8..0000000
--- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package backtype.storm.contrib.jms.example;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import backtype.storm.contrib.jms.JmsTupleProducer;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-/**
- * A simple JmsTupleProducer
that expects to receive
- * JMS TextMessage
objects with a body in JSON format.
- *
- * Ouputs a tuple with field name "json" and a string value
- * containing the raw json.
- *
- * NOTE: Currently this implementation assumes the text is valid
- * JSON and does not attempt to parse or validate it.
- *
- * @author tgoetz
- *
- */
-@SuppressWarnings("serial")
-public class JsonTupleProducer implements JmsTupleProducer {
-
- public Values toTuple(Message msg) throws JMSException {
- if(msg instanceof TextMessage){
- String json = ((TextMessage) msg).getText();
- return new Values(json);
- } else {
- return null;
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("json"));
- }
-
-}
diff --git a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java b/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java
deleted file mode 100644
index ba2dfce..0000000
--- a/storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package backtype.storm.contrib.jms.example;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-import backtype.storm.contrib.jms.JmsProvider;
-
-
-/**
- * A JmsProvider
that uses the spring framework
- * to obtain a JMS ConnectionFactory
and
- * Desitnation
objects.
- *
- * The constructor takes three arguments:
- *
- * - A string pointing to the the spring application context file contining the JMS configuration
- * (must be on the classpath)
- *
- * - The name of the connection factory bean
- * - The name of the destination bean
- *
- *
- *
- * @author tgoetz
- *
- */
-@SuppressWarnings("serial")
-public class SpringJmsProvider implements JmsProvider {
- private ConnectionFactory connectionFactory;
- private Destination destination;
-
- /**
- * Constructs a SpringJmsProvider
object given the name of a
- * classpath resource (the spring application context file), and the bean
- * names of a JMS connection factory and destination.
- *
- * @param appContextClasspathResource - the spring configuration file (classpath resource)
- * @param connectionFactoryBean - the JMS connection factory bean name
- * @param destinationBean - the JMS destination bean name
- */
- public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){
- ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource);
- this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean);
- this.destination = (Destination)context.getBean(destinationBean);
- }
-
- public ConnectionFactory connectionFactory() throws Exception {
- return this.connectionFactory;
- }
-
- public Destination destination() throws Exception {
- return this.destination;
- }
-
-}
diff --git a/storm-jms/examples/src/main/resources/jms-activemq.xml b/storm-jms/examples/src/main/resources/jms-activemq.xml
deleted file mode 100644
index db50fcf..0000000
--- a/storm-jms/examples/src/main/resources/jms-activemq.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/storm-jms/examples/src/main/resources/log4j.properties b/storm-jms/examples/src/main/resources/log4j.properties
deleted file mode 100644
index 31a50d6..0000000
--- a/storm-jms/examples/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,13 +0,0 @@
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-
-log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
-
-
-log4j.logger.backtype.storm.contrib=DEBUG
-log4j.logger.clojure.contrib=WARN
-log4j.logger.org.springframework=WARN
-log4j.logger.org.apache.zookeeper=WARN
-
diff --git a/storm-jms/pom.xml b/storm-jms/pom.xml
deleted file mode 100644
index cb11e86..0000000
--- a/storm-jms/pom.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-
-
-
- org.sonatype.oss
- oss-parent
- 7
-
-
-
- 4.0.0
- com.github.ptgoetz
- storm-jms
- 0.1.1-SNAPSHOT
- Storm JMS
- Storm JMS Components
-
-
-
-
- Eclipse Public License - v 1.0
- http://www.eclipse.org/legal/epl-v10.html
- repo
-
-
-
- scm:git:git@github.com:ptgoetz/storm-jms.git
- scm:git:git@github.com:ptgoetz/storm-jms.git
- :git@github.com:ptgoetz/storm-jms.git
-
-
-
-
- ptgoetz
- P. Taylor Goetz
- ptgoetz@gmail.com
-
-
-
-
-
- 0.6.2
-
-
-
- storm
- storm
- ${storm.version}
-
- provided
-
-
- org.apache.geronimo.specs
- geronimo-jms_1.1_spec
- 1.1.1
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
- 1.6
-
-
-
-
-
\ No newline at end of file
diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java
deleted file mode 100644
index bd07ccb..0000000
--- a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package backtype.storm.contrib.jms;
-
-import java.io.Serializable;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-/**
- * JmsMessageProducer implementations are responsible for translating
- * a backtype.storm.tuple.Values
instance into a
- * javax.jms.Message
object.
- *
- *
- *
- * @author P. Taylor Goetz
- *
- */
-public interface JmsMessageProducer extends Serializable{
-
- /**
- * Translate a backtype.storm.tuple.Tuple
object
- * to a javax.jms.Message
JmsProvider object encapsulates the ConnectionFactory
- * and Destination
JMS objects the JmsSpout
needs to manage
- * a topic/queue connection over the course of it's lifecycle.
- *
- * @author P. Taylor Goetz
- *
- */
-public interface JmsProvider extends Serializable{
- /**
- * Provides the JMS ConnectionFactory
- * @return the connection factory
- * @throws Exception
- */
- public ConnectionFactory connectionFactory() throws Exception;
-
- /**
- * Provides the Destination
(topic or queue) from which the
- * JmsSpout
will receive messages.
- * @return
- * @throws Exception
- */
- public Destination destination() throws Exception;
-}
diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java
deleted file mode 100644
index a76837d..0000000
--- a/storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package backtype.storm.contrib.jms;
-
-import java.io.Serializable;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Values;
-
-/**
- * Interface to define classes that can produce a Storm Values
objects
- * from a javax.jms.Message
object>.
- *
- * Implementations are also responsible for declaring the output
- * fields they produce.
- *
- * If for some reason the implementation can't process a message
- * (for example if it received a javax.jms.ObjectMessage
- * when it was expecting a javax.jms.TextMessage
it should
- * return null
to indicate to the JmsSpout
that
- * the message could not be processed.
- *
- * @author P. Taylor Goetz
- *
- */
-public interface JmsTupleProducer extends Serializable{
- /**
- * Process a JMS message object to create a Values object.
- * @param msg - the JMS message
- * @return the Values tuple, or null if the message couldn't be processed.
- * @throws JMSException
- */
- Values toTuple(Message msg) throws JMSException;
-
- /**
- * Declare the output fields produced by this JmsTupleProducer.
- * @param declarer The OuputFieldsDeclarer for the spout.
- */
- void declareOutputFields(OutputFieldsDeclarer declarer);
-}
diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java
deleted file mode 100644
index 487aa02..0000000
--- a/storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java
+++ /dev/null
@@ -1,197 +0,0 @@
-package backtype.storm.contrib.jms.bolt;
-
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.contrib.jms.JmsMessageProducer;
-import backtype.storm.contrib.jms.JmsProvider;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-/**
- * A JmsBolt receives backtype.storm.tuple.Tuple
objects from a Storm
- * topology and publishes JMS Messages to a destination (topic or queue).
- *
- * To use a JmsBolt in a topology, the following must be supplied:
- *
- * - A
JmsProvider
implementation.
- * - A
JmsMessageProducer
implementation.
- *
- * The JmsProvider
provides the JMS javax.jms.ConnectionFactory
- * and javax.jms.Destination
objects requied to publish JMS messages.
- *
- * The JmsBolt uses a JmsMessageProducer
to translate
- * backtype.storm.tuple.Tuple
objects into
- * javax.jms.Message
objects for publishing.
- *
- * Both JmsProvider and JmsMessageProducer must be set, or the bolt will
- * fail upon deployment to a cluster.
- *
- * The JmsBolt is typically an endpoint in a topology -- in other words
- * it does not emit any tuples.
- *
- *
- * @author P. Taylor Goetz
- *
- */
-public class JmsBolt implements IRichBolt {
- private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
-
- private boolean autoAck = true;
-
- // javax.jms objects
- private Connection connection;
- private Session session;
- private MessageProducer messageProducer;
-
- // JMS options
- private boolean jmsTransactional = false;
- private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
-
- private JmsProvider jmsProvider;
- private JmsMessageProducer producer;
-
-
- private OutputCollector collector;
-
- /**
- * Set the JmsProvider used to connect to the JMS destination topic/queue
- * @param provider
- */
- public void setJmsProvider(JmsProvider provider){
- this.jmsProvider = provider;
- }
-
- /**
- * Set the JmsMessageProducer used to convert tuples
- * into JMS messages.
- *
- * @param producer
- */
- public void setJmsMessageProducer(JmsMessageProducer producer){
- this.producer = producer;
- }
-
- /**
- * Sets the JMS acknowledgement mode for JMS messages sent
- * by this bolt.
- *
- * Possible values:
- *
- * - javax.jms.Session.AUTO_ACKNOWLEDGE
- * - javax.jms.Session.CLIENT_ACKNOWLEDGE
- * - javax.jms.Session.DUPS_OK_ACKNOWLEDGE
- *
- * @param acknowledgeMode (constant defined in javax.jms.Session)
- */
- public void setJmsAcknowledgeMode(int acknowledgeMode){
- this.jmsAcknowledgeMode = acknowledgeMode;
- }
-
- /**
- * Set the JMS transactional setting for the JMS session.
- *
- * @param transactional
- */
-// public void setJmsTransactional(boolean transactional){
-// this.jmsTransactional = transactional;
-// }
-
- /**
- * Sets whether or not tuples should be acknowledged by this
- * bolt.
- *
- * @param autoAck
- */
- public void setAutoAck(boolean autoAck){
- this.autoAck = autoAck;
- }
-
-
- /**
- * Consumes a tuple and sends a JMS message.
- *
- * If autoAck is true, the tuple will be acknowledged
- * after the message is sent.
- *
- * If JMS sending fails, the tuple will be failed.
- */
- @Override
- public void execute(Tuple input) {
- // write the tuple to a JMS destination...
- LOG.debug("Tuple received. Sending JMS message.");
-
- try {
- Message msg = this.producer.toMessage(this.session, input);
- if(msg != null){
- this.messageProducer.send(msg);
- }
- if(this.autoAck){
- LOG.debug("ACKing tuple: " + input);
- this.collector.ack(input);
- }
- } catch (JMSException e) {
- // failed to send the JMS message, fail the tuple fast
- LOG.warn("Failing tuple: " + input);
- LOG.warn("Exception: ", e);
- this.collector.fail(input);
- }
- }
-
- /**
- * Releases JMS resources.
- */
- @Override
- public void cleanup() {
- try {
- LOG.debug("Closing JMS connection.");
- this.session.close();
- this.connection.close();
- } catch (JMSException e) {
- LOG.warn("Error closing JMS connection.", e);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- /**
- * Initializes JMS resources.
- */
- @Override
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- if(this.jmsProvider == null || this.producer == null){
- throw new IllegalStateException("JMS Provider and MessageProducer not set.");
- }
- this.collector = collector;
- LOG.debug("Connecting JMS..");
- try {
- ConnectionFactory cf = this.jmsProvider.connectionFactory();
- Destination dest = this.jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(this.jmsTransactional,
- this.jmsAcknowledgeMode);
- this.messageProducer = session.createProducer(dest);
-
- connection.start();
- } catch (Exception e) {
- LOG.warn("Error creating JMS connection.", e);
- }
- }
-}
diff --git a/storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java b/storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java
deleted file mode 100644
index 75500d5..0000000
--- a/storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java
+++ /dev/null
@@ -1,312 +0,0 @@
-package backtype.storm.contrib.jms.spout;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.contrib.jms.JmsProvider;
-import backtype.storm.contrib.jms.JmsTupleProducer;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-/**
- * A Storm Spout
implementation that listens to a JMS topic or queue
- * and outputs tuples based on the messages it receives.
- *
- * JmsSpout
instances rely on JmsProducer
implementations
- * to obtain the JMS ConnectionFactory
and Destination
objects
- * necessary to connect to a JMS topic/queue.
- *
- * When a JmsSpout
receives a JMS message, it delegates to an
- * internal JmsTupleProducer
instance to create a Storm tuple from the
- * incoming message.
- *
- * Typically, developers will supply a custom JmsTupleProducer
implementation
- * appropriate for the expected message content.
- *
- * @author P. Taylor Goetz
- *
- */
-@SuppressWarnings("serial")
-public class JmsSpout implements IRichSpout, MessageListener {
- private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
-
- // JMS options
- private boolean jmsTransactional = false;
- private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
- private boolean distributed = true;
-
- private JmsTupleProducer tupleProducer;
-
- private JmsProvider jmsProvider;
-
- private LinkedBlockingQueue queue;
- private ConcurrentHashMap pendingMessages;
-
- private SpoutOutputCollector collector;
-
- private transient Connection connection;
- private transient Session session;
-
- /**
- * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
- *
- * Possible values:
- *
- * - javax.jms.Session.AUTO_ACKNOWLEDGE
- * - javax.jms.Session.CLIENT_ACKNOWLEDGE
- * - javax.jms.Session.DUPS_OK_ACKNOWLEDGE
- *
- * @param mode JMS Session Acknowledgement mode
- * @throws IllegalArgumentException if the mode is not recognized.
- */
- public void setJmsAcknowledgeMode(int mode){
- switch (mode) {
- case Session.AUTO_ACKNOWLEDGE:
- case Session.CLIENT_ACKNOWLEDGE:
- case Session.DUPS_OK_ACKNOWLEDGE:
- break;
- default:
- throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
-
- }
- this.jmsAcknowledgeMode = mode;
- }
-
- /**
- * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
- * @return
- */
- public int getJmsAcknowledgeMode(){
- return this.jmsAcknowledgeMode;
- }
-
- /**
- * Set whether this Spout uses the JMS transactional model by defualt.
- *
- * If true
the spout will always request acks from downstream
- * bolts, using the incoming JMS message ID as the Storm message ID.
- *
- * If false
the spout will request acks from downstream bolts
- * only if the spout's JmsAcknowledgeMode is not AUTO_ACKNOWLEDGE
- * and the JMS message DeliveryMode is not AUTO_ACKNOWLEDGE.
- *
- * If the spout determines that a JMS message should be handled transactionally
- * (i.e. acknowledged in JMS terms), it will be JMS-acknowledged in the spout's
- * ack()
method.
- *
- * Otherwise, if a downstream spout that has anchored on one of this spouts tuples
- * fails to acknowledge an emitted tuple, the JMS message will not be not be acknowledged,
- * and potentially be set for retransmission, depending on the underlying JMS implementation
- * and configuration.
- *
- * @param transactional
- */
- public void setJmsTransactional(boolean transactional){
- this.jmsTransactional = transactional;
- }
- public boolean isJmsTransaction(){
- return this.jmsTransactional;
- }
- /**
- * Set the backtype.storm.contrib.jms.JmsProvider
- * implementation that this Spout will use to connect to
- * a JMS javax.jms.Desination
- *
- * @param provider
- */
- public void setJmsProvider(JmsProvider provider){
- this.jmsProvider = provider;
- }
- /**
- * Set the backtype.storm.contrib.jms.JmsTupleProducer
- * implementation that will convert javax.jms.Message
- * object to backtype.storm.tuple.Values
objects
- * to be emitted.
- *
- * @param producer
- */
- public void setJmsTupleProducer(JmsTupleProducer producer){
- this.tupleProducer = producer;
- }
-
- /**
- * javax.jms.MessageListener
implementation.
- *
- * Stored the JMS message in an internal queue for processing
- * by the nextTuple()
method.
- */
- public void onMessage(Message msg) {
- this.queue.offer(msg);
-
- }
-
- /**
- * ISpout
implementation.
- *
- * Connects the JMS spout to the configured JMS destination
- * topic/queue.
- *
- */
- @SuppressWarnings("rawtypes")
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- if(this.jmsProvider == null){
- throw new IllegalStateException("JMS provider has not been set.");
- }
- if(this.tupleProducer == null){
- throw new IllegalStateException("JMS Tuple Producer has not been set.");
- }
- queue = new LinkedBlockingQueue();
- this.pendingMessages = new ConcurrentHashMap();
- this.collector = collector;
- try {
- ConnectionFactory cf = this.jmsProvider.connectionFactory();
- Destination dest = this.jmsProvider.destination();
- this.connection = cf.createConnection();
- this.session = connection.createSession(this.jmsTransactional,
- this.jmsAcknowledgeMode);
- MessageConsumer consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- connection.start();
-
- } catch (Exception e) {
- LOG.warn("Error creating JMS connection.", e);
- }
-
- }
-
- public void close() {
- try {
- LOG.debug("Closing JMS connection.");
- this.session.close();
- this.connection.close();
- } catch (JMSException e) {
- LOG.warn("Error closing JMS connection.", e);
- }
-
- }
-
- public void nextTuple() {
- Message msg = this.queue.poll();
- if (msg == null) {
- Utils.sleep(50);
- } else {
-
- LOG.debug("sending tuple: " + msg);
- // get the tuple from the handler
- try {
- Values vals = this.tupleProducer.toTuple(msg);
- // if we're transactional, always ack, otherwise
- // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
- LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
- LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
- if (this.jmsTransactional
- || (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE)
- || (msg.getJMSDeliveryMode() != Session.AUTO_ACKNOWLEDGE)) {
- LOG.debug("Requesting acks.");
- this.collector.emit(vals, msg.getJMSMessageID());
-
- // at this point we successfully emitted. Store
- // the message and message ID so we can do a
- // JMS acknowledge later
- this.pendingMessages.put(msg.getJMSMessageID(), msg);
- } else {
- this.collector.emit(vals);
- }
- } catch (JMSException e) {
- LOG.warn("Unable to convert JMS message: " + msg);
- }
-
- }
-
- }
-
- /*
- * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
- */
- public void ack(Object msgId) {
-
- Message msg = this.pendingMessages.remove(msgId);
- if (msg != null) {
- try {
- msg.acknowledge();
- LOG.debug("JMS Message acked: " + msgId);
- } catch (JMSException e) {
- LOG.warn("Error acknowldging JMS message: " + msgId, e);
- }
- } else {
- LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
- }
-
- }
-
- /*
- * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
- */
- public void fail(Object msgId) {
- LOG.debug("Message failed: " + msgId);
- this.pendingMessages.remove(msgId);
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- this.tupleProducer.declareOutputFields(declarer);
-
- }
-
- public boolean isDistributed() {
- return this.distributed;
- }
-
- /**
- * Sets the "distributed" mode of this spout.
- *
- * If true
multiple instances of this spout may be
- * created across the cluster (depending on the "parallelism_hint" in the topology configuration).
- *
- * Setting this value to false
essentially means this spout will run as a singleton
- * within the cluster ("parallelism_hint" will be ignored).
- *
- * In general, this should be set to false
if the underlying JMS destination is a
- * topic, and true
if it is a JMS queue.
- *
- * @param distributed
- */
- public void setDistributed(boolean distributed){
- this.distributed = distributed;
- }
-
-
- private static final String toDeliveryModeString(int deliveryMode) {
- switch (deliveryMode) {
- case Session.AUTO_ACKNOWLEDGE:
- return "AUTO_ACKNOWLEDGE";
- case Session.CLIENT_ACKNOWLEDGE:
- return "CLIENT_ACKNOWLEDGE";
- case Session.DUPS_OK_ACKNOWLEDGE:
- return "DUPS_OK_ACKNOWLEDGE";
- default:
- return "UNKNOWN";
-
- }
- }
-
-}
diff --git a/storm-signals b/storm-signals
new file mode 160000
index 0000000..c639a03
--- /dev/null
+++ b/storm-signals
@@ -0,0 +1 @@
+Subproject commit c639a034ee28c930f46b8a7e44208e7c5ae36b72
diff --git a/storm-signals/README.md b/storm-signals/README.md
deleted file mode 100644
index ab58fc7..0000000
--- a/storm-signals/README.md
+++ /dev/null
@@ -1,101 +0,0 @@
-## Storm-Signals
-Storm-Signals aims to provide a way to send messages ("signals") to components (spouts/bolts) in a storm topology that are otherwise not addressable.
-
-Storm topologies can be considered static in that modifications to a topology's behavior require redeployment. Storm-Signals provides a simple way to modify a topology's behavior at runtime, without redeployment.
-
-
-### Project Location
-Primary development of storm-signals will take place at:
-https://github.com/ptgoetz/storm-signals
-
-Point/stable (non-SNAPSHOT) release souce code will be pushed to:
-https://github.com/nathanmarz/storm-contrib
-
-Maven artifacts for releases will be available on maven central.
-
-
-
-
-### Use Cases
-Typical storm spouts run forever (until undeployed), emitting tuples based on an underlying, presumably event-driven, data source/stream.
-
-Some storm users have expressed an interest in having more control over that pattern, for instance in situations where the data stream is not open-ended, or where the situation requires that data streams be controllable (i.e. the ability to start/stop/pause/resume processing).
-
-Storm-Signals provides a very simple mechanism for communicating with spouts deployed within a storm topology. The communication mechanism resides outside of storm's basic stream processing paradigm (i.e. calls to `nextTuple()` and the tuple ack/fail mechanism).
-
-Signals (messages)
-
-#### Sample Use Cases
-
-* Ability to start/stop/pause/resume a spout from a process _external to_ the storm topology.
-* Ability to change the source of a spout's stream without redeploying the topology.
-* Initiating processing of a set/batch of data based on a schedule (such as a Quartz or cron job)
-* Periodically sending a dynamic SQL query to a spout that emits tuples for processing.
-* Any other use case you can think of. :)
-
-## Usage
-
-### Spout Implementation
-Currently (Version 0.1.0) provides a basic abstract `BaseRichSpout` implementation that must be subclassed:
-
-`backtype.storm.contrib.signals.spout.BaseSignalSpout`
-
-Subclasses _must_ override the `onSignal()` method:
-
- protected abstract void onSignal(byte[] data);
-
-This method is called when a signal is sent to a spout. The signal payload is a `byte[]` that can contain anything (string, data, seriliazed object(s), etc.).
-
-Subclasses _must_ override the superclass constructor:
-
- public TestSignalSpout(String name) {
- super(name);
- }
-
-The `name` parameter provides a unique ID for the spout that allows `SingalClient`s to address the bolt and send it messages.
-
-Subclasses _must_ call `super.open()` if they override the `open()` method:
-
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- super.open(conf, context, collector);
- }
-
-Failure to do so will prevent the spout from receiving signals (i.e. `onSignal()` will never be called).
-
-### Signal Client
-
-The `SignalClient` constructor requires two arguments:
-
-1. a zookeeper connect string ("host1:port1,host2:port2,hostN:portN") that should match the storm zookeeper configuration
-2. a name string (this should match the name used to construct the `BaseSignalSpout` subclass)
-
-Example:
-
- public static void main(String[] args) throws Exception {
- SignalClient sc = new SignalClient("localhost:2181", "test-signal-spout");
- sc.start();
- try {
- sc.send("Hello Signal Spout!".getBytes());
- } finally {
- sc.close();
- }
- }
-
-
-## Maven Usage
-
-### Maven Dependency
-
-Point (non-SNAPSHOT) releases will be available on maven central.
-
-
- com.github.ptgoetz
- storm-signals
- 0.1.0
-
-
-
-
-
-
diff --git a/storm-signals/pom.xml b/storm-signals/pom.xml
deleted file mode 100644
index 0f2a9ac..0000000
--- a/storm-signals/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-
-
-
- org.sonatype.oss
- oss-parent
- 7
-
-
- 4.0.0
- com.github.ptgoetz
- storm-signals
- 0.1.0
- Storm Signals
- Storm Signals
-
-
-
- Eclipse Public License - v 1.0
- http://www.eclipse.org/legal/epl-v10.html
- repo
-
-
-
-
- scm:git:git@github.com:ptgoetz/storm-signals.git
- scm:git:git@github.com:ptgoetz/storm-signals.git
- :git@github.com:ptgoetz/storm-signals.git
-
-
-
- ptgoetz
- P. Taylor Goetz
- ptgoetz@gmail.com
-
-
-
- 0.7.0
-
-
-
- storm
- storm
- ${storm.version}
-
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
- 1.6
-
-
-
-
-
\ No newline at end of file
diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/client/SignalClient.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/client/SignalClient.java
deleted file mode 100644
index 1a167fe..0000000
--- a/storm-signals/src/main/java/backtype/storm/contrib/signals/client/SignalClient.java
+++ /dev/null
@@ -1,57 +0,0 @@
-// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com)
-
-package backtype.storm.contrib.signals.client;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.retry.RetryOneTime;
-
-public class SignalClient {
-
- private static final Logger LOG = LoggerFactory.getLogger(SignalClient.class);
-
- private CuratorFramework client = null;
- private String name;
-
- public SignalClient(String zkConnectString, String name) {
- this.name = name;
- try {
- this.client = CuratorFrameworkFactory.builder().namespace("storm-signals").connectString(zkConnectString)
- .retryPolicy(new RetryOneTime(500)).build();
- } catch (IOException e) {
- LOG.error("Error creating zookeeper client.", e);
- }
- LOG.debug("created Curator client");
- }
-
- public void start() {
- this.client.start();
- }
-
- public void close() {
- this.client.close();
- }
-
- public void send(byte[] signal) throws Exception {
- this.client.setData().forPath(this.name, signal);
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception {
- SignalClient sc = new SignalClient("localhost:2181", "test-signal-spout1");
- sc.start();
- try {
- sc.send("Hello Signal Spout!".getBytes());
- } finally {
- sc.close();
- }
- }
-
-}
diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/spout/BaseSignalSpout.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/spout/BaseSignalSpout.java
deleted file mode 100644
index fa94cc6..0000000
--- a/storm-signals/src/main/java/backtype/storm/contrib/signals/spout/BaseSignalSpout.java
+++ /dev/null
@@ -1,120 +0,0 @@
-// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com)
-
-package backtype.storm.contrib.signals.spout;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-
-import com.netflix.curator.framework.CuratorFramework;
-import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.retry.RetryNTimes;
-
-@SuppressWarnings("serial")
-public abstract class BaseSignalSpout extends BaseRichSpout implements Watcher {
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseSignalSpout.class);
- private static final String namespace = "storm-signals";
- private String name;
- private CuratorFramework client;
-
- public BaseSignalSpout(String name) {
- this.name = name;
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- try {
- initZookeeper(conf);
- } catch (Exception e) {
- LOG.error("Error creating zookeeper client.", e);
- }
- }
-
- @SuppressWarnings("rawtypes")
- private void initZookeeper(Map conf) throws Exception {
- String connectString = zkHosts(conf);
- int retryCount = (Integer) conf.get("storm.zookeeper.retry.times");
- int retryInterval = (Integer) conf.get("storm.zookeeper.retry.interval");
-
- this.client = CuratorFrameworkFactory.builder().namespace(namespace).connectString(connectString)
- .retryPolicy(new RetryNTimes(retryCount, retryInterval)).build();
- this.client.start();
-
- // create base path if necessary
- Stat stat = this.client.checkExists().usingWatcher(this).forPath(this.name);
- if (stat == null) {
- String path = this.client.create().creatingParentsIfNeeded().forPath(this.name);
- LOG.info("Created: " + path);
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private String zkHosts(Map conf) {
- int zkPort = (Integer) conf.get("storm.zookeeper.port");
- List zkServers = (List) conf.get("storm.zookeeper.servers");
-
- Iterator it = zkServers.iterator();
- StringBuffer sb = new StringBuffer();
- while (it.hasNext()) {
- sb.append(it.next());
- sb.append(":");
- sb.append(zkPort);
- if (it.hasNext()) {
- sb.append(",");
- }
- }
- return sb.toString();
- }
-
- /**
- * Releases the zookeeper connection.
- */
- @Override
- public void close() {
- super.close();
- this.client.close();
- }
-
- @Override
- public void process(WatchedEvent we) {
- try {
- this.client.checkExists().usingWatcher(this).forPath(this.name);
- LOG.debug("Renewed watch for path %s", this.name);
- } catch (Exception ex) {
- LOG.error("Error renewing watch.", ex);
- }
-
- switch (we.getType()) {
- case NodeCreated:
- LOG.debug("Node created.");
- break;
- case NodeDataChanged:
- LOG.debug("Received signal.");
- try {
- this.onSignal(this.client.getData().forPath(we.getPath()));
- } catch (Exception e) {
- LOG.warn("Unable to process signal.", e);
- }
- break;
- case NodeDeleted:
- LOG.debug("NodeDeleted");
- break;
- }
-
- }
-
- protected abstract void onSignal(byte[] data);
-
-}
diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/SignalTopology.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/test/SignalTopology.java
deleted file mode 100644
index 461741c..0000000
--- a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/SignalTopology.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com)
-
-package backtype.storm.contrib.signals.test;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.utils.Utils;
-
-public class SignalTopology {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("signal-spout", new TestSignalSpout("test-signal-spout"));
-
- Config conf = new Config();
- conf.setDebug(true);
-
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", conf, builder.createTopology());
- Utils.sleep(120000);
- cluster.killTopology("test");
- cluster.shutdown();
- }
-
-}
diff --git a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/TestSignalSpout.java b/storm-signals/src/main/java/backtype/storm/contrib/signals/test/TestSignalSpout.java
deleted file mode 100644
index cc18041..0000000
--- a/storm-signals/src/main/java/backtype/storm/contrib/signals/test/TestSignalSpout.java
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright (c) P. Taylor Goetz (ptgoetz@gmail.com)
-
-package backtype.storm.contrib.signals.test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.contrib.signals.spout.BaseSignalSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-
-public class TestSignalSpout extends BaseSignalSpout {
-
-
- private static final Logger LOG = LoggerFactory.getLogger(TestSignalSpout.class);
-
- public TestSignalSpout(String name) {
- super(name);
- }
-
- @Override
- public void onSignal(byte[] data) {
- LOG.info("Received signal: " + new String(data));
-
- }
-
- @Override
- public void nextTuple() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // TODO Auto-generated method stub
-
- }
-
-}