Skip to content

Commit

Permalink
some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Swen committed Sep 29, 2021
1 parent 139d015 commit 4d626d7
Showing 1 changed file with 48 additions and 15 deletions.
63 changes: 48 additions & 15 deletions esque/cli/commands/create/topic.py
Expand Up @@ -6,13 +6,21 @@
from esque.cli.helpers import ensure_approval, fallback_to_stdin
from esque.cli.options import State, default_options
from esque.cli.output import blue_bold
from esque.controller.topic_controller import TopicController
from esque.errors import ValidationException
from esque.resources.topic import Topic


@click.command("topic")
@click.argument("topic-name", metavar="TOPIC_NAME", callback=fallback_to_stdin, required=False)
@click.option("-l", "--like", metavar="<template_topic>", help="Topic to use as template.", autocompletion=list_topics)
@click.option(
"-l",
"--like",
"template_topic",
metavar="<template_topic>",
help="Topic to use as template.",
autocompletion=list_topics,
)
@click.option(
"-p",
"--partitions",
Expand All @@ -33,7 +41,7 @@
)
@default_options
def create_topic(
state: State, topic_name: str, like: str, partitions: Optional[int], replication_factor: Optional[int]
state: State, topic_name: str, template_topic: str, partitions: Optional[int], replication_factor: Optional[int]
):
"""Create a topic.
Expand All @@ -46,25 +54,16 @@ def create_topic(
if topic_controller.topic_exists(topic_name):
raise ValidationException(f"Topic {topic_name!r} already exists.")

if like:
template_config = topic_controller.get_cluster_topic(like)
partitions = template_config.num_partitions if partitions is None else partitions
replication_factor = template_config.replication_factor if replication_factor is None else replication_factor
config = template_config.config
if template_topic:
topic = topic_from_template(template_topic, partitions, replication_factor, topic_controller, topic_name)
else:
if partitions is None:
partitions = state.config.default_num_partitions
if replication_factor is None:
replication_factor = state.config.default_replication_factor
config = None

topic = Topic(topic_name, num_partitions=partitions, replication_factor=replication_factor, config=config)
topic = topic_with_defaults(partitions, replication_factor, state, topic_name)

if not ensure_approval(
f"Create topic {blue_bold(topic.name)} "
+ f"with replication factor {blue_bold(str(topic.replication_factor))} "
+ f"and {blue_bold(str(topic.num_partitions))} partition"
+ ("s" if partitions != 1 else "")
+ ("s" if topic.num_partitions != 1 else "")
+ f" in context {blue_bold(state.config.current_context)}?",
no_verify=state.no_verify,
):
Expand All @@ -73,3 +72,37 @@ def create_topic(

topic_controller.create_topics([topic])
click.echo(click.style(f"Topic with '{topic.name}' successfully created.", fg="green"))


def topic_with_defaults(
partitions: Optional[int], replication_factor: Optional[int], state: State, topic_name: str
) -> Topic:
if partitions is None:
partitions = state.config.default_num_partitions

if replication_factor is None:
replication_factor = state.config.default_replication_factor

topic = Topic(topic_name, num_partitions=partitions, replication_factor=replication_factor)
return topic


def topic_from_template(
template_topic: str,
partitions: Optional[int],
replication_factor: Optional[int],
topic_controller: TopicController,
topic_name: str,
) -> Topic:
template_config = topic_controller.get_cluster_topic(template_topic)

if partitions is None:
partitions = template_config.num_partitions

if replication_factor is None:
replication_factor = template_config.replication_factor

config = template_config.config

topic = Topic(topic_name, num_partitions=partitions, replication_factor=replication_factor, config=config)
return topic

0 comments on commit 4d626d7

Please sign in to comment.