Tony Arcieri edited this page May 31, 2015 · 16 revisions

This short tutorial will help you quickly get Turbine set up on your computer and walk you through writing your first stream processing job.

Step 1: Clone Repo

Begin by cloning the Turbine repo from GitHub:

$ git clone https://github.com/tarcieri/turbine.git
Cloning into 'turbine'...
remote: Counting objects: 425, done.
remote: Compressing objects: 100% (85/85), done.
remote: Total 425 (delta 40), reused 0 (delta 0), pack-reused 336
Receiving objects: 100% (425/425), 68.91 KiB | 0 bytes/s, done.
Resolving deltas: 100% (179/179), done.
Checking connectivity... done
$ cd turbine

Step 2: Bundle!

Install Turbine's gem dependencies with Bundler:

$ bundle
Updating git://github.com/bsm/poseidon_cluster.git
Fetching gem metadata from https://rubygems.org/.........
Fetching version metadata from https://rubygems.org/...
Fetching dependency metadata from https://rubygems.org/..
Resolving dependencies....
[And then a miracle happens...]

Step 3: Install and launch Kafka and Zookeeper

NOTE: the rake tasks that install Kafka and Zookeeper ship with Turbine and are usable within your own projects! See the Rake Tasks and Kafka Helper section of the wiki to learn how to reuse them.

$ bundle exec rake kafka:start
mkdir -p tmp
*** Downloading Kafka
curl https://www.apache.org/dist/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz -o tmp/kafka_2.10-0.8.2.1.tgz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 15.4M  100 15.4M    0     0   881k      0  0:00:17  0:00:17 --:--:--  834k
*** Unpacking Kafka
tar -zxf tmp/kafka_2.10-0.8.2.1.tgz
mv kafka_2.10-0.8.2.1 kafka
*** Downloading Zookeeper
curl https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz -o tmp/zookeeper-3.4.6.tar.gz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 16.8M  100 16.8M    0     0   110k      0  0:02:35  0:02:35 --:--:--  152k
*** Unpacking Zookeeper
tar -zxf tmp/zookeeper-3.4.6.tar.gz
mv zookeeper-3.4.6 zookeeper
mkdir -p /private/tmp/turbine/zookeeper/data
rm -r /private/tmp/turbine/zookeeper/conf/zoo_sample.cfg
*** Starting Zookeeper
cd zookeeper && bin/zkServer.sh start
JMX enabled by default
Using config: /tmp/turbine/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
*** Starting Kafka
cd kafka && bin/kafka-server-start.sh config/server.properties &
[And then a miracle happens...]
*** Kafka started!

Step 4: Create and populate a Kafka topic

Kafka organizes logs of messages into "topics". Our stream processor will be configured to process one of these topics at a time.

First, let's create a topic. We'll call it stufftodo:

$ kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stufftodo
Created topic "stufftodo".

Now let's put some messages into the stufftodo topic to process:

$ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic stufftodo
foo
bar
baz
^D
$

Great! We now have 3 messages in the stufftodo topic to be processed.

If you'd like to do this from Ruby, check out the Poseidon::Producer class from the poseidon gem:

https://github.com/bpot/poseidon

Step 5: Create Turbine processor

Put the following Ruby source code into a file named myprocessor.rb:

require "turbine"
require "turbine/consumer/kafka"

consumer = Turbine::Consumer::Kafka.new(
  "stufftodo-processors", # Group name
  ["localhost:9092"],     # Kafka brokers
  ["localhost:2181"],     # Zookeeper hosts
  "stufftodo"             # Topic name
)

processor = Turbine::Processor.new(
  min_threads: 4, # Minimum worker threads in the pool
  max_threads: 4, # Maximum worker threads in the pool
  max_queue: 64   # Maximum messages buffered for the pool
)

processor.process(consumer) do |msg|
  p msg
end

Step 6: Run it!

Run your processor for the first time. It should start by processing the messages we queued up in the topic earlier:

$ bundle exec ruby myprocessor.rb
"foo"
"bar"
"baz"

In a different terminal, launch the console producer, and send your processor some other messages:

$ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic stufftodo
qux
herp
derp

Your processor should print each of these messages as it arrives:

"qux"
"herp"
"derp"

Now, try hitting ^C on your processor and queueing up some more messages using kafka-console-producer:

$ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic stufftodo
queue
stuff
up

When you restart your processor, it should pick up where it left off:

$ bundle exec ruby myprocessor.rb
"queue"
"stuff"
"up"