Permalink
Browse files

Working version of the WordCountTopology with

  • Loading branch information...
1 parent 3f26151 commit 84dbd528798675b7ecb7e24b0e258e684cb4a05f @kitmenke committed Aug 6, 2015
View
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" output="target/classes" path="src/main/java">
+ <attributes>
+ <attribute name="optional" value="true"/>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry excluding="**" kind="src" output="target/classes" path="multilang">
+ <attributes>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="src" output="target/test-classes" path="src/test/java">
+ <attributes>
+ <attribute name="optional" value="true"/>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
+ <attributes>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
+ <attributes>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
View
@@ -0,0 +1,32 @@
+*.pydevproject
+.metadata
+.gradle
+bin/
+test-output/
+tmp/
+target/
+*.tmp
+*.bak
+*.swp
+*~.nib
+local.properties
+.settings/
+.loadpath
+
+# External tool builders
+.externalToolBuilders/
+
+# Locally stored "Eclipse launch configurations"
+*.launch
+
+# CDT-specific
+.cproject
+
+# PDT-specific
+.buildpath
+
+# sbteclipse plugin
+.target
+
+# TeXlipse plugin
+.texlipse
View
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>HelloStorm</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.eclipse.m2e.core.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.m2e.core.maven2Nature</nature>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
View
@@ -0,0 +1,149 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.kitmenke.storm</groupId>
+ <artifactId>storm-stlhug-demo</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>storm-stlhug-demo</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <storm.version>0.10.0.2.3.0.0-2557</storm.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>clojars.org</id>
+ <url>http://clojars.org/repo</url>
+ </repository>
+ <repository>
+ <id>HortonworksRepo</id>
+ <name>Hortonworks Repo</name>
+ <url>http://repo.hortonworks.com/content/repositories/releases/</url>
+ </repository>
+ </repositories>
+
+ <profiles>
+ <profile>
+ <id>local</id>
+ <activation>
+ <property>
+ <name>deployTo</name>
+ <value>local</value>
+ </property>
+ </activation>
+ <dependencies>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.8.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.7.5</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+
+ <!-- tell maven which directory should be used for config and resource
+ files -->
+ <resources>
+ <resource>
+ <directory>${basedir}/multilang</directory>
+ </resource>
+ </resources>
+
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- tell maven which JDK we want to use, in this case the HDP 2.3 sandbox uses 1.7 -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <includeProjectDependencies>true</includeProjectDependencies>
+ <includePluginDependencies>false</includePluginDependencies>
+ <classpathScope>compile</classpathScope>
+ <mainClass>${storm.topology}</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
View
@@ -0,0 +1,37 @@
+Storm Demo - St. Louis Hadoop User Group
+----------------------------------------
+
+A word count topology originally forked from the [storm-starter](https://github.com/apache/storm/tree/master/examples/storm-starter) project and modified for a St. Louis Hadoop User Group presentation.
+
+Outline:
+
+1. RandomSentenceSpout: emits random sentence tuples
+1. SplitSentenceBolt: splits each sentence into word tuples
+1. WordCountBolt: keeps track of counts for each word and emits (word, count) tuples
+1. OutputBolt: LOG the current word and the count
+
+Running the topology in local mode
+----------------------------------
+
+Assuming you're using Eclipse and you're able to open the project without errors.
+
+1. Open com.kitmenke.storm.WordCountTopology
+1. Right click on the class, Debug As -> Java Application
+
+Running the topology on a cluster
+---------------------------------
+
+For testing I'm using the Hortonworks HDP 2.3 Sandbox.
+
+1. Build the project using `mvn clean package`
+1. Upload the jar to a node in the cluster which has the storm client
+1. Submit the topology: `storm jar storm-stlhug-demo-0.0.1-SNAPSHOT.jar com.kitmenke.storm.WordCountTopology WordCountTopology`
+
+Environment
+-----------
+
+I developed this topology on Windows 10 using the following:
+
+- Eclipse Mars IDE for Java Developers (includes maven and git)
+- [TestNG plugin](http://testng.org/doc/download.html for Eclipse)
+- Oracle Virtual Box + [HortonWorks HDP 2.3 Sandbox](Eclipse IDE for Java Developers) (not required for running storm in local mode)
@@ -0,0 +1,61 @@
+package com.kitmenke.storm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kitmenke.storm.bolt.SplitSentenceBolt;
+import com.kitmenke.storm.bolt.WordCountBolt;
+import com.kitmenke.storm.bolt.OutputBolt;
+import com.kitmenke.storm.spout.RandomSentenceSpout;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+
+/**
+ * WordCountTopology
+ */
+public class WordCountTopology {
+ private static final Logger LOG = LoggerFactory.getLogger(WordCountTopology.class);
+
+ public static void main(String[] args) throws Exception {
+
+ LOG.info("Setting up WordCountTopology");
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1-spout", new RandomSentenceSpout(), 1);
+ builder.setBolt("2-split-sentence", new SplitSentenceBolt(), 1).shuffleGrouping("1-spout");
+ builder.setBolt("3-count-words", new WordCountBolt(), 1).shuffleGrouping("2-split-sentence");
+ builder.setBolt("4-output", new OutputBolt(), 1).shuffleGrouping("3-count-words");
+
+ Config conf = new Config();
+
+
+ if (args != null && args.length > 0) {
+ LOG.info("Submitting topology with name {}", args[0]);
+ //List<String> seeds = new ArrayList<String>();
+ //seeds.add("sandbox.hortonworks.com");
+ //conf.put(Config.NIMBUS_SEEDS, seeds);
+ //conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+ } else {
+ // turning debug on prints out when tuples are emitted
+ //conf.setDebug(true);
+
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ LOG.info("Submitting local topology, will be shutdown in 60s");
+ cluster.submitTopology("CountingTopology", conf, builder.createTopology());
+
+ Thread.sleep(60000);
+
+ cluster.shutdown();
+ }
+ }
+}
@@ -0,0 +1,39 @@
+package com.kitmenke.storm.bolt;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+public class OutputBolt extends BaseRichBolt {
+ private static final long serialVersionUID = 1L;
+ private static Logger LOG = LoggerFactory.getLogger(OutputBolt.class);
+ OutputCollector _collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
+ OutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String word = input.getString(0);
+ int count = input.getInteger(1);
+ LOG.info("{} = {}", word, count);
+ _collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Oops, something went wrong.

0 comments on commit 84dbd52

Please sign in to comment.