Skip to content

Commit

Permalink
Add drain command
Browse files Browse the repository at this point in the history
When we need to reassign replicas from a broker to other brokers,
say, when the broker is dead, we used to use `reassign` command,
which reassign all replicas randomly, whereas replicas on healthy
brokers might be moved around. This is unefficient.

In this PR, I bring into a new command `drain`, which essentially
empty a specific broker and assign its replicas to a list of
healthy ones. Replicas on healthy brokers will not be moved around.

Usage:
drain <broker id> [topic] [--brokers <ids>]

Tests are also be brought into this repo, start from covering the
new command.
  • Loading branch information
lucaluo committed Jun 12, 2015
1 parent a667bb2 commit d0c9457
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 21 deletions.
2 changes: 2 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--color
--require spec_helper
45 changes: 39 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,60 @@ PATH
specs:
kafkat (0.0.10)
colored (~> 1.2)
factory_girl (~> 4.5.0)
highline (~> 1.6, >= 1.6.21)
retryable (~> 1.3, >= 1.3.5)
rspec (~> 3.2.0)
rspec-collection_matchers (~> 1.1.0)
trollop (~> 2.0)
zk (~> 1.9, >= 1.9.4)

GEM
remote: https://rubygems.org/
specs:
activesupport (4.2.1)
i18n (~> 0.7)
json (~> 1.7, >= 1.7.7)
minitest (~> 5.1)
thread_safe (~> 0.3, >= 0.3.4)
tzinfo (~> 1.1)
colored (1.2)
highline (1.6.21)
diff-lcs (1.2.5)
factory_girl (4.5.0)
activesupport (>= 3.0.0)
highline (1.7.2)
i18n (0.7.0)
json (1.8.3)
little-plugger (1.1.3)
logging (1.8.2)
little-plugger (>= 1.1.3)
multi_json (>= 1.8.4)
multi_json (1.10.1)
retryable (1.3.5)
trollop (2.0)
zk (1.9.4)
minitest (5.7.0)
multi_json (1.11.1)
retryable (1.3.6)
rspec (3.2.0)
rspec-core (~> 3.2.0)
rspec-expectations (~> 3.2.0)
rspec-mocks (~> 3.2.0)
rspec-collection_matchers (1.1.2)
rspec-expectations (>= 2.99.0.beta1)
rspec-core (3.2.3)
rspec-support (~> 3.2.0)
rspec-expectations (3.2.1)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.2.0)
rspec-mocks (3.2.1)
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.2.0)
rspec-support (3.2.2)
thread_safe (0.3.5)
trollop (2.1.2)
tzinfo (1.2.2)
thread_safe (~> 0.1)
zk (1.9.5)
logging (~> 1.8.2)
zookeeper (~> 1.4.0)
zookeeper (1.4.9)
zookeeper (1.4.10)

PLATFORMS
ruby
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Here's a list of supported commands:
set-replication-factor [topic] [--newrf <n>] [--brokers id[,id]] Set the replication factor of
shutdown <broker id> Gracefully remove leaderships from a broker (requires JMX).
topics Print all topics.
drain <broker id> [topic] [--brokers <ids>] Reassign partitions from a dead broker to healthy brokers.
```

Expand Down
3 changes: 3 additions & 0 deletions kafkat.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'highline', '~> 1.6', '>= 1.6.21'
s.add_runtime_dependency 'retryable', '~> 1.3', '>= 1.3.5'
s.add_runtime_dependency 'colored', '~> 1.2'
s.add_runtime_dependency 'rspec', '~> 3.2.0'
s.add_runtime_dependency 'rspec-collection_matchers', '~> 1.1.0'
s.add_runtime_dependency 'factory_girl', '~> 4.5.0'
end
1 change: 1 addition & 0 deletions lib/kafkat/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def self.get(name)

class Base
include Formatting
include CommandIO

attr_reader :config

Expand Down
60 changes: 60 additions & 0 deletions lib/kafkat/command/drain.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
module Kafkat
module Command
class Drain < Base
register_as 'drain'

usage 'drain <broker id> [topic] [--brokers <ids>]',
'Reassign partitions from a dead broker to healthy brokers.'

def run
dead_broker_id = ARGV[0] && ARGV.shift.to_i
if dead_broker_id.nil?
puts "You must specify a broker ID."
exit 1
end

topic_name = ARGV.shift unless ARGV[0] && ARGV[0].start_with?('--')
topics = topic_name && zookeeper.get_topics([topic_name])
topics ||= zookeeper.get_topics

opts = Trollop.options do
opt :brokers, "replica set (broker IDs)", type: :string
end

healthy_broker_ids = opts[:brokers] && opts[:brokers].split(',').map(&:to_i)
healthy_broker_ids ||= zookeeper.get_brokers.values.map(&:id)
healthy_broker_ids.delete(dead_broker_id)

all_brokers = zookeeper.get_brokers
all_brokers_id = all_brokers.values.map(&:id)

healthy_broker_ids.each do |id|
if !all_brokers_id.include?(id)
print "ERROR: Broker #{id} is not currently active.\n"
exit 1
end
end

assignments = generate_assignments(dead_broker_id, topics, healthy_broker_ids)
prompt_and_execute_assignments(assignments)
end

def generate_assignments(dead_broker_id, topics, healthy_broker_ids)
assignments = []

topics.each do |_, t|
t.partitions.each do |p|
if p.replicas.include? dead_broker_id
replicas = p.replicas - [dead_broker_id]
potential_broker_ids = healthy_broker_ids - replicas
replicas << potential_broker_ids.sample unless potential_broker_ids.empty?
assignments << Assignment.new(t.name, p.id, replicas)
end
end
end

assignments
end
end
end
end
16 changes: 1 addition & 15 deletions lib/kafkat/command/reassign.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,7 @@ def run

# ****************

print "This operation executes the following assignments:\n\n"
print_assignment_header
assignments.each { |a| print_assignment(a) }
print "\n"

return unless agree("Proceed (y/n)?")

result = nil
begin
print "\nBeginning.\n"
result = admin.reassign!(assignments)
print "Started.\n"
rescue Interface::Admin::ExecutionFailedError
print result
end
prompt_and_execute_assignments(assignments)
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/kafkat/utility.rb
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
require 'kafkat/utility/formatting'
require 'kafkat/utility/command_io'
21 changes: 21 additions & 0 deletions lib/kafkat/utility/command_io.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Kafkat
module CommandIO
def prompt_and_execute_assignments(assignments)
print "This operation executes the following assignments:\n\n"
print_assignment_header
assignments.each { |a| print_assignment(a) }
print "\n"

return unless agree("Proceed (y/n)?")

result = nil
begin
print "\nBeginning.\n"
result = admin.reassign!(assignments)
print "Started.\n"
rescue Interface::Admin::ExecutionFailedError
print result
end
end
end
end
25 changes: 25 additions & 0 deletions spec/factories/topic.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module Kafkat
FactoryGirl.define do
factory :topic, class:Topic do
name "topic_name"

factory :topic_rep_factor_one do
partitions {[Partition.new(name, 0, [0], 0, 0),
Partition.new(name, 1, [1], 1, 1),
Partition.new(name, 2, [2], 2, 2)]}
end

factory :topic_rep_factor_two do
partitions {[Partition.new(name, 0, [0, 1], 0, 0),
Partition.new(name, 1, [0, 2], 2, 2),
Partition.new(name, 2, [1, 2], 1, 1)]}
end

factory :topic_rep_factor_three do
partitions {[Partition.new(name, 0, [0, 1, 2], 0, 0),
Partition.new(name, 1, [0, 1, 2], 1, 1),
Partition.new(name, 2, [0, 1, 2], 2, 2)]}
end
end
end
end
63 changes: 63 additions & 0 deletions spec/lib/kafkat/command/drain_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
require 'spec_helper'

module Kafkat
RSpec.describe Command::Drain do
let(:drain) { Command::Drain.new({}) }
let(:dead_broker) { 0 }
let(:healthy_brokers) { [1, 2] }

context 'three nodes with replication factor 1' do
let(:topic_rep_factor_one) { FactoryGirl.build(:topic_rep_factor_one) }

it 'should put replicas to one of the other two healthy nodes' do
assignments = drain.generate_assignments(dead_broker,
{"topic_name" => topic_rep_factor_one},
healthy_brokers)
expect(assignments).to have_exactly(1).Partition
assignments.each do |a|
expect(a.replicas).to have_exactly(1).broker_id

a.replicas.each do |r|
expect(healthy_brokers).to include(r)
end
end
end
end

context 'three nodes with replication factor 2' do
let(:topic_rep_factor_two) { FactoryGirl.build(:topic_rep_factor_two) }

it 'should put replicas to both of the other two healthy nodes' do
assignments = drain.generate_assignments(dead_broker,
{"topic_name" => topic_rep_factor_two},
healthy_brokers)
expect(assignments).to have_exactly(2).Partitions
assignments.each do |a|
expect(a.replicas).to have_exactly(2).broker_ids

a.replicas.each do |r|
expect(healthy_brokers).to include(r)
end
end
end
end

context 'three nodes with replication factor 3' do
let(:topic_rep_factor_three) { FactoryGirl.build(:topic_rep_factor_three) }

it 'should put replicas to both of the other two healthy nodes' do
assignments = drain.generate_assignments(dead_broker,
{"topic_name" => topic_rep_factor_three},
healthy_brokers)
expect(assignments).to have_exactly(3).Partitions
assignments.each do |a|
expect(a.replicas).to have_exactly(2).broker_ids

a.replicas.each do |r|
expect(healthy_brokers).to include(r)
end
end
end
end
end
end
100 changes: 100 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# This file was generated by the `rspec --init` command. Conventionally, all
# specs live under a `spec` directory, which RSpec adds to the `$LOAD_PATH`.
# The generated `.rspec` file contains `--require spec_helper` which will cause
# this file to always be loaded, without a need to explicitly require it in any
# files.
#
# Given that it is always loaded, you are encouraged to keep this file as
# light-weight as possible. Requiring heavyweight dependencies from this file
# will add to the boot time of your test suite on EVERY test run, even for an
# individual file that may not need all of that loaded. Instead, consider making
# a separate helper file that requires the additional dependencies and performs
# the additional setup, and require it from the spec files that actually need
# it.
#
# The `.rspec` file also contains a few flags that are not defaults but that
# users commonly want.
#
# See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration

require 'kafkat'
require 'factory_girl'
require 'rspec/collection_matchers'

require_relative '../spec/factories/topic'

RSpec.configure do |config|
# rspec-expectations config goes here. You can use an alternate
# assertion/expectation library such as wrong or the stdlib/minitest
# assertions if you prefer.
config.expect_with :rspec do |expectations|
# This option will default to `true` in RSpec 4. It makes the `description`
# and `failure_message` of custom matchers include text for helper methods
# defined using `chain`, e.g.:
# be_bigger_than(2).and_smaller_than(4).description
# # => "be bigger than 2 and smaller than 4"
# ...rather than:
# # => "be bigger than 2"
expectations.include_chain_clauses_in_custom_matcher_descriptions = true
end

# rspec-mocks config goes here. You can use an alternate test double
# library (such as bogus or mocha) by changing the `mock_with` option here.
config.mock_with :rspec do |mocks|
# Prevents you from mocking or stubbing a method that does not exist on
# a real object. This is generally recommended, and will default to
# `true` in RSpec 4.
mocks.verify_partial_doubles = true
end

config.include FactoryGirl::Syntax::Methods

# The settings below are suggested to provide a good initial experience
# with RSpec, but feel free to customize to your heart's content.
=begin
# These two settings work together to allow you to limit a spec run
# to individual examples or groups you care about by tagging them with
# `:focus` metadata. When nothing is tagged with `:focus`, all examples
# get run.
config.filter_run :focus
config.run_all_when_everything_filtered = true
# Limits the available syntax to the non-monkey patched syntax that is
# recommended. For more details, see:
# - http://myronmars.to/n/dev-blog/2012/06/rspecs-new-expectation-syntax
# - http://teaisaweso.me/blog/2013/05/27/rspecs-new-message-expectation-syntax/
# - http://myronmars.to/n/dev-blog/2014/05/notable-changes-in-rspec-3#new__config_option_to_disable_rspeccore_monkey_patching
config.disable_monkey_patching!
# This setting enables warnings. It's recommended, but in some cases may
# be too noisy due to issues in dependencies.
config.warnings = true
# Many RSpec users commonly either run the entire suite or an individual
# file, and it's useful to allow more verbose output when running an
# individual spec file.
if config.files_to_run.one?
# Use the documentation formatter for detailed output,
# unless a formatter has already been configured
# (e.g. via a command-line flag).
config.default_formatter = 'doc'
end
# Print the 10 slowest examples and example groups at the
# end of the spec run, to help surface which specs are running
# particularly slow.
config.profile_examples = 10
# Run specs in random order to surface order dependencies. If you find an
# order dependency and want to debug it, you can fix the order by providing
# the seed, which is printed after each run.
# --seed 1234
config.order = :random
# Seed global randomization in this process using the `--seed` CLI option.
# Setting this allows you to use `--seed` to deterministically reproduce
# test failures related to randomization by passing the same `--seed` value
# as the one that triggered the failure.
Kernel.srand config.seed
=end
end

0 comments on commit d0c9457

Please sign in to comment.