Skip to content

Commit

Permalink
check if topic already exists
Browse files Browse the repository at this point in the history
  • Loading branch information
Swen committed Sep 29, 2021
1 parent d3ba5c8 commit 139d015
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
3 changes: 3 additions & 0 deletions esque/cli/commands/create/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from esque.cli.helpers import ensure_approval, fallback_to_stdin
from esque.cli.options import State, default_options
from esque.cli.output import blue_bold
from esque.errors import ValidationException
from esque.resources.topic import Topic


Expand Down Expand Up @@ -42,6 +43,8 @@ def create_topic(
or <replication-factor> takes precedence over corresponding attributes of <template_topic>.
"""
topic_controller = state.cluster.topic_controller
if topic_controller.topic_exists(topic_name):
raise ValidationException(f"Topic {topic_name!r} already exists.")

if like:
template_config = topic_controller.get_cluster_topic(like)
Expand Down
19 changes: 18 additions & 1 deletion tests/integration/commands/test_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from esque.cli.commands import esque
from esque.cli.options import State
from esque.errors import NoConfirmationPossibleException
from esque.errors import NoConfirmationPossibleException, ValidationException
from esque.resources.topic import Topic


Expand Down Expand Up @@ -46,6 +46,23 @@ def test_create_topic_as_argument_with_verification_works(
assert topic_id in topics


@pytest.mark.integration
def test_create_existing_topic_fails(
non_interactive_cli_runner: CliRunner,
confluent_admin_client: confluent_kafka.admin.AdminClient,
topic_id: str,
state: State,
):
state.cluster.topic_controller.create_topics([Topic(topic_id, replication_factor=1, num_partitions=1)])

result = non_interactive_cli_runner.invoke(
esque, args=["create", "topic", "--no-verify", topic_id], catch_exceptions=True
)
assert isinstance(result.exception, ValidationException)
assert topic_id in result.exception.message
assert "exists" in result.exception.message.lower()


@pytest.mark.integration
def test_create_topic_with_stdin_works(
non_interactive_cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
Expand Down

0 comments on commit 139d015

Please sign in to comment.