Skip to content

Commit

Permalink
Use jar dependencies to fix snappy compression issue
Browse files Browse the repository at this point in the history
  • Loading branch information
suyograo committed Apr 28, 2016
1 parent 589e9b2 commit 5de6b25
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 29 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ before_install:
- kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic1 --zookeeper localhost:2181
- kafka/bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic topic2 --zookeeper localhost:2181
- kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic gzip_topic --zookeeper localhost:2181
- kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic snappy_topic --zookeeper localhost:2181
- kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic lz4_topic --zookeeper localhost:2181
- kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic topic3 --zookeeper localhost:2181
before_script:
- bundle exec rake vendor
- bundle exec rake install_jars
script: bundle exec rspec && bundle exec rspec --tag integration
37 changes: 9 additions & 28 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,35 +1,16 @@
require 'gem_publisher'

desc 'Publish gem to RubyGems.org'
task :publish_gem do |t|
gem = GemPublisher.publish_if_updated('logstash-output-kafka.gemspec', :rubygems)
puts "Published #{gem}" if gem
end
require "logstash/devutils/rake"

task :default do
system('rake -T')
end

desc "Get jars"
task :vendor do
jar_target = "vendor/jar-dependencies/runtime-jars"
kafka_version = "0.9.0.1"
kafka_url = "http://central.maven.org/maven2/org/apache/kafka/kafka-clients/#{kafka_version}/kafka-clients-#{kafka_version}.jar"
slf4j_version = "1.7.13"
slf4j_url = "http://central.maven.org/maven2/org/slf4j/slf4j-api/#{slf4j_version}/slf4j-api-#{slf4j_version}.jar"
slf4j_nop_url = "http://central.maven.org/maven2/org/slf4j/slf4j-nop/#{slf4j_version}/slf4j-nop-#{slf4j_version}.jar"

puts "Will get jars for version #{kafka_version}"
puts "Removing current jars"
`rm -rf #{jar_target}`
`mkdir -p #{jar_target}`
Dir.chdir jar_target
puts "Will download #{kafka_url}"
`curl #{kafka_url} -o kafka-clients-#{kafka_version}.jar`
puts "Will download #{slf4j_url}"
`curl #{slf4j_url} -o slf4j-api-#{slf4j_version}.jar`
puts "Will download #{slf4j_nop_url}"
`curl #{slf4j_nop_url} -o slf4j-noop-#{slf4j_version}.jar`
require 'jars/installer'
task :install_jars do
# If we don't have these env variables set, jar-dependencies will
# download the jars and place it in $PWD/lib/. We actually want them in
# $PWD/vendor
ENV['JARS_HOME'] = Dir.pwd + "/vendor/jar-dependencies/runtime-jars"
ENV['JARS_VENDOR'] = "false"
Jars::Installer.new.vendor_jars!(false)
end

require "logstash/devutils/rake"
2 changes: 1 addition & 1 deletion lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config :buffer_memory, :validate => :number, :default => 33554432
# The compression type for all data generated by the producer.
# The default is none (i.e. no compression). Valid values are none, gzip, or snappy.
config :compression_type, :validate => ["none", "gzip", "snappy"], :default => "none"
config :compression_type, :validate => ["none", "gzip", "snappy", "lz4"], :default => "none"
# The id string to pass to the server when making requests.
# The purpose of this is to be able to track the source of requests beyond just
# ip/port by allowing a logical application name to be included with the request
Expand Down
6 changes: 6 additions & 0 deletions logstash-output-kafka.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ Gem::Specification.new do |s|
# Special flag to let us know this is actually a logstash plugin
s.metadata = { 'logstash_plugin' => 'true', 'group' => 'output'}

s.requirements << "jar 'org.apache.kafka:kafka-clients', '0.9.0.1'"
s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.13'"

s.add_development_dependency 'jar-dependencies', '~> 0.3.2'

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", "~> 1.0"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'logstash-codec-json'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'poseidon'
s.add_development_dependency 'snappy'
end
32 changes: 32 additions & 0 deletions spec/integration/outputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,38 @@
end
end

context 'when using snappy compression' do
let(:test_topic) { 'snappy_topic' }
let!(:consumer) do
Poseidon::PartitionConsumer.new("my_test_consumer", kafka_host, kafka_port,
test_topic, 0, :earliest_offset)
end
subject do
consumer.fetch
end

before :each do
config = base_config.merge({"topic_id" => test_topic, "compression_type" => "snappy"})
load_kafka_data(config)
end

it 'should have data integrity' do
expect(subject.size).to eq(num_events)
subject.each do |m|
expect(m.value).to eq(event.to_s)
end
end
end

context 'when using LZ4 compression' do
let(:test_topic) { 'lz4_topic' }

before :each do
config = base_config.merge({"topic_id" => test_topic, "compression_type" => "lz4"})
load_kafka_data(config)
end
end

context 'when using multi partition topic' do
let(:num_events) { 10 }
let(:test_topic) { 'topic3' }
Expand Down

0 comments on commit 5de6b25

Please sign in to comment.