Skip to content
Browse files

added storm-jms to storm-contrib

  • Loading branch information...
1 parent 94f9a3a commit ddbc242fd9fd3f42a2635b54016ae590d85ba3de @ptgoetz ptgoetz committed Apr 2, 2012
View
261 storm-jms/LICENSE.html
@@ -0,0 +1,261 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+<head>
+<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1" />
+<title>Eclipse Public License - Version 1.0</title>
+<style type="text/css">
+ body {
+ size: 8.5in 11.0in;
+ margin: 0.25in 0.5in 0.25in 0.5in;
+ tab-interval: 0.5in;
+ }
+ p {
+ margin-left: auto;
+ margin-top: 0.5em;
+ margin-bottom: 0.5em;
+ }
+ p.list {
+ margin-left: 0.5in;
+ margin-top: 0.05em;
+ margin-bottom: 0.05em;
+ }
+ </style>
+
+</head>
+
+<body lang="EN-US">
+
+<h2>Eclipse Public License - v 1.0</h2>
+
+<p>THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+PUBLIC LICENSE (&quot;AGREEMENT&quot;). ANY USE, REPRODUCTION OR
+DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS
+AGREEMENT.</p>
+
+<p><b>1. DEFINITIONS</b></p>
+
+<p>&quot;Contribution&quot; means:</p>
+
+<p class="list">a) in the case of the initial Contributor, the initial
+code and documentation distributed under this Agreement, and</p>
+<p class="list">b) in the case of each subsequent Contributor:</p>
+<p class="list">i) changes to the Program, and</p>
+<p class="list">ii) additions to the Program;</p>
+<p class="list">where such changes and/or additions to the Program
+originate from and are distributed by that particular Contributor. A
+Contribution 'originates' from a Contributor if it was added to the
+Program by such Contributor itself or anyone acting on such
+Contributor's behalf. Contributions do not include additions to the
+Program which: (i) are separate modules of software distributed in
+conjunction with the Program under their own license agreement, and (ii)
+are not derivative works of the Program.</p>
+
+<p>&quot;Contributor&quot; means any person or entity that distributes
+the Program.</p>
+
+<p>&quot;Licensed Patents&quot; mean patent claims licensable by a
+Contributor which are necessarily infringed by the use or sale of its
+Contribution alone or when combined with the Program.</p>
+
+<p>&quot;Program&quot; means the Contributions distributed in accordance
+with this Agreement.</p>
+
+<p>&quot;Recipient&quot; means anyone who receives the Program under
+this Agreement, including all Contributors.</p>
+
+<p><b>2. GRANT OF RIGHTS</b></p>
+
+<p class="list">a) Subject to the terms of this Agreement, each
+Contributor hereby grants Recipient a non-exclusive, worldwide,
+royalty-free copyright license to reproduce, prepare derivative works
+of, publicly display, publicly perform, distribute and sublicense the
+Contribution of such Contributor, if any, and such derivative works, in
+source code and object code form.</p>
+
+<p class="list">b) Subject to the terms of this Agreement, each
+Contributor hereby grants Recipient a non-exclusive, worldwide,
+royalty-free patent license under Licensed Patents to make, use, sell,
+offer to sell, import and otherwise transfer the Contribution of such
+Contributor, if any, in source code and object code form. This patent
+license shall apply to the combination of the Contribution and the
+Program if, at the time the Contribution is added by the Contributor,
+such addition of the Contribution causes such combination to be covered
+by the Licensed Patents. The patent license shall not apply to any other
+combinations which include the Contribution. No hardware per se is
+licensed hereunder.</p>
+
+<p class="list">c) Recipient understands that although each Contributor
+grants the licenses to its Contributions set forth herein, no assurances
+are provided by any Contributor that the Program does not infringe the
+patent or other intellectual property rights of any other entity. Each
+Contributor disclaims any liability to Recipient for claims brought by
+any other entity based on infringement of intellectual property rights
+or otherwise. As a condition to exercising the rights and licenses
+granted hereunder, each Recipient hereby assumes sole responsibility to
+secure any other intellectual property rights needed, if any. For
+example, if a third party patent license is required to allow Recipient
+to distribute the Program, it is Recipient's responsibility to acquire
+that license before distributing the Program.</p>
+
+<p class="list">d) Each Contributor represents that to its knowledge it
+has sufficient copyright rights in its Contribution, if any, to grant
+the copyright license set forth in this Agreement.</p>
+
+<p><b>3. REQUIREMENTS</b></p>
+
+<p>A Contributor may choose to distribute the Program in object code
+form under its own license agreement, provided that:</p>
+
+<p class="list">a) it complies with the terms and conditions of this
+Agreement; and</p>
+
+<p class="list">b) its license agreement:</p>
+
+<p class="list">i) effectively disclaims on behalf of all Contributors
+all warranties and conditions, express and implied, including warranties
+or conditions of title and non-infringement, and implied warranties or
+conditions of merchantability and fitness for a particular purpose;</p>
+
+<p class="list">ii) effectively excludes on behalf of all Contributors
+all liability for damages, including direct, indirect, special,
+incidental and consequential damages, such as lost profits;</p>
+
+<p class="list">iii) states that any provisions which differ from this
+Agreement are offered by that Contributor alone and not by any other
+party; and</p>
+
+<p class="list">iv) states that source code for the Program is available
+from such Contributor, and informs licensees how to obtain it in a
+reasonable manner on or through a medium customarily used for software
+exchange.</p>
+
+<p>When the Program is made available in source code form:</p>
+
+<p class="list">a) it must be made available under this Agreement; and</p>
+
+<p class="list">b) a copy of this Agreement must be included with each
+copy of the Program.</p>
+
+<p>Contributors may not remove or alter any copyright notices contained
+within the Program.</p>
+
+<p>Each Contributor must identify itself as the originator of its
+Contribution, if any, in a manner that reasonably allows subsequent
+Recipients to identify the originator of the Contribution.</p>
+
+<p><b>4. COMMERCIAL DISTRIBUTION</b></p>
+
+<p>Commercial distributors of software may accept certain
+responsibilities with respect to end users, business partners and the
+like. While this license is intended to facilitate the commercial use of
+the Program, the Contributor who includes the Program in a commercial
+product offering should do so in a manner which does not create
+potential liability for other Contributors. Therefore, if a Contributor
+includes the Program in a commercial product offering, such Contributor
+(&quot;Commercial Contributor&quot;) hereby agrees to defend and
+indemnify every other Contributor (&quot;Indemnified Contributor&quot;)
+against any losses, damages and costs (collectively &quot;Losses&quot;)
+arising from claims, lawsuits and other legal actions brought by a third
+party against the Indemnified Contributor to the extent caused by the
+acts or omissions of such Commercial Contributor in connection with its
+distribution of the Program in a commercial product offering. The
+obligations in this section do not apply to any claims or Losses
+relating to any actual or alleged intellectual property infringement. In
+order to qualify, an Indemnified Contributor must: a) promptly notify
+the Commercial Contributor in writing of such claim, and b) allow the
+Commercial Contributor to control, and cooperate with the Commercial
+Contributor in, the defense and any related settlement negotiations. The
+Indemnified Contributor may participate in any such claim at its own
+expense.</p>
+
+<p>For example, a Contributor might include the Program in a commercial
+product offering, Product X. That Contributor is then a Commercial
+Contributor. If that Commercial Contributor then makes performance
+claims, or offers warranties related to Product X, those performance
+claims and warranties are such Commercial Contributor's responsibility
+alone. Under this section, the Commercial Contributor would have to
+defend claims against the other Contributors related to those
+performance claims and warranties, and if a court requires any other
+Contributor to pay any damages as a result, the Commercial Contributor
+must pay those damages.</p>
+
+<p><b>5. NO WARRANTY</b></p>
+
+<p>EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS
+PROVIDED ON AN &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION,
+ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY
+OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely
+responsible for determining the appropriateness of using and
+distributing the Program and assumes all risks associated with its
+exercise of rights under this Agreement , including but not limited to
+the risks and costs of program errors, compliance with applicable laws,
+damage to or loss of data, programs or equipment, and unavailability or
+interruption of operations.</p>
+
+<p><b>6. DISCLAIMER OF LIABILITY</b></p>
+
+<p>EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT
+NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING
+WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR
+DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED
+HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.</p>
+
+<p><b>7. GENERAL</b></p>
+
+<p>If any provision of this Agreement is invalid or unenforceable under
+applicable law, it shall not affect the validity or enforceability of
+the remainder of the terms of this Agreement, and without further action
+by the parties hereto, such provision shall be reformed to the minimum
+extent necessary to make such provision valid and enforceable.</p>
+
+<p>If Recipient institutes patent litigation against any entity
+(including a cross-claim or counterclaim in a lawsuit) alleging that the
+Program itself (excluding combinations of the Program with other
+software or hardware) infringes such Recipient's patent(s), then such
+Recipient's rights granted under Section 2(b) shall terminate as of the
+date such litigation is filed.</p>
+
+<p>All Recipient's rights under this Agreement shall terminate if it
+fails to comply with any of the material terms or conditions of this
+Agreement and does not cure such failure in a reasonable period of time
+after becoming aware of such noncompliance. If all Recipient's rights
+under this Agreement terminate, Recipient agrees to cease use and
+distribution of the Program as soon as reasonably practicable. However,
+Recipient's obligations under this Agreement and any licenses granted by
+Recipient relating to the Program shall continue and survive.</p>
+
+<p>Everyone is permitted to copy and distribute copies of this
+Agreement, but in order to avoid inconsistency the Agreement is
+copyrighted and may only be modified in the following manner. The
+Agreement Steward reserves the right to publish new versions (including
+revisions) of this Agreement from time to time. No one other than the
+Agreement Steward has the right to modify this Agreement. The Eclipse
+Foundation is the initial Agreement Steward. The Eclipse Foundation may
+assign the responsibility to serve as the Agreement Steward to a
+suitable separate entity. Each new version of the Agreement will be
+given a distinguishing version number. The Program (including
+Contributions) may always be distributed subject to the version of the
+Agreement under which it was received. In addition, after a new version
+of the Agreement is published, Contributor may elect to distribute the
+Program (including its Contributions) under the new version. Except as
+expressly stated in Sections 2(a) and 2(b) above, Recipient receives no
+rights or licenses to the intellectual property of any Contributor under
+this Agreement, whether expressly, by implication, estoppel or
+otherwise. All rights in the Program not expressly granted under this
+Agreement are reserved.</p>
+
+<p>This Agreement is governed by the laws of the State of New York and
+the intellectual property laws of the United States of America. No party
+to this Agreement will bring a legal action under this Agreement more
+than one year after the cause of action arose. Each party waives its
+rights to a jury trial in any resulting litigation.</p>
+
+</body>
+
+</html>
View
40 storm-jms/README.markdown
@@ -0,0 +1,40 @@
+## About Storm JMS
+Storm JMS is a generic framework for integrating JMS messaging within the Storm framework.
+
+The [Storm Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) explains what storm is and why it was built.
+
+Storm-JMS allows you to inject data into Storm via a generic JMS spout, as well as consume data from Storm via a generic JMS bolt.
+
+Both the JMS Spout and JMS Bolt are data agnostic. To use them, you provide a simple Java class that bridges the JMS and Storm APIs and encapsulates and domain-specific logic.
+
+## Components
+
+### JMS Spout
+The JMS Spout component allows for data published to a JMS topic or queue to be consumed by a Storm topology.
+
+A JMS Spout connects to a JMS Destination (topic or queue), and emits Storm "Tuple" objects based on the content of the JMS message received.
+
+
+### JMS Bolt
+The JMS Bolt component allows for data within a Storm topology to be published to a JMS destination (topic or queue).
+
+A JMS Bolt connects to a JMS Destination, and publishes JMS Messages based on the Storm "Tuple" objects it receives.
+
+
+## Documentation
+
+Documentation and tutorials can be found on the [Storm-JMS wiki](http://github.com/ptgoetz/storm-jms/wiki).
+
+
+## License
+
+The use and distribution terms for this software are covered by the
+Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+which can be found in the file LICENSE.html at the root of this distribution.
+By using this software in any fashion, you are agreeing to be bound by
+the terms of this license.
+You must not remove this notice, or any other, from this software.
+
+## Contributors
+
+* P. Taylor Goetz ([@ptgoetz](http://twitter.com/ptgoetz))
View
23 storm-jms/examples/README.markdown
@@ -0,0 +1,23 @@
+## About Storm JMS Examples
+This project contains a simple storm topology that illustrates the usage of "storm-jms".
+
+To build:
+
+`mvn clean install`
+
+The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory:
+
+`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar`
+
+## License
+
+The use and distribution terms for this software are covered by the
+Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+which can be found in the file LICENSE.html at the root of this distribution.
+By using this software in any fashion, you are agreeing to be bound by
+the terms of this license.
+You must not remove this notice, or any other, from this software.
+
+## Contributors
+
+* P. Taylor Goetz ([@ptgoetz](http://twitter.com/ptgoetz))
View
135 storm-jms/examples/pom.xml
@@ -0,0 +1,135 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>backtype.storm.contrib</groupId>
+ <artifactId>storm-jms-examples</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <name>Storm JMS Examples</name>
+ <description>Storm JMS Examples</description>
+ <repositories>
+ <repository>
+ <id>clojars.org</id>
+ <url>http://clojars.org/repo</url>
+ </repository>
+ </repositories>
+ <properties>
+ <spring.version>2.5.6</spring.version>
+ <storm.version>0.6.2</storm.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-spring</artifactId>
+ <version>3.7</version>
+ </dependency>
+ <dependency>
+ <groupId>storm</groupId>
+ <artifactId>storm</artifactId>
+ <version>${storm.version}</version>
+ <!-- keep storm out of the jar-with-dependencies -->
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.ptgoetz</groupId>
+ <artifactId>storm-jms</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.4.0</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <!-- bind the maven-assembly-plugin to the package phase this will create
+ a jar file without the storm dependencies suitable for deployment to a cluster. -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass></mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <includeProjectDependencies>true</includeProjectDependencies>
+ <includePluginDependencies>true</includePluginDependencies>
+ <mainClass>backtype.storm.contrib.jms.example.ExampleJmsTopology</mainClass>
+ <systemProperties>
+ <systemProperty>
+ <key>log4j.configuration</key>
+ <value>file:./src/main/resources/log4j.properties</value>
+ </systemProperty>
+ </systemProperties>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>storm</groupId>
+ <artifactId>storm</artifactId>
+ <version>${storm.version}</version>
+ <type>jar</type>
+ </dependency>
+ </dependencies>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+
+ </plugins>
+
+ </build>
+</project>
View
114 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/ExampleJmsTopology.java
@@ -0,0 +1,114 @@
+package backtype.storm.contrib.jms.example;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.contrib.jms.JmsMessageProducer;
+import backtype.storm.contrib.jms.JmsProvider;
+import backtype.storm.contrib.jms.JmsTupleProducer;
+import backtype.storm.contrib.jms.bolt.JmsBolt;
+import backtype.storm.contrib.jms.spout.JmsSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Utils;
+
+public class ExampleJmsTopology {
+ public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT";
+ public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT";
+ public static final String FINAL_BOLT = "FINAL_BOLT";
+ public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT";
+ public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT";
+ public static final String ANOTHER_BOLT = "ANOTHER_BOLT";
+
+ @SuppressWarnings("serial")
+ public static void main(String[] args) throws Exception {
+
+ // JMS Queue Provider
+ JmsProvider jmsQueueProvider = new SpringJmsProvider(
+ "jms-activemq.xml", "jmsConnectionFactory",
+ "notificationQueue");
+
+ // JMS Topic provider
+ JmsProvider jmsTopicProvider = new SpringJmsProvider(
+ "jms-activemq.xml", "jmsConnectionFactory",
+ "notificationTopic");
+
+ // JMS Producer
+ JmsTupleProducer producer = new JsonTupleProducer();
+
+ // JMS Queue Spout
+ JmsSpout queueSpout = new JmsSpout();
+ queueSpout.setJmsProvider(jmsQueueProvider);
+ queueSpout.setJmsTupleProducer(producer);
+ queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+ queueSpout.setDistributed(true); // allow multiple instances
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ // spout with 5 parallel instances
+ builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5);
+
+ // intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks
+ builder.setBolt(INTERMEDIATE_BOLT,
+ new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping(
+ JMS_QUEUE_SPOUT);
+
+ // bolt that subscribes to the intermediate bolt, and auto-acks
+ // messages.
+ builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping(
+ INTERMEDIATE_BOLT);
+
+ // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic
+ JmsBolt jmsBolt = new JmsBolt();
+ jmsBolt.setJmsProvider(jmsTopicProvider);
+
+ // anonymous message producer just calls toString() on the tuple to create a jms message
+ jmsBolt.setJmsMessageProducer(new JmsMessageProducer() {
+ @Override
+ public Message toMessage(Session session, Tuple input) throws JMSException{
+ System.out.println("Sending JMS Message:" + input.toString());
+ TextMessage tm = session.createTextMessage(input.toString());
+ return tm;
+ }
+ });
+
+ builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT);
+
+ // JMS Topic spout
+ JmsSpout topicSpout = new JmsSpout();
+ topicSpout.setJmsProvider(jmsTopicProvider);
+ topicSpout.setJmsTupleProducer(producer);
+ topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+ topicSpout.setDistributed(false);
+
+ builder.setSpout(JMS_TOPIC_SPOUT, topicSpout);
+
+ builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping(
+ JMS_TOPIC_SPOUT);
+
+ Config conf = new Config();
+
+ if (args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopology(args[0], conf,
+ builder.createTopology());
+ } else {
+
+ conf.setDebug(true);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("storm-jms-example", conf, builder.createTopology());
+ Utils.sleep(60000);
+ cluster.killTopology("storm-jms-example");
+ cluster.shutdown();
+ }
+ }
+
+}
View
99 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/GenericBolt.java
@@ -0,0 +1,99 @@
+package backtype.storm.contrib.jms.example;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+/**
+ * A generic <code>backtype.storm.topology.IRichBolt</code> implementation
+ * for testing/debugging the Storm JMS Spout and example topologies.
+ * <p/>
+ * For debugging purposes, set the log level of the
+ * <code>backtype.storm.contrib.jms</code> package to DEBUG for debugging
+ * output.
+ * @author tgoetz
+ *
+ */
+@SuppressWarnings("serial")
+public class GenericBolt implements IRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class);
+ private OutputCollector collector;
+ private boolean autoAck = false;
+ private boolean autoAnchor = false;
+ private Fields declaredFields;
+ private String name;
+
+ /**
+ * Constructs a new <code>GenericBolt</code> instance.
+ *
+ * @param name The name of the bolt (used in DEBUG logging)
+ * @param autoAck Whether or not this bolt should automatically acknowledge received tuples.
+ * @param autoAnchor Whether or not this bolt should automatically anchor to received tuples.
+ * @param declaredFields The fields this bolt declares as output.
+ */
+ public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields){
+ this.name = name;
+ this.autoAck = autoAck;
+ this.autoAnchor = autoAnchor;
+ this.declaredFields = declaredFields;
+ }
+
+ public GenericBolt(String name, boolean autoAck, boolean autoAnchor){
+ this(name, autoAck, autoAnchor, null);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map stormConf, TopologyContext context,
+ OutputCollector collector) {
+ this.collector = collector;
+
+ }
+
+ public void execute(Tuple input) {
+ LOG.debug("[" + this.name + "] Received message: " + input);
+
+
+
+ // only emit if we have declared fields.
+ if(this.declaredFields != null){
+ LOG.debug("[" + this.name + "] emitting: " + input);
+ if(this.autoAnchor){
+ this.collector.emit(input, input.getValues());
+ } else{
+ this.collector.emit(input.getValues());
+ }
+ }
+
+ if(this.autoAck){
+ LOG.debug("[" + this.name + "] ACKing tuple: " + input);
+ this.collector.ack(input);
+ }
+
+ }
+
+ public void cleanup() {
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ if(this.declaredFields != null){
+ declarer.declare(this.declaredFields);
+ }
+ }
+
+ public boolean isAutoAck(){
+ return this.autoAck;
+ }
+
+ public void setAutoAck(boolean autoAck){
+ this.autoAck = autoAck;
+ }
+
+}
View
41 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/JsonTupleProducer.java
@@ -0,0 +1,41 @@
+package backtype.storm.contrib.jms.example;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import backtype.storm.contrib.jms.JmsTupleProducer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+/**
+ * A simple <code>JmsTupleProducer</code> that expects to receive
+ * JMS <code>TextMessage</code> objects with a body in JSON format.
+ * <p/>
+ * Ouputs a tuple with field name "json" and a string value
+ * containing the raw json.
+ * <p/>
+ * <b>NOTE: </b> Currently this implementation assumes the text is valid
+ * JSON and does not attempt to parse or validate it.
+ *
+ * @author tgoetz
+ *
+ */
+@SuppressWarnings("serial")
+public class JsonTupleProducer implements JmsTupleProducer {
+
+ public Values toTuple(Message msg) throws JMSException {
+ if(msg instanceof TextMessage){
+ String json = ((TextMessage) msg).getText();
+ return new Values(json);
+ } else {
+ return null;
+ }
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("json"));
+ }
+
+}
View
58 storm-jms/examples/src/main/java/backtype/storm/contrib/jms/example/SpringJmsProvider.java
@@ -0,0 +1,58 @@
+package backtype.storm.contrib.jms.example;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import backtype.storm.contrib.jms.JmsProvider;
+
+
+/**
+ * A <code>JmsProvider</code> that uses the spring framework
+ * to obtain a JMS <code>ConnectionFactory</code> and
+ * <code>Desitnation</code> objects.
+ * <p/>
+ * The constructor takes three arguments:
+ * <ol>
+ * <li>A string pointing to the the spring application context file contining the JMS configuration
+ * (must be on the classpath)
+ * </li>
+ * <li>The name of the connection factory bean</li>
+ * <li>The name of the destination bean</li>
+ * </ol>
+ *
+ *
+ * @author tgoetz
+ *
+ */
+@SuppressWarnings("serial")
+public class SpringJmsProvider implements JmsProvider {
+ private ConnectionFactory connectionFactory;
+ private Destination destination;
+
+ /**
+ * Constructs a <code>SpringJmsProvider</code> object given the name of a
+ * classpath resource (the spring application context file), and the bean
+ * names of a JMS connection factory and destination.
+ *
+ * @param appContextClasspathResource - the spring configuration file (classpath resource)
+ * @param connectionFactoryBean - the JMS connection factory bean name
+ * @param destinationBean - the JMS destination bean name
+ */
+ public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){
+ ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource);
+ this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean);
+ this.destination = (Destination)context.getBean(destinationBean);
+ }
+
+ public ConnectionFactory connectionFactory() throws Exception {
+ return this.connectionFactory;
+ }
+
+ public Destination destination() throws Exception {
+ return this.destination;
+ }
+
+}
View
37 storm-jms/examples/src/main/resources/jms-activemq.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- ActiveMQ -->
+
+ <!-- embedded ActiveMQ Broker -->
+ <!-- <amq:broker useJmx="false" persistent="false">
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:61616" />
+ </amq:transportConnectors>
+ </amq:broker> -->
+
+ <amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" />
+
+ <amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" />
+
+ <amq:connectionFactory id="jmsConnectionFactory"
+ brokerURL="tcp://localhost:61616" />
+
+ <!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
+ <property name="connectionFactory">
+ <ref bean="jmsConnectionFactory" />
+ </property>
+ <property name="pubSubDomain" value="false" />
+ </bean> -->
+
+</beans>
+
+
+
+
+
View
13 storm-jms/examples/src/main/resources/log4j.properties
@@ -0,0 +1,13 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
+
+
+log4j.logger.backtype.storm.contrib=DEBUG
+log4j.logger.clojure.contrib=WARN
+log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.zookeeper=WARN
+
View
69 storm-jms/pom.xml
@@ -0,0 +1,69 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.sonatype.oss</groupId>
+ <artifactId>oss-parent</artifactId>
+ <version>7</version>
+ </parent>
+
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.github.ptgoetz</groupId>
+ <artifactId>storm-jms</artifactId>
+ <version>0.1.1-SNAPSHOT</version>
+ <name>Storm JMS</name>
+ <description>Storm JMS Components</description>
+
+
+ <licenses>
+ <license>
+ <name>Eclipse Public License - v 1.0</name>
+ <url>http://www.eclipse.org/legal/epl-v10.html</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+ <scm>
+ <connection>scm:git:git@github.com:ptgoetz/storm-jms.git</connection>
+ <developerConnection>scm:git:git@github.com:ptgoetz/storm-jms.git</developerConnection>
+ <url>:git@github.com:ptgoetz/storm-jms.git</url>
+ </scm>
+
+ <developers>
+ <developer>
+ <id>ptgoetz</id>
+ <name>P. Taylor Goetz</name>
+ <email>ptgoetz@gmail.com</email>
+ </developer>
+ </developers>
+
+
+ <properties>
+ <storm.version>0.6.2</storm.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>storm</groupId>
+ <artifactId>storm</artifactId>
+ <version>${storm.version}</version>
+ <!-- keep storm out of the jar-with-dependencies -->
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
View
33 storm-jms/src/main/java/backtype/storm/contrib/jms/JmsMessageProducer.java
@@ -0,0 +1,33 @@
+package backtype.storm.contrib.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+/**
+ * JmsMessageProducer implementations are responsible for translating
+ * a <code>backtype.storm.tuple.Values</code> instance into a
+ * <code>javax.jms.Message</code> object.
+ * <p/>
+ *
+ *
+ * @author P. Taylor Goetz
+ *
+ */
+public interface JmsMessageProducer extends Serializable{
+
+ /**
+ * Translate a <code>backtype.storm.tuple.Tuple</code> object
+ * to a <code>javax.jms.Message</code object.
+ *
+ * @param session
+ * @param input
+ * @return
+ * @throws JMSException
+ */
+ public Message toMessage(Session session, Tuple input) throws JMSException;
+}
View
30 storm-jms/src/main/java/backtype/storm/contrib/jms/JmsProvider.java
@@ -0,0 +1,30 @@
+package backtype.storm.contrib.jms;
+
+import java.io.Serializable;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+/**
+ * A <code>JmsProvider</code> object encapsulates the <code>ConnectionFactory</code>
+ * and <code>Destination</code> JMS objects the <code>JmsSpout</code> needs to manage
+ * a topic/queue connection over the course of it's lifecycle.
+ *
+ * @author P. Taylor Goetz
+ *
+ */
+public interface JmsProvider extends Serializable{
+ /**
+ * Provides the JMS <code>ConnectionFactory</code>
+ * @return the connection factory
+ * @throws Exception
+ */
+ public ConnectionFactory connectionFactory() throws Exception;
+
+ /**
+ * Provides the <code>Destination</code> (topic or queue) from which the
+ * <code>JmsSpout</code> will receive messages.
+ * @return
+ * @throws Exception
+ */
+ public Destination destination() throws Exception;
+}
View
41 storm-jms/src/main/java/backtype/storm/contrib/jms/JmsTupleProducer.java
@@ -0,0 +1,41 @@
+package backtype.storm.contrib.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Values;
+
+/**
+ * Interface to define classes that can produce a Storm <code>Values</code> objects
+ * from a <code>javax.jms.Message</code> object>.
+ * <p/>
+ * Implementations are also responsible for declaring the output
+ * fields they produce.
+ * <p/>
+ * If for some reason the implementation can't process a message
+ * (for example if it received a <code>javax.jms.ObjectMessage</code>
+ * when it was expecting a <code>javax.jms.TextMessage</code> it should
+ * return <code>null</code> to indicate to the <code>JmsSpout</code> that
+ * the message could not be processed.
+ *
+ * @author P. Taylor Goetz
+ *
+ */
+public interface JmsTupleProducer extends Serializable{
+ /**
+ * Process a JMS message object to create a Values object.
+ * @param msg - the JMS message
+ * @return the Values tuple, or null if the message couldn't be processed.
+ * @throws JMSException
+ */
+ Values toTuple(Message msg) throws JMSException;
+
+ /**
+ * Declare the output fields produced by this JmsTupleProducer.
+ * @param declarer The OuputFieldsDeclarer for the spout.
+ */
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
View
197 storm-jms/src/main/java/backtype/storm/contrib/jms/bolt/JmsBolt.java
@@ -0,0 +1,197 @@
+package backtype.storm.contrib.jms.bolt;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.contrib.jms.JmsMessageProducer;
+import backtype.storm.contrib.jms.JmsProvider;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A JmsBolt receives <code>backtype.storm.tuple.Tuple</code> objects from a Storm
+ * topology and publishes JMS Messages to a destination (topic or queue).
+ * <p/>
+ * To use a JmsBolt in a topology, the following must be supplied:
+ * <ol>
+ * <li>A <code>JmsProvider</code> implementation.</li>
+ * <li>A <code>JmsMessageProducer</code> implementation.</li>
+ * </ol>
+ * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code>
+ * and <code>javax.jms.Destination</code> objects requied to publish JMS messages.
+ * <p/>
+ * The JmsBolt uses a <code>JmsMessageProducer</code> to translate
+ * <code>backtype.storm.tuple.Tuple</code> objects into
+ * <code>javax.jms.Message</code> objects for publishing.
+ * <p/>
+ * Both JmsProvider and JmsMessageProducer must be set, or the bolt will
+ * fail upon deployment to a cluster.
+ * <p/>
+ * The JmsBolt is typically an endpoint in a topology -- in other words
+ * it does not emit any tuples.
+ *
+ *
+ * @author P. Taylor Goetz
+ *
+ */
+public class JmsBolt implements IRichBolt {
+ private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
+
+ private boolean autoAck = true;
+
+ // javax.jms objects
+ private Connection connection;
+ private Session session;
+ private MessageProducer messageProducer;
+
+ // JMS options
+ private boolean jmsTransactional = false;
+ private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+
+ private JmsProvider jmsProvider;
+ private JmsMessageProducer producer;
+
+
+ private OutputCollector collector;
+
+ /**
+ * Set the JmsProvider used to connect to the JMS destination topic/queue
+ * @param provider
+ */
+ public void setJmsProvider(JmsProvider provider){
+ this.jmsProvider = provider;
+ }
+
+ /**
+ * Set the JmsMessageProducer used to convert tuples
+ * into JMS messages.
+ *
+ * @param producer
+ */
+ public void setJmsMessageProducer(JmsMessageProducer producer){
+ this.producer = producer;
+ }
+
+ /**
+ * Sets the JMS acknowledgement mode for JMS messages sent
+ * by this bolt.
+ * <p/>
+ * Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
+ * @param acknowledgeMode (constant defined in javax.jms.Session)
+ */
+ public void setJmsAcknowledgeMode(int acknowledgeMode){
+ this.jmsAcknowledgeMode = acknowledgeMode;
+ }
+
+ /**
+ * Set the JMS transactional setting for the JMS session.
+ *
+ * @param transactional
+ */
+// public void setJmsTransactional(boolean transactional){
+// this.jmsTransactional = transactional;
+// }
+
+ /**
+ * Sets whether or not tuples should be acknowledged by this
+ * bolt.
+ * <p/>
+ * @param autoAck
+ */
+ public void setAutoAck(boolean autoAck){
+ this.autoAck = autoAck;
+ }
+
+
+ /**
+ * Consumes a tuple and sends a JMS message.
+ * <p/>
+ * If autoAck is true, the tuple will be acknowledged
+ * after the message is sent.
+ * <p/>
+ * If JMS sending fails, the tuple will be failed.
+ */
+ @Override
+ public void execute(Tuple input) {
+ // write the tuple to a JMS destination...
+ LOG.debug("Tuple received. Sending JMS message.");
+
+ try {
+ Message msg = this.producer.toMessage(this.session, input);
+ if(msg != null){
+ this.messageProducer.send(msg);
+ }
+ if(this.autoAck){
+ LOG.debug("ACKing tuple: " + input);
+ this.collector.ack(input);
+ }
+ } catch (JMSException e) {
+ // failed to send the JMS message, fail the tuple fast
+ LOG.warn("Failing tuple: " + input);
+ LOG.warn("Exception: ", e);
+ this.collector.fail(input);
+ }
+ }
+
+ /**
+ * Releases JMS resources.
+ */
+ @Override
+ public void cleanup() {
+ try {
+ LOG.debug("Closing JMS connection.");
+ this.session.close();
+ this.connection.close();
+ } catch (JMSException e) {
+ LOG.warn("Error closing JMS connection.", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ /**
+ * Initializes JMS resources.
+ */
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
+ OutputCollector collector) {
+ if(this.jmsProvider == null || this.producer == null){
+ throw new IllegalStateException("JMS Provider and MessageProducer not set.");
+ }
+ this.collector = collector;
+ LOG.debug("Connecting JMS..");
+ try {
+ ConnectionFactory cf = this.jmsProvider.connectionFactory();
+ Destination dest = this.jmsProvider.destination();
+ this.connection = cf.createConnection();
+ this.session = connection.createSession(this.jmsTransactional,
+ this.jmsAcknowledgeMode);
+ this.messageProducer = session.createProducer(dest);
+
+ connection.start();
+ } catch (Exception e) {
+ LOG.warn("Error creating JMS connection.", e);
+ }
+ }
+}
View
312 storm-jms/src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java
@@ -0,0 +1,312 @@
+package backtype.storm.contrib.jms.spout;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.contrib.jms.JmsProvider;
+import backtype.storm.contrib.jms.JmsTupleProducer;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue
+ * and outputs tuples based on the messages it receives.
+ * <p/>
+ * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> implementations
+ * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects
+ * necessary to connect to a JMS topic/queue.
+ * <p/>
+ * When a <code>JmsSpout</code> receives a JMS message, it delegates to an
+ * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the
+ * incoming message.
+ * <p/>
+ * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation
+ * appropriate for the expected message content.
+ *
+ * @author P. Taylor Goetz
+ *
+ */
+@SuppressWarnings("serial")
+public class JmsSpout implements IRichSpout, MessageListener {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
+
+ // JMS options
+ private boolean jmsTransactional = false;
+ private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+
+ private boolean distributed = true;
+
+ private JmsTupleProducer tupleProducer;
+
+ private JmsProvider jmsProvider;
+
+ private LinkedBlockingQueue<Message> queue;
+ private ConcurrentHashMap<String, Message> pendingMessages;
+
+ private SpoutOutputCollector collector;
+
+ private transient Connection connection;
+ private transient Session session;
+
+ /**
+ * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
+ * <p/>
+ * Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
+ * @param mode JMS Session Acknowledgement mode
+ * @throws IllegalArgumentException if the mode is not recognized.
+ */
+ public void setJmsAcknowledgeMode(int mode){
+ switch (mode) {
+ case Session.AUTO_ACKNOWLEDGE:
+ case Session.CLIENT_ACKNOWLEDGE:
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
+
+ }
+ this.jmsAcknowledgeMode = mode;
+ }
+
+ /**
+ * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
+ * @return
+ */
+ public int getJmsAcknowledgeMode(){
+ return this.jmsAcknowledgeMode;
+ }
+
+ /**
+ * Set whether this Spout uses the JMS transactional model by defualt.
+ * <p/>
+ * If <code>true</code> the spout will always request acks from downstream
+ * bolts, using the incoming JMS message ID as the Storm message ID.
+ * <p/>
+ * If <code>false</code> the spout will request acks from downstream bolts
+ * only if the spout's JmsAcknowledgeMode is <b><i>not</i></b> AUTO_ACKNOWLEDGE
+ * and the JMS message DeliveryMode is <b><i>not</i></b> AUTO_ACKNOWLEDGE.
+ * <p/>
+ * If the spout determines that a JMS message should be handled transactionally
+ * (i.e. acknowledged in JMS terms), it will be JMS-acknowledged in the spout's
+ * <code>ack()</code> method.
+ * <p/>
+ * Otherwise, if a downstream spout that has anchored on one of this spouts tuples
+ * fails to acknowledge an emitted tuple, the JMS message will not be not be acknowledged,
+ * and potentially be set for retransmission, depending on the underlying JMS implementation
+ * and configuration.
+ *
+ * @param transactional
+ */
+ public void setJmsTransactional(boolean transactional){
+ this.jmsTransactional = transactional;
+ }
+ public boolean isJmsTransaction(){
+ return this.jmsTransactional;
+ }
+ /**
+ * Set the <code>backtype.storm.contrib.jms.JmsProvider</code>
+ * implementation that this Spout will use to connect to
+ * a JMS <code>javax.jms.Desination</code>
+ *
+ * @param provider
+ */
+ public void setJmsProvider(JmsProvider provider){
+ this.jmsProvider = provider;
+ }
+ /**
+ * Set the <code>backtype.storm.contrib.jms.JmsTupleProducer</code>
+ * implementation that will convert <code>javax.jms.Message</code>
+ * object to <code>backtype.storm.tuple.Values</code> objects
+ * to be emitted.
+ *
+ * @param producer
+ */
+ public void setJmsTupleProducer(JmsTupleProducer producer){
+ this.tupleProducer = producer;
+ }
+
+ /**
+ * <code>javax.jms.MessageListener</code> implementation.
+ * <p/>
+ * Stored the JMS message in an internal queue for processing
+ * by the <code>nextTuple()</code> method.
+ */
+ public void onMessage(Message msg) {
+ this.queue.offer(msg);
+
+ }
+
+ /**
+ * <code>ISpout</code> implementation.
+ * <p/>
+ * Connects the JMS spout to the configured JMS destination
+ * topic/queue.
+ *
+ */
+ @SuppressWarnings("rawtypes")
+ public void open(Map conf, TopologyContext context,
+ SpoutOutputCollector collector) {
+ if(this.jmsProvider == null){
+ throw new IllegalStateException("JMS provider has not been set.");
+ }
+ if(this.tupleProducer == null){
+ throw new IllegalStateException("JMS Tuple Producer has not been set.");
+ }
+ queue = new LinkedBlockingQueue<Message>();
+ this.pendingMessages = new ConcurrentHashMap<String, Message>();
+ this.collector = collector;
+ try {
+ ConnectionFactory cf = this.jmsProvider.connectionFactory();
+ Destination dest = this.jmsProvider.destination();
+ this.connection = cf.createConnection();
+ this.session = connection.createSession(this.jmsTransactional,
+ this.jmsAcknowledgeMode);
+ MessageConsumer consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+ connection.start();
+
+ } catch (Exception e) {
+ LOG.warn("Error creating JMS connection.", e);
+ }
+
+ }
+
+ public void close() {
+ try {
+ LOG.debug("Closing JMS connection.");
+ this.session.close();
+ this.connection.close();
+ } catch (JMSException e) {
+ LOG.warn("Error closing JMS connection.", e);
+ }
+
+ }
+
+ public void nextTuple() {
+ Message msg = this.queue.poll();
+ if (msg == null) {
+ Utils.sleep(50);
+ } else {
+
+ LOG.debug("sending tuple: " + msg);
+ // get the tuple from the handler
+ try {
+ Values vals = this.tupleProducer.toTuple(msg);
+ // if we're transactional, always ack, otherwise
+ // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
+ LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
+ LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
+ if (this.jmsTransactional
+ || (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE)
+ || (msg.getJMSDeliveryMode() != Session.AUTO_ACKNOWLEDGE)) {
+ LOG.debug("Requesting acks.");
+ this.collector.emit(vals, msg.getJMSMessageID());
+
+ // at this point we successfully emitted. Store
+ // the message and message ID so we can do a
+ // JMS acknowledge later
+ this.pendingMessages.put(msg.getJMSMessageID(), msg);
+ } else {
+ this.collector.emit(vals);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Unable to convert JMS message: " + msg);
+ }
+
+ }
+
+ }
+
+ /*
+ * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+ */
+ public void ack(Object msgId) {
+
+ Message msg = this.pendingMessages.remove(msgId);
+ if (msg != null) {
+ try {
+ msg.acknowledge();
+ LOG.debug("JMS Message acked: " + msgId);
+ } catch (JMSException e) {
+ LOG.warn("Error acknowldging JMS message: " + msgId, e);
+ }
+ } else {
+ LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
+ }
+
+ }
+
+ /*
+ * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+ */
+ public void fail(Object msgId) {
+ LOG.debug("Message failed: " + msgId);
+ this.pendingMessages.remove(msgId);
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ this.tupleProducer.declareOutputFields(declarer);
+
+ }
+
+ public boolean isDistributed() {
+ return this.distributed;
+ }
+
+ /**
+ * Sets the "distributed" mode of this spout.
+ * <p/>
+ * If <code>true</code> multiple instances of this spout <i>may</i> be
+ * created across the cluster (depending on the "parallelism_hint" in the topology configuration).
+ * <p/>
+ * Setting this value to <code>false</code> essentially means this spout will run as a singleton
+ * within the cluster ("parallelism_hint" will be ignored).
+ * <p/>
+ * In general, this should be set to <code>false</code> if the underlying JMS destination is a
+ * topic, and <code>true</code> if it is a JMS queue.
+ *
+ * @param distributed
+ */
+ public void setDistributed(boolean distributed){
+ this.distributed = distributed;
+ }
+
+
+ private static final String toDeliveryModeString(int deliveryMode) {
+ switch (deliveryMode) {
+ case Session.AUTO_ACKNOWLEDGE:
+ return "AUTO_ACKNOWLEDGE";
+ case Session.CLIENT_ACKNOWLEDGE:
+ return "CLIENT_ACKNOWLEDGE";
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ return "DUPS_OK_ACKNOWLEDGE";
+ default:
+ return "UNKNOWN";
+
+ }
+ }
+
+}

0 comments on commit ddbc242

Please sign in to comment.
Something went wrong with that request. Please try again.