From 5d0e376d098627ce26d7fce58638e9ea3de84038 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 3 Jul 2019 21:13:35 +0200 Subject: [PATCH 1/3] Adds in-place edit functionality --- esque/cli/commands.py | 27 +++++++++++++++++++++++---- esque/cli/output.py | 2 +- esque/resource.py | 16 ++++++++++++++++ esque/topic.py | 22 ++++++++++++++++------ 4 files changed, 56 insertions(+), 11 deletions(-) create mode 100644 esque/resource.py diff --git a/esque/cli/commands.py b/esque/cli/commands.py index 54d02d3e..c587c08a 100644 --- a/esque/cli/commands.py +++ b/esque/cli/commands.py @@ -9,7 +9,7 @@ from esque.broker import Broker from esque.cli.helpers import ensure_approval from esque.cli.options import State, no_verify_option, pass_state -from esque.cli.output import bold, pretty, get_output_topic_diffs, get_output_new_topics +from esque.cli.output import bold, pretty, pretty_topic_diffs, get_output_new_topics from esque.clients import Consumer, Producer from esque.cluster import Cluster from esque.config import PING_TOPIC, Config @@ -48,6 +48,11 @@ def delete(): pass +@esque.group(help="Edit a resource") +def edit(): + pass + + # TODO: Figure out how to pass the state object def list_topics(ctx, args, incomplete): cluster = Cluster() @@ -66,6 +71,20 @@ def list_contexts(ctx, args, incomplete): ] +@edit.command("topic") +@click.argument("topic-name", required=True) +@pass_state +def edit_topic(state: State, topic_name: str): + controller = TopicController(state.cluster) + topic = TopicController(state.cluster).get_topic(topic_name) + new_conf = click.edit(topic.to_yaml()) + topic.from_yaml(new_conf) + diff = pretty_topic_diffs({topic_name: topic.config_diff()}) + click.echo(diff) + if ensure_approval("Are you sure?"): + controller.alter_configs([topic]) + + @esque.command("ctx", help="Switch clusters.") @click.argument("context", required=False, default=None, autocompletion=list_contexts) @pass_state @@ -87,7 +106,7 @@ def ctx(state, context): @click.argument("topic-name", required=True) @no_verify_option @pass_state -def create_topic(state: State, topic_name): +def create_topic(state: State, topic_name: str): if ensure_approval("Are you sure?", no_verify=state.no_verify): topic_controller = TopicController(state.cluster) TopicController(state.cluster).create_topics( @@ -121,7 +140,7 @@ def apply(state: State, file: str): } if len(topic_config_diffs) > 0: - click.echo(get_output_topic_diffs(topic_config_diffs)) + click.echo(pretty_topic_diffs(topic_config_diffs)) if ensure_approval( "Are you sure to change configs?", no_verify=state.no_verify ): @@ -255,7 +274,7 @@ def get_consumergroups(state): @get.command("topics") @click.argument("topic", required=False, type=click.STRING, autocompletion=list_topics) @pass_state -def get_topics(state, topic): +def get_topics(state, topic, o): topics = TopicController(state.cluster).list_topics(search_string=topic) for topic in topics: click.echo(topic.name) diff --git a/esque/cli/output.py b/esque/cli/output.py index b11da850..5301414a 100644 --- a/esque/cli/output.py +++ b/esque/cli/output.py @@ -124,7 +124,7 @@ def pretty_duration(value: Any, *, multiplier: int = 1) -> str: return pendulum.duration(milliseconds=value).in_words() -def get_output_topic_diffs( +def pretty_topic_diffs( topics_config_diff: Dict[str, Dict[str, Tuple[str, str]]] ) -> str: output = [] diff --git a/esque/resource.py b/esque/resource.py new file mode 100644 index 00000000..e3a0c602 --- /dev/null +++ b/esque/resource.py @@ -0,0 +1,16 @@ +from typing import Dict, Union + +import yaml + + +class Resource: + def as_dict(self) -> Dict[str, Union[str, Dict[str, str]]]: + pass + + def to_yaml(self) -> str: + return yaml.dump(self.as_dict()) + + def from_yaml(self, data) -> None: + new_values = yaml.safe_load(data) + for attr, value in new_values.items(): + setattr(self, attr, value) diff --git a/esque/topic.py b/esque/topic.py index 93ce6083..3ba2f714 100644 --- a/esque/topic.py +++ b/esque/topic.py @@ -1,5 +1,5 @@ import re -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Union import pykafka from confluent_kafka.admin import ConfigResource @@ -12,9 +12,10 @@ invalidate_cache_after, unpack_confluent_config, ) +from esque.resource import Resource -class Topic: +class Topic(Resource): def __init__( self, name: str, @@ -33,6 +34,13 @@ def __init__( ) self.config = config if config is not None else {} + def as_dict(self) -> Dict[str, Union[int, Dict[str, str]]]: + return { + "num_partitions": self.num_partitions, + "replication_factor": self.replication_factor, + "config": self._retrieve_kafka_config() + } + @property def _pykafka_topic(self) -> pykafka.Topic: if not self._pykafka_topic_instance: @@ -77,10 +85,13 @@ def get_offsets(self) -> Dict[int, Tuple[int, int]]: for partition_id in partitions } + def _retrieve_kafka_config(self): + conf = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, self.name) + return unpack_confluent_config(conf) + @raise_for_kafka_exception def config_diff(self) -> Dict[str, Tuple[str, str]]: - conf = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, self.name) - config_list = unpack_confluent_config(conf) + config_list = self._retrieve_kafka_config() return { name: [str(value), str(self.config.get(name))] for name, value in config_list.items() @@ -107,8 +118,7 @@ def describe(self): for partition, partition_meta in t.partitions.items() ] - conf = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, self.name) - conf = unpack_confluent_config(conf) + conf = self._retrieve_kafka_config() return replicas, {"Config": conf} From 50c3ed7558e769f21515d7392e63a8565ec456c5 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 3 Jul 2019 21:15:07 +0200 Subject: [PATCH 2/3] Remove resource definition --- esque/resource.py | 16 ---------------- esque/topic.py | 13 ++++++++++--- 2 files changed, 10 insertions(+), 19 deletions(-) delete mode 100644 esque/resource.py diff --git a/esque/resource.py b/esque/resource.py deleted file mode 100644 index e3a0c602..00000000 --- a/esque/resource.py +++ /dev/null @@ -1,16 +0,0 @@ -from typing import Dict, Union - -import yaml - - -class Resource: - def as_dict(self) -> Dict[str, Union[str, Dict[str, str]]]: - pass - - def to_yaml(self) -> str: - return yaml.dump(self.as_dict()) - - def from_yaml(self, data) -> None: - new_values = yaml.safe_load(data) - for attr, value in new_values.items(): - setattr(self, attr, value) diff --git a/esque/topic.py b/esque/topic.py index 3ba2f714..4c37565f 100644 --- a/esque/topic.py +++ b/esque/topic.py @@ -12,10 +12,9 @@ invalidate_cache_after, unpack_confluent_config, ) -from esque.resource import Resource -class Topic(Resource): +class Topic: def __init__( self, name: str, @@ -38,9 +37,17 @@ def as_dict(self) -> Dict[str, Union[int, Dict[str, str]]]: return { "num_partitions": self.num_partitions, "replication_factor": self.replication_factor, - "config": self._retrieve_kafka_config() + "config": self._retrieve_kafka_config(), } + def to_yaml(self) -> str: + return yaml.dump(self.as_dict()) + + def from_yaml(self, data) -> None: + new_values = yaml.safe_load(data) + for attr, value in new_values.items(): + setattr(self, attr, value) + @property def _pykafka_topic(self) -> pykafka.Topic: if not self._pykafka_topic_instance: From 57a02de7ff7395bf63a8d9bd91f6b0412380832a Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 3 Jul 2019 21:17:17 +0200 Subject: [PATCH 3/3] Fix missing yaml package --- esque/topic.py | 1 + 1 file changed, 1 insertion(+) diff --git a/esque/topic.py b/esque/topic.py index 4c37565f..9a31127a 100644 --- a/esque/topic.py +++ b/esque/topic.py @@ -2,6 +2,7 @@ from typing import Dict, List, Tuple, Union import pykafka +import yaml from confluent_kafka.admin import ConfigResource from confluent_kafka.cimpl import NewTopic