You might also want to check out some other projects which enrich the Wukong and Hadoop experience:
- wukong-hadoop: Run Wukong processors and dataflows as mappers and/or reducers within the Hadoop framework. Model jobs locally before you run them.
- wukong-load: Load the output data from your local Wukong jobs and flows into a variety of different data stores.
- wukong-deploy: Orchestrate Wukong and other wu-tools together to support an application running on the Infochimps Platform.
Installation & Setup
Wukong-Storm can be installed as a RubyGem:
$ sudo gem install wukong-storm
If you want to run your dataflows as functioning Storm topologies that read from and write to Kafka, you'll need access to Storm and Kafka installations. Ironfan is a great tool for building and managing Storm clusters and other distributed infrastructure quickly and easily.
To run Storm jobs through Wukong-Storm, you'll need to move your Wukong code to each worker of the Storm cluster, install Wukong-Storm on each, and log in and launch your job from one of them. Ironfan again helps with configuring this.
Anatomy of a running topology
Storm defines the concept of a topology. A topology contains spouts and bolts. A spout is a source of data. A bolt processes data. Bolts can be connected to each other and to spouts in arbitrary ways.
Topologies are submitted to Storm's Nimbus but run within a Storm supervisor. Each supervisor can dedicate a certain number of workers to a topology. Within each worker, parallelism controls the number of threads the worker assigns to the topology.
Wukong-Storm runs each Wukong dataflow as a single bolt within a single topology. Data is passed to this bolt over STDIN and collected over STDOUT, similar to the way Hadoop streaming operates.
This topology is hooked up to a storm.kafka.trident.OpaqueTridentKafkaSpout, which reads from a single input topic within Kafka.
Output records are written to a default Kafka topic but this can be overridden on a per-record basis.
A Wukong dataflow launched within Storm runs as a single bolt. This bolt works by launching an arbitrary command line, sending it records over STDIN, and reading its output over STDOUT. The SubprocessFunction class expects whatever command it launched to obey a protocol under which the output after each input consists of each output record followed by a newline, with the full batch of output records followed by a batch terminator (default: ---).
Wukong-Storm comes with a command, wu-bolt, which works very similarly to wu-local but implements this protocol. Here's an example of using wu-bolt directly with a processor:
    $ echo 2 | wu-bolt prime_factorizer.rb
    2
    ---
    $ echo 12 | wu-bolt prime_factorizer.rb
    2
    2
    3
    ---
    $ echo 19 | wu-bolt prime_factorizer.rb
    ---
Notice that in the last example, the presence of the batch delimiter after each input record makes it easy to tell the difference between "no output records" and "no output records yet", a distinction that is otherwise hard to make over STDIN/STDOUT.
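To make the protocol concrete, here is a minimal Ruby sketch of a command that follows it. This is not Wukong-Storm's own implementation: run_batches is a hypothetical helper, the even-number filter is an illustrative stand-in for a real processor, and the terminator is assumed to be the --- shown in the wu-bolt example above.

```ruby
require 'stringio'

# Hypothetical helper (not part of Wukong-Storm): reads one record per line
# from `input`, yields it to the block, writes each record the block returns
# on its own line, and then writes the batch terminator.
def run_batches(input, output, terminator = '---')
  input.each_line do |line|
    records = Array(yield(line.chomp))
    records.each { |record| output.puts(record) }
    output.puts(terminator)
    output.flush # the reader waits for the terminator, so never leave it buffered
  end
end

# Demo: keep only even integers, so an odd input yields an empty batch
# (just the terminator), which is the "no output records" case.
input  = StringIO.new("2\n19\n12\n")
output = StringIO.new
run_batches(input, output) { |record| record.to_i.even? ? [record] : [] }
puts output.string
```

In a real command, input and output would be STDIN and STDOUT rather than StringIO objects; the in-memory streams are used here only so the sketch runs on its own.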
Running a dataflow
A simple processor
Assuming you have correctly installed Wukong-Storm, Storm, Kafka, Zookeeper, &c., and you have defined a simple dataflow (or in this case, just a single processor) like this:
    # in upcaser.rb
    Wukong.processor(:upcaser) do
      def process line
        yield line.upcase
      end
    end
Then you can launch it directly into Storm:
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic
If a topology named upcaser already exists, you'll get an error. Add the --rm flag to first kill the running topology before launching the new one:
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm
The default amount of time to wait for the topology to die is 300 seconds (5 minutes), just like the storm kill command (which is used under the hood). When debugging a topology in development, it's helpful to add --wait=1 to immediately kill the topology.
See exactly what would happen behind the scenes by adding the --dry_run flag, which will print commands and not execute them:
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm --dry_run
A more complicated example
Say you have a dataflow:
    # in my_flow.rb
    Wukong.dataflow(:my_flow) do
      my_parser | does_something | then_something_else | to_json
    end
You can launch it using a different topology name as well as target arbitrary locations for your Zookeeper, Kafka, and Storm servers:
$ wu-storm my_flow.rb --name=my_flow_attempt_3 --zookeeper_hosts=10.121.121.121,10.122.122.122 --kafka_hosts=10.123.123.123 --nimbus_host=10.124.124.124 --input=some_input_topic --output=some_output_topic
Running non-Wukong or non-Ruby code
You can also use Wukong-Storm as a harness to run non-Wukong or non-Ruby code. As long as you can specify a command line to run which supports the communication protocol, you can run it with wu-storm:
$ wu-storm --bolt_command='my_cmd --some-option=value -af -q 3' --input=some_input_topic --output=some_output_topic
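Before handing a custom command to --bolt_command, it can help to check locally that it really follows the protocol. The sketch below is one way to do that in Ruby, not something shipped with Wukong-Storm: exchange is a hypothetical helper, the child process is a Ruby one-liner standing in for my_cmd, and the terminator is assumed to be ---.

```ruby
require 'open3'
require 'rbconfig'

# Hypothetical helper: send one record to a running command and collect its
# output lines up to (but not including) the batch terminator.
def exchange(stdin, stdout, record, terminator = '---')
  stdin.puts(record)
  stdin.flush
  batch = []
  while (line = stdout.gets)
    line = line.chomp
    return batch if line == terminator
    batch << line
  end
  raise EOFError, 'command exited before sending the batch terminator'
end

# Stand-in for `my_cmd`: upcases each record, then ends the batch.
child = 'STDOUT.sync = true; STDIN.each_line { |l| puts l.upcase; puts "---" }'

Open3.popen2(RbConfig.ruby, '-e', child) do |stdin, stdout, _wait_thr|
  p exchange(stdin, stdout, 'hello')
  p exchange(stdin, stdout, 'world')
end
```

If the command buffers its output or omits the terminator, exchange blocks or raises, which is roughly the failure you would otherwise only discover inside a running topology.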
Storm provides several options for scaling up or down a topology. Wukong-Storm makes them accessible at launch time via the following options:
- --workers: Specify the number of workers (a.k.a. "executors" or "slots") for the topology. Defaults to 1.
- --input_parallelism: Specify the number of threads within the spout reading from Kafka within each worker. Defaults to 1.
- --parallelism: Specify the number of threads within the bolt running Wukong code within each worker. Defaults to 1.
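When launches are scripted, these options can be assembled programmatically. The following Ruby sketch builds a wu-storm command line from a hash; wu_storm_command is a hypothetical helper, not part of Wukong-Storm, it uses only flags mentioned in this document, and the numeric values are arbitrary examples.

```ruby
require 'shellwords'

# Hypothetical helper: turn a file name and an options hash into a wu-storm
# command line. A value of `true` produces a bare flag such as --rm.
def wu_storm_command(file, options = {})
  flags = options.map do |name, value|
    value == true ? "--#{name}" : "--#{name}=#{Shellwords.escape(value.to_s)}"
  end
  (['wu-storm', Shellwords.escape(file)] + flags).join(' ')
end

# Example values only: 2 workers, each with 2 spout threads and 4 bolt threads.
puts wu_storm_command('my_flow.rb',
                      input: 'some_input_topic',
                      output: 'some_output_topic',
                      workers: 2,
                      input_parallelism: 2,
                      parallelism: 4,
                      rm: true)
```

Adding dry_run: true to the hash would append --dry_run, which is a convenient way to inspect the generated launch without executing it.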