Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added save plan, execute given plan, throttle and rack level un-skew algorithm #109

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion kafka/tools/assigner/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@ def is_dry_run(args):
return True
return False

def save_plan(args, move_partitions):
    """Persist the computed reassignment plan as JSON.

    No-op when --save-plan-path was not supplied.

    :param args: parsed CLI arguments; reads args.save_plan_path
    :param move_partitions: partitions to move; each must provide dict_for_reassignment()
    :raises FileExistsError: if the target path is a directory, or its parent
                             directory does not exist
    """
    if not args.save_plan_path:
        return

    # os.path.dirname handles a bare filename (saved to cwd); the previous
    # rsplit("/", 1) raised ValueError when the path had no "/" at all.
    folder = os.path.dirname(args.save_plan_path) or "."
    if os.path.isdir(folder) and (not os.path.isdir(args.save_plan_path)):
        plans_as_json_list = [p.dict_for_reassignment() for p in move_partitions]
        with open(args.save_plan_path, "w") as f:
            f.write(json.dumps(plans_as_json_list, indent=4))
        log.info("Saved plan at location={}".format(args.save_plan_path))
    else:
        raise FileExistsError("Given file path={} is a directory not File".format(args.save_plan_path))

def get_throttle_limit(args):
    """Return the replication throttle to use for the reassignment.

    The contents of args.throttle_limit_file_path (an integer, if the file is
    readable and parseable) override args.throttle; otherwise args.throttle
    is returned unchanged.

    :param args: parsed CLI arguments; reads args.throttle and
                 args.throttle_limit_file_path
    :returns: throttle limit as an int (or args.throttle as-is when no file)
    """
    throttle = args.throttle
    if args.throttle_limit_file_path:
        # open() is inside the try so an unreadable/missing file falls back to
        # args.throttle instead of crashing; only expected errors are caught
        # (the original bare `except Exception` also hid programming errors).
        try:
            with open(args.throttle_limit_file_path) as f:
                throttle = int(f.read())
        except (IOError, OSError, ValueError) as e:
            log.error("Error while getting throttle limit from file {}".format(e))
    return throttle

def main():
# Start by loading all the modules
Expand All @@ -103,6 +125,15 @@ def main():

# Set up and parse all CLI arguments
args = set_up_arguments(action_map, sizer_map, plugins)
cluster = Cluster.create_from_zookeeper(args.zookeeper, getattr(args, 'default_retention', 1))

# if include_topics list is empty it means we have to include all topics.
# if include_topics is non-empty then except include topics all will be in exclude_topics list
if args.include_topics and len(args.include_topics) > 0:
for topic in cluster.topics.keys():
if topic not in args.include_topics:
args.exclude_topics.append(topic)

run_plugins_at_step(plugins, 'set_arguments', args)

tools_path = get_tools_path(args.tools_path)
Expand All @@ -124,6 +155,7 @@ def main():
print_leadership("after", newcluster, args.leadership)

move_partitions = cluster.changed_partitions(action_to_run.cluster)
save_plan(args, move_partitions)
batches = split_partitions_into_batches(move_partitions, batch_size=args.moves, use_class=Reassignment)
run_plugins_at_step(plugins, 'set_batches', batches)

Expand All @@ -133,7 +165,7 @@ def main():

for i, batch in enumerate(batches):
log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch)))
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run)
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, get_throttle_limit(args), plugins, dry_run)

run_plugins_at_step(plugins, 'before_ple')

Expand Down
15 changes: 14 additions & 1 deletion kafka/tools/assigner/actions/balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@ class ActionBalance(ActionModule):
needs_sizes = True

def __init__(self, args, cluster):
    """Validate the ordering of the requested balance modules, then defer
    to ActionModule.

    Rules enforced:
      * topic_partition_in_rack, when requested, must be the LAST module;
      * when rackaware is also requested alongside it, rackaware must be
        second-to-last;
      * rackaware alone must be the last module.

    :raises BalanceException: when the module ordering is invalid
    """
    if "topic_partition_in_rack" in args.types:
        if args.types[-1] != "topic_partition_in_rack":
            raise BalanceException(
                "In order to work properly, topic_partition_in_rack must always be the last module specified"
            )

        # BUG FIX: this combined check was previously an `elif` of the outer
        # `if`, so it only ran when topic_partition_in_rack was ABSENT and
        # rejected every valid rackaware-last configuration. It belongs here,
        # inside the topic_partition_in_rack branch.
        if "rackaware" in args.types and args.types[-2] != "rackaware":
            raise BalanceException(
                "In order to work properly, if you specify topic_partition_in_rack and " +
                "rackaware both module then rackaware must always be in 2nd last & topic_partition_in_rack always be last")

    elif "rackaware" in args.types and args.types[-1] != "rackaware":
        raise BalanceException("In order to work properly, rackaware must always be the last module specified")

    super(ActionBalance, self).__init__(args, cluster)
Expand Down
230 changes: 230 additions & 0 deletions kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
import math
from kafka.tools import log
from prettytable import PrettyTable
from kafka.tools.exceptions import BalanceException
from kafka.tools.assigner.actions import ActionBalanceModule


class ActionBalanceTopicPartitionInRack(ActionBalanceModule):
    """Balance module that evens out each topic's replica placement inside
    every rack.

    process_cluster() runs two passes per (non-excluded) topic:
      1. level the total replica count across the brokers of each rack;
      2. level the leader count across the brokers of each rack;
    and then logs a per-broker before/after table for review.
    """

    name = "topic_partition_in_rack"
    helpstr = "Reassign topic partition replicas to assure they are almost equal distributed among brokers in a racks"

    def __init__(self, args, cluster):
        super(ActionBalanceTopicPartitionInRack, self).__init__(args, cluster)

    def __calculate_max_possible_count(self, broker_count):
        # Ceiling of (total replicas in this rack / brokers in this rack):
        # the target upper bound any single broker should hold.
        total_count = sum(map(lambda x: x[1], broker_count))
        return int(math.ceil(total_count * 1.0 / len(broker_count)))

    def _unskew_topic_partition_in_rack(self, topic, partitions):
        """
        For given partitions of a topic as params applying following algorithm

        1. Grouping partitions based on rack id (getting rack id from broker).
        2. In a rack, sorting (reverse) brokers based on number of partitions present.
        3. Balancing partitions on brokers by moving partitions from left to right
        """

        # Creating a map of rack id, broker and count of partition on broker
        # rack_broker_count: {rack: {broker_id: replica_count}}
        # broker_partition:  {broker_id: set of partitions with a replica there}
        rack_broker_count, broker_partition = {}, {}
        for p in partitions:
            for broker in p.replicas:
                if broker.rack not in rack_broker_count:
                    rack_broker_count[broker.rack] = {}

                if broker.id not in rack_broker_count[broker.rack]:
                    rack_broker_count[broker.rack][broker.id] = 0

                if broker.id not in broker_partition:
                    broker_partition[broker.id] = set()

                broker_partition[broker.id].add(p)
                rack_broker_count[broker.rack][broker.id] += 1
        # For a rack, balancing partitions
        for rack_id in rack_broker_count:
            # Mutable [broker_id, count] pairs, most-loaded broker first.
            broker_count = sorted(
                map(
                    lambda x: list(x),
                    rack_broker_count[rack_id].items()
                ),
                key=lambda x: x[1],
                reverse=True
            )

            # calculating max possible number of partitions in a broker
            count = self.__calculate_max_possible_count(broker_count)

            # On sorted broker_count list,
            # balancing number of partitions from left to right:
            # `left` walks overloaded brokers, `right` walks underloaded ones,
            # surplus replicas are swapped from left-broker onto right-broker.
            left, right, adjusted_count = 0, len(broker_count) - 1, 0
            while left < right:
                bleft = broker_count[left]
                bid, bcount = bleft[0], bleft[1]
                adjusted_count = bcount - count  # surplus on the left broker
                if adjusted_count <= 0:
                    left += 1
                    continue

                for p in broker_partition[bid]:
                    bright = broker_count[right]
                    if bright[1] >= count:
                        right -= 1
                    # NOTE(review): `bright` is not re-read after `right` is
                    # decremented, so the swap below can still target the
                    # just-filled broker for this iteration — confirm intended.

                    if left >= right or adjusted_count == 0:
                        break

                    # Only move when the partition actually lives on the left
                    # broker and is not already replicated on the right one.
                    replicas = [b.id for b in p.replicas]
                    if bid in replicas and bright[0] not in replicas:
                        index = replicas.index(bid)
                        p.swap_replicas(p.replicas[index], self.cluster.brokers[bright[0]])
                        adjusted_count -= 1
                        bleft[1] -= 1
                        bright[1] += 1
                left += 1

    def _unskew_topic_partition_leader_in_rack(self, topic, partitions):
        """Level the LEADER count across brokers within each rack.

        Same left/right two-pointer scheme as _unskew_topic_partition_in_rack,
        but counting only leader replicas (index 0) and keeping overall replica
        counts stable by performing a compensating follower swap back.
        """

        # Creating a map of rack id, broker and count of partition leader on broker
        rack_broker_leader_count_map, broker_partition = {}, {}
        for p in partitions:
            for broker in p.replicas:
                if broker.rack not in rack_broker_leader_count_map:
                    rack_broker_leader_count_map[broker.rack] = {}

                if broker.id not in rack_broker_leader_count_map[broker.rack]:
                    rack_broker_leader_count_map[broker.rack][broker.id] = 0

                # Replica at index 0 is the (preferred) leader.
                if p.replicas.index(broker) == 0:
                    rack_broker_leader_count_map[broker.rack][broker.id] += 1

                if broker.id not in broker_partition:
                    broker_partition[broker.id] = set()

                broker_partition[broker.id].add(p)

        # For a rack, balancing partitions leader
        for rack_id in rack_broker_leader_count_map:
            broker_count = sorted(
                map(
                    lambda x: list(x),
                    rack_broker_leader_count_map[rack_id].items()
                ),
                key=lambda x:x[1],
                reverse=True
            )

            count = self.__calculate_max_possible_count(broker_count)
            left, right, adjusted_count = 0, len(broker_count) - 1, 0
            while left < right:
                bleft = broker_count[left]
                bid, bcount = bleft[0], bleft[1]
                adjusted_count = bcount - count  # surplus leaders on left broker
                if adjusted_count <= 0:
                    left += 1
                    continue

                for p in broker_partition[bid]:
                    bright = broker_count[right]
                    if bright[1] >= count:
                        right -= 1

                    if left >= right or adjusted_count == 0:
                        break

                    # Skip unless the left broker leads this partition and the
                    # right broker holds no replica of it.
                    replicas = [b.id for b in p.replicas]
                    if not(bid in replicas and bright[0] not in replicas and replicas.index(bid) == 0):
                        continue

                    # Swapping leader from left side broker and non leader from right side broker
                    p.swap_replicas(p.replicas[0], self.cluster.brokers[bright[0]])
                    adjusted_count -= 1
                    bleft[1] -= 1
                    bright[1] += 1

                    # Compensating swap: hand one of the right broker's
                    # follower replicas back to the left broker so total
                    # replica counts stay balanced.
                    for p2 in broker_partition[bright[0]]:
                        replicas = [b.id for b in p2.replicas]
                        if bright[0] in replicas and replicas.index(bright[0]) != 0 and bid not in replicas:
                            index2 = replicas.index(bright[0])
                            p2.swap_replicas(p2.replicas[index2], self.cluster.brokers[bid])
                            break
                left += 1

    def process_cluster(self):
        """Entry point: validate rack info, then un-skew replicas and leaders
        for every non-excluded topic, logging per-topic stats.

        :raises BalanceException: if any broker lacks rack information
        """
        log.info("Starting {} module".format(self.name))

        # Check if rack information is set for the cluster
        for broker in self.cluster.brokers.values():
            if broker.rack:
                continue
            raise BalanceException("Cannot balance cluster by rack as it has no rack information")

        for topic in self.cluster.topics:
            if topic in self.args.exclude_topics:
                log.debug("Skipping topic {0} as it is explicitly excluded".format(topic))
                continue

            # Snapshots before any moves, after replica balancing, and after
            # leader balancing — used only for the stats table below.
            before_rearrange = self.__create_count_map(
                self.cluster.topics[topic].partitions
            )
            self._unskew_topic_partition_in_rack(
                topic, self.cluster.topics[topic].partitions)

            after_rearrange = self.__create_count_map(
                self.cluster.topics[topic].partitions
            )
            self._unskew_topic_partition_leader_in_rack(
                topic, self.cluster.topics[topic].partitions)

            after_leader_rearrange = self.__create_count_map(
                self.cluster.topics[topic].partitions
            )

            self.__stats(
                topic,
                before_rearrange,
                after_rearrange,
                after_leader_rearrange
            )

    def __stats(self, topic, before, after, after_leader):
        """Log a PrettyTable comparing per-broker leader/follower counts at
        the three snapshot stages (before, after replica-balance, after
        leader-balance), sorted by rack id."""
        table = PrettyTable()
        table.field_names = [
            "broker_id", "rack_id", "before(leader + follower = total)",
            "after(leader + follower = total)", "after_leader(leader + follower = total)"
        ]

        fmt = "{} + {} = {}"

        for _id in before:
            table.add_row([
                _id, self.cluster.brokers[_id].rack,
                fmt.format(
                    before[_id]["leader"], before[_id]["follower"],
                    before[_id]["leader"] + before[_id]["follower"]
                ),
                fmt.format(
                    after[_id]["leader"], after[_id]["follower"],
                    after[_id]["leader"] + after[_id]["follower"]
                ),
                fmt.format(
                    after_leader[_id]["leader"], after_leader[_id]["follower"],
                    after_leader[_id]["leader"] + after_leader[_id]["follower"]
                )
            ])
        log.info("\n" + table.get_string(sortby="rack_id"))

    def __create_count_map(self, partitions):
        """Create broker, leader & follower count dict.

        :returns: {broker_id: {"leader": int, "follower": int}} where the
                  replica at index 0 of each partition counts as its leader
        """
        count_map = {}
        for p in partitions:
            for i in range(len(p.replicas)):
                broker_id = p.replicas[i].id
                if broker_id not in count_map:
                    count_map[broker_id] = {"leader": 0, "follower": 0}
                if i == 0:
                    count_map[broker_id]["leader"] += 1
                else:
                    count_map[broker_id]["follower"] += 1
        return count_map
60 changes: 60 additions & 0 deletions kafka/tools/assigner/actions/execute_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
from kafka.tools.assigner.actions import ActionModule
from kafka.tools.assigner.arguments import file_path_checker
from kafka.tools.models.partition import Partition
from kafka.tools.exceptions import InvalidPlanFormatException, UnknownBrokerException


class ActionExecutePlan(ActionModule):
    """Action that replays a previously saved reassignment plan.

    The plan file is a JSON list of dicts, each with "topic" (str),
    "partition" (int) and "replicas" (list of broker ids), as produced by
    save_plan. For each topic named in the plan, the cluster's current
    partition list is replaced with the planned assignment.
    """

    name = "execute_plan"
    helpstr = "Execute plan from given path"
    # Replaying an explicit plan never needs partition size information.
    needs_sizes = False

    def __init__(self, args, cluster):
        super(ActionExecutePlan, self).__init__(args, cluster)

    @classmethod
    def _add_args(cls, parser):
        parser.add_argument(
            '--plan-file-path',
            required=True,
            help="File path where kafka re-assign plan is stored in json format",
            type=file_path_checker
        )

    def __check_plan_format(self, plan):
        """Expecting plan will of type list of dict.

        :raises InvalidPlanFormatException: when the plan is not a non-empty
            list of dicts (only the first element's type is sampled)
        :raises UnknownBrokerException: when the plan references a broker id
            not present in the cluster
        """
        if not (isinstance(plan, list) and len(plan) > 0 and isinstance(plan[0], dict)):
            raise InvalidPlanFormatException()

        # Check all broker id in plan must be present in cluster
        brokers_in_plan, brokers_in_cluster = set(), set(self.cluster.brokers.keys())

        for partition_plan in plan:
            brokers_in_plan.update(partition_plan["replicas"])

        missing = list(brokers_in_plan - brokers_in_cluster)
        if missing:
            raise UnknownBrokerException(
                "Broker ids = {} in plan not present in cluster".format(str(missing))
            )

    def process_cluster(self):
        """Load, validate and apply the plan to the in-memory cluster model."""
        # `with` closes the plan file deterministically; the original
        # json.load(open(...)) leaked the file handle.
        with open(self.args.plan_file_path) as f:
            plan = json.load(f)
        self.__check_plan_format(plan)

        # Topics already reset to the planned assignment (set, not a dict
        # used as a set as before).
        seen_topics = set()

        for partition_plan in plan:
            topic_name = partition_plan["topic"]
            # First occurrence of a topic in the plan: drop its current
            # partition assignment entirely and rebuild it from the plan.
            if topic_name not in seen_topics:
                self.cluster.topics[topic_name].partitions = []
                seen_topics.add(topic_name)

            topic = self.cluster.topics[topic_name]
            partition = Partition(topic, partition_plan["partition"])
            topic.partitions.append(partition)

            for replica in partition_plan["replicas"]:
                partition.add_replica(self.cluster.brokers[replica])
Loading