Skip to content

Commit

Permalink
Merge fa39bfd into fb77e77
Browse files Browse the repository at this point in the history
  • Loading branch information
mthssdrbrg committed May 9, 2015
2 parents fb77e77 + fa39bfd commit e640687
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
jruby-1.7.19
jruby-1.7.20
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: ruby
sudo: false
cache: bundler
env:
- JRUBY_OPTS="$JRUBY_OPTS -Xcli.debug=true --debug"
before_script:
- ./bin/kafka start || true
- tail -n +1 tmp/*.log
Expand Down
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# encoding: utf-8

source 'https://rubygems.org'

gem 'kafka-jars', '0.8.2.1.pre0'

gemspec

group :test do
Expand Down
87 changes: 52 additions & 35 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,71 @@ PATH
remote: .
specs:
heller (0.1.0-java)
kafka-jars (= 0.8.1.1.pre1)
kafka-jars

GEM
remote: https://rubygems.org/
specs:
coveralls (0.7.0)
multi_json (~> 1.3)
rest-client
simplecov (>= 0.7)
term-ansicolor
thor
coveralls (0.8.1)
json (~> 1.8)
rest-client (>= 1.6.8, < 2)
simplecov (~> 0.10.0)
term-ansicolor (~> 1.3)
thor (~> 0.19.1)
diff-lcs (1.2.5)
docile (1.1.3)
kafka-jars (0.8.1.1.pre1-java)
metrics-core-jars (~> 2.1.2)
docile (1.1.5)
domain_name (0.5.24)
unf (>= 0.0.5, < 1.0.0)
http-cookie (1.0.2)
domain_name (~> 0.5)
json (1.8.2-java)
kafka-clients-jars (0.8.2.1.pre0-java)
lz4-jars (~> 1.2)
metrics-core-jars (>= 2.1, < 3.0.0)
slf4j-jars (~> 1.7)
snappy-jars (~> 1.1)
kafka-jars (0.8.2.1.pre0-java)
kafka-clients-jars (~> 0.8)
lz4-jars (~> 1.2)
metrics-core-jars (>= 2.1, < 3.0.0)
scala-library-jars (~> 2.9.2)
slf4j-jars (~> 1, >= 1.6.2)
snappy-jars (~> 1.1.0.1)
zookeeper-jars (~> 3.4.6)
metrics-core-jars (2.1.2-java)
slf4j-jars (~> 1)
mime-types (2.3)
multi_json (1.10.1)
rest-client (1.6.7)
mime-types (>= 1.16)
rspec (3.0.0)
rspec-core (~> 3.0.0)
rspec-expectations (~> 3.0.0)
rspec-mocks (~> 3.0.0)
rspec-core (3.0.0)
rspec-support (~> 3.0.0)
rspec-expectations (3.0.0)
slf4j-jars (~> 1.7)
snappy-jars (~> 1.1)
zookeeper-jars (~> 3.4)
lz4-jars (1.2.0-java)
metrics-core-jars (2.2.0-java)
mime-types (2.5)
netrc (0.10.3)
rest-client (1.8.0)
http-cookie (>= 1.0.2, < 2.0)
mime-types (>= 1.16, < 3.0)
netrc (~> 0.7)
rspec (3.2.0)
rspec-core (~> 3.2.0)
rspec-expectations (~> 3.2.0)
rspec-mocks (~> 3.2.0)
rspec-core (3.2.3)
rspec-support (~> 3.2.0)
rspec-expectations (3.2.1)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.0.0)
rspec-mocks (3.0.0)
rspec-support (~> 3.0.0)
rspec-support (3.0.0)
rspec-support (~> 3.2.0)
rspec-mocks (3.2.1)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.2.0)
rspec-support (3.2.2)
scala-library-jars (2.9.2-java)
simplecov (0.8.2)
simplecov (0.10.0)
docile (~> 1.1.0)
multi_json
simplecov-html (~> 0.8.0)
simplecov-html (0.8.0)
json (~> 1.8)
simplecov-html (~> 0.10.0)
simplecov-html (0.10.0)
slf4j-jars (1.7.7-java)
snappy-jars (1.1.0.1.2-java)
term-ansicolor (1.3.0)
tins (~> 1.0)
thor (0.19.1)
tins (1.3.0)
tins (1.5.1)
unf (0.1.4-java)
zookeeper-jars (3.4.6-java)

PLATFORMS
Expand All @@ -59,5 +75,6 @@ PLATFORMS
DEPENDENCIES
coveralls
heller!
kafka-jars (= 0.8.2.1.pre0)
rspec
simplecov
17 changes: 9 additions & 8 deletions bin/kafka
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ function start() {
base_dir=$(dirname $0)/..
log_dir=$base_dir/tmp
config_dir=$base_dir/spec/support/config
classpath=$(ls -1 $(bundle show scala-library-jars)/lib/*.jar | paste -sd: -)
classpath=$classpath:$(ls -1 $(bundle show slf4j-jars)/lib/slf4j-{api,simple}-*.jar | paste -sd: -)
classpath=$classpath:$(ls -1 $(bundle show zookeeper-jars)/lib/*.jar | paste -sd: -)
classpath=$classpath:$(ls -1 $(bundle show metrics-core-jars)/lib/*.jar | paste -sd: -)
classpath=$classpath:$(ls -1 $(bundle show snappy-jars)/lib/*.jar | paste -sd: -)
classpath=$classpath:$(ls -1 $(bundle show kafka-jars)/lib/*.jar | paste -sd: -)
slf4j_dir=$(find $GEM_HOME -name 'slf4j-jars*' -type d | sort | tail -n 1)
classpath=$(find $slf4j_dir -name 'slf4j-api-*.jar' | paste -sd: -)
classpath=$classpath:$(find $slf4j_dir -name 'slf4j-simple-*.jar' | paste -sd: -)
for gem in scala-library-jars zookeeper-jars metrics-core-jars snappy-jars lz4-jars kafka-clients-jars kafka-jars; do
gem_dir=$(find $GEM_HOME -name $gem'*' -type d | sort | tail -n 1)
classpath=$classpath:$(find $gem_dir -name '*.jar' | paste -sd: -)
done
kafka_java_opts="-Xmx512M -server -Dlog4j.configuration=file:$config_dir/log4j.properties -cp $classpath"

mkdir -p $log_dir
Expand All @@ -37,8 +38,8 @@ function start() {
}

function stop() {
kill $(jps -m | grep Kafka | cut -d ' ' -f 1) > /dev/null 2>&1
kill $(jps -m | grep zookeeper | cut -d ' ' -f 1) > /dev/null 2>&1
kill -9 $(jps -m | grep Kafka | cut -d ' ' -f 1) > /dev/null 2>&1
kill -9 $(jps -m | grep zookeeper | cut -d ' ' -f 1) > /dev/null 2>&1
}

RETVAL=0
Expand Down
2 changes: 1 addition & 1 deletion heller.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ Gem::Specification.new do |s|

s.platform = 'java'

s.add_runtime_dependency 'kafka-jars', '= 0.8.1.1.pre1'
s.add_runtime_dependency 'kafka-jars'
end
6 changes: 5 additions & 1 deletion lib/heller/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ def merge_with_defaults(options)
end
end

def convert_key(key)
key_mappings.key?(key) ? key_mappings[key] : key.to_s.gsub('_', '.')
end

def to_properties
@configuration.each_with_object(Properties.new) do |(key, value), props|
props.put(key_mappings[key.to_sym], value.to_s)
props.put(convert_key(key), value.to_s)
end
end
end
Expand Down
42 changes: 42 additions & 0 deletions spec/heller/consumer_configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,34 @@ module Heller
it 'sets #zk_sync_time_ms' do
expect(configuration.zk_sync_time_ms).to eq(100)
end

it 'sets #partition_assignment_strategy' do
expect(configuration.partition_assignment_strategy).to eq('range')
end

it 'sets #offsets_storage' do
expect(configuration.offsets_storage).to eq('kafka')
end

it 'sets #offsets_channel_backoff_ms' do
expect(configuration.offsets_channel_backoff_ms).to eq(1000)
end

it 'sets #offsets_channel_socket_timeout_ms' do
expect(configuration.offsets_channel_socket_timeout_ms).to eq(1500)
end

it 'sets #offsets_commit_max_retries' do
expect(configuration.offsets_commit_max_retries).to eq(15)
end

it 'sets #dual_commit_enabled' do
expect(configuration.dual_commit_enabled).to be true
end

it 'sets #exclude_internal_topics' do
expect(configuration.exclude_internal_topics).to be false
end
end

context 'with Symbol keys' do
Expand All @@ -115,6 +143,13 @@ module Heller
zk_session_timeout: 125,
zk_connection_timeout: 150,
zk_sync_time: 100,
partition_assignment_strategy: 'range',
offsets_storage: 'kafka',
offsets_channel_backoff_ms: 1000,
offsets_channel_socket_timeout_ms: 1500,
offsets_commit_max_retries: 15,
dual_commit_enabled: true,
exclude_internal_topics: false,
}
end

Expand Down Expand Up @@ -144,6 +179,13 @@ module Heller
'zk_session_timeout' => 125,
'zk_connection_timeout' => 150,
'zk_sync_time' => 100,
'partition_assignment_strategy' => 'range',
'offsets_storage' => 'kafka',
'offsets_channel_backoff_ms' => 1000,
'offsets_channel_socket_timeout_ms' => 1500,
'offsets_commit_max_retries' => 15,
'dual_commit_enabled' => true,
'exclude_internal_topics' => false,
}
end

Expand Down
62 changes: 61 additions & 1 deletion spec/integration/end_to_end_communication_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@

describe 'End-to-end communication' do
let :producer do
Heller::Producer.new('localhost:9092', client_id: 'spec-producer', batch_size: 1, num_retries: 10, retry_backoff: 1000)
Heller::Producer.new('localhost:9092', producer_options)
end

let :consumer do
Heller::Consumer.new('localhost:9092', client_id: 'spec-consumer')
end

let :producer_options do
{
client_id: 'spec-producer',
batch_size: 1,
num_retries: 10,
retry_backoff: 1000,
}
end

after do
producer.close
consumer.close
Expand Down Expand Up @@ -90,6 +99,57 @@
expect(JSON.parse(message)).to eq({'a key' => 'a value'})
end
end

context 'with Snappy compression' do
let :producer_options do
super.merge({
client_id: 'spec-producer-snappy',
compression: :snappy,
})
end

context 'simple string messages' do
let :topic do
"spec-snappy-simple-string-#{Time.now.to_i.to_s(36)}"
end

before do
producer.push(Heller::Message.new(topic, 'simple string message'))
end

it 'is no big deal' do
expect(enumerator).to be_a(Heller::MessageSetEnumerator)

messages = enumerator.to_a
expect(messages.size).to eq(1)

offset, message = messages.last
expect(offset).to be_zero
expect(message).to eq('simple string message')
end
end

context 'JSON serialized hashes' do
let :topic do
"spec-snappy-json-hash-#{Time.now.to_i.to_s(36)}"
end

before do
producer.push(Heller::Message.new(topic, {'a key' => 'a value'}.to_json))
end

it 'is no big deal' do
expect(enumerator).to be_a(Heller::MessageSetEnumerator)

messages = enumerator.to_a
expect(messages.size).to eq(1)

offset, message = messages.last
expect(offset).to be_zero
expect(JSON.parse(message)).to eq({'a key' => 'a value'})
end
end
end
end

describe '#metadata' do
Expand Down
5 changes: 5 additions & 0 deletions spec/support/config/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
log4j.rootLogger=INFO, CONSOLE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%d] %p %m (%c)%n
1 change: 1 addition & 0 deletions spec/support/config/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

controlled.shutdown.enable=false

############################# Log Basics #############################

Expand Down

0 comments on commit e640687

Please sign in to comment.