Skip to content

Commit

Permalink
Move KafkaHelper and rake tasks to lib/turbine
Browse files Browse the repository at this point in the history
  • Loading branch information
tarcieri committed May 31, 2015
1 parent 20f4629 commit 2c59b2b
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 144 deletions.
87 changes: 87 additions & 0 deletions lib/turbine/kafka_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
require "open3"
require "poseidon"

module Turbine
# Helper functions for integration testing with Kafka
module KafkaHelper
extend self

ZOOKEEPER_ADDR = "localhost:2181"
KAFKA_ADDR = "localhost:9092"

def delete_topic(topic)
log "*** Deleting Kafka topic: #{topic}"

topic_command :delete, topic: topic
end

def create_topic(topic)
log "*** Creating Kafka topic: #{topic}"

required_topic_command :create,
"replication-factor" => 1,
"partitions" => 1,
"topic" => topic
end

def list_topics
topic_command(:list).split("\n")
end

def topic_exists?(topic)
list_topics.include?(topic)
end

def fill_topic(topic, n = 100_000)
fail ArgumentError, "min messages is 1000" if n < 1000

producer = Poseidon::Producer.new([KAFKA_ADDR], "my_test_producer", type: :sync)

log "*** Filling topic with #{n} messages: #{topic}"

(n / 1000).times do |i|
messages = []

1000.times do |j|
n = (i * 1000 + j)
messages << Poseidon::MessageToSend.new(topic, n.to_s)
end

producer.send_messages(messages)
end
ensure
producer.close if producer
end

private

def kafka_path
ENV["KAFKA_PATH"] || File.expand_path("../../../kafka", __FILE__)
end

def kafka_topics_bin_path
"#{kafka_path}/bin/kafka-topics.sh"
end

def kafka_args(args = {})
{ zookeeper: ZOOKEEPER_ADDR }.merge(args).map { |k, v| "--#{k} #{v}" }.join(" ")
end

def topic_command(command, args = {})
cmd = "#{kafka_topics_bin_path} --#{command} #{kafka_args(args)}"
stdout_str, _stderr_str, status = Open3.capture3(cmd)
return unless status.success?
stdout_str
end

def required_topic_command(command, args = {})
result = topic_command(command, args)
fail "Kafka command failed!" unless result
true
end

def log(message)
STDERR.puts(message)
end
end
end
55 changes: 55 additions & 0 deletions lib/turbine/rake_tasks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
require "rake/clean"
require "colorize"
require "socket"
require "timeout"

KAFKA_PORT = 9092
START_TIMEOUT = 10

namespace :kafka do
KAFKA_VERSION = "0.8.2.1"
KAFKA_TARBALL = "kafka_2.10-#{KAFKA_VERSION}.tgz"

task download: "tmp/#{KAFKA_TARBALL}"
directory "tmp"

file "tmp/#{KAFKA_TARBALL}" => "tmp" do
puts "#{'***'.blue} #{'Downloading Kafka'.light_white}"
url = "https://www.apache.org/dist/kafka/#{KAFKA_VERSION}/kafka_2.10-#{KAFKA_VERSION}.tgz"
sh "curl #{url} -o tmp/#{KAFKA_TARBALL}"
end

task install: :download do
puts "#{'***'.blue} #{'Unpacking Kafka'.light_white}"

rm_rf "kafka" if File.exist? "kafka"
sh "tar -zxf tmp/#{KAFKA_TARBALL}"
mv "kafka_2.10-#{KAFKA_VERSION}", "kafka"
end

task start: %w(kafka zookeeper:start) do
puts "#{'***'.blue} #{'Starting Kafka'.light_white}"
sh "cd kafka && bin/kafka-server-start.sh config/server.properties &"

Timeout.timeout(START_TIMEOUT) do
begin
socket = TCPSocket.open("localhost", 9092)
rescue Errno::ECONNREFUSED
sleep 0.01
retry
end

socket.close
end

# Give Kafka some time to finish printing startup messages
sleep 0.5
puts "#{'***'.blue} #{'Kafka started!'.light_white}"
end
end

file "kafka" do
Rake::Task["kafka:install"].invoke
end

CLEAN.include "tmp", "kafka"
85 changes: 0 additions & 85 deletions lib/turbine/rspec/kafka_helper.rb

This file was deleted.

8 changes: 4 additions & 4 deletions spec/turbine/consumer/kafka_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require "spec_helper"
require "turbine/consumer/kafka"
require "turbine/rspec/kafka_helper"
require "turbine/kafka_helper"
require "benchmark"

RSpec.describe Turbine::Consumer::Kafka do
Expand All @@ -27,12 +27,12 @@ def with_consumer
timestamp = Time.now.strftime("%Y%m%d%H%M%S%L")

@example_topic = "turbike-kafka-specs-#{timestamp}"
KafkaHelper.create_topic(@example_topic)
KafkaHelper.fill_topic(@example_topic, MESSAGE_COUNT)
Turbine::KafkaHelper.create_topic(@example_topic)
Turbine::KafkaHelper.fill_topic(@example_topic, MESSAGE_COUNT)
end

after :all do
KafkaHelper.delete_topic(@example_topic)
Turbine::KafkaHelper.delete_topic(@example_topic)
end

it "fetches batches of messages" do
Expand Down
56 changes: 1 addition & 55 deletions tasks/kafka.rake
Original file line number Diff line number Diff line change
@@ -1,55 +1 @@
require "rake/clean"
require "colorize"
require "socket"
require "timeout"

KAFKA_PORT = 9092
START_TIMEOUT = 5

namespace :kafka do
KAFKA_VERSION = "0.8.2.1"
KAFKA_TARBALL = "kafka_2.10-#{KAFKA_VERSION}.tgz"

task download: "tmp/#{KAFKA_TARBALL}"
directory "tmp"

file "tmp/#{KAFKA_TARBALL}" => "tmp" do
puts "#{'***'.blue} #{'Downloading Kafka'.light_white}"
url = "https://www.apache.org/dist/kafka/#{KAFKA_VERSION}/kafka_2.10-#{KAFKA_VERSION}.tgz"
sh "curl #{url} -o tmp/#{KAFKA_TARBALL}"
end

task install: :download do
puts "#{'***'.blue} #{'Unpacking Kafka'.light_white}"

rm_rf "kafka" if File.exist? "kafka"
sh "tar -zxf tmp/#{KAFKA_TARBALL}"
mv "kafka_2.10-#{KAFKA_VERSION}", "kafka"
end

task start: %w(kafka zookeeper:start) do
puts "#{'***'.blue} #{'Starting Kafka'.light_white}"
sh "cd kafka && bin/kafka-server-start.sh config/server.properties &"

Timeout.timeout(START_TIMEOUT) do
begin
socket = TCPSocket.open("localhost", 9092)
rescue Errno::ECONNREFUSED
sleep 0.01
retry
end

socket.close
end

# Give Kafka some time to finish printing startup messages
sleep 0.5
puts "#{'***'.blue} #{'Kafka started!'.light_white}"
end
end

file "kafka" do
Rake::Task["kafka:install"].invoke
end

CLEAN.include "tmp", "kafka"
require "turbine/rake_tasks"

0 comments on commit 2c59b2b

Please sign in to comment.