Skip to content

Commit

Permalink
Merge 57a02de into 71e92a3
Browse files Browse the repository at this point in the history
  • Loading branch information
hfjn committed Jul 3, 2019
2 parents 71e92a3 + 57a02de commit 02c584c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 10 deletions.
27 changes: 23 additions & 4 deletions esque/cli/commands.py
Expand Up @@ -9,7 +9,7 @@
from esque.broker import Broker
from esque.cli.helpers import ensure_approval
from esque.cli.options import State, no_verify_option, pass_state
from esque.cli.output import bold, pretty, get_output_topic_diffs, get_output_new_topics
from esque.cli.output import bold, pretty, pretty_topic_diffs, get_output_new_topics
from esque.clients import Consumer, Producer
from esque.cluster import Cluster
from esque.config import PING_TOPIC, Config
Expand Down Expand Up @@ -48,6 +48,11 @@ def delete():
pass


@esque.group(help="Edit a resource")
def edit():
pass


# TODO: Figure out how to pass the state object
def list_topics(ctx, args, incomplete):
cluster = Cluster()
Expand All @@ -66,6 +71,20 @@ def list_contexts(ctx, args, incomplete):
]


@edit.command("topic")
@click.argument("topic-name", required=True)
@pass_state
def edit_topic(state: State, topic_name: str):
controller = TopicController(state.cluster)
topic = TopicController(state.cluster).get_topic(topic_name)
new_conf = click.edit(topic.to_yaml())
topic.from_yaml(new_conf)
diff = pretty_topic_diffs({topic_name: topic.config_diff()})
click.echo(diff)
if ensure_approval("Are you sure?"):
controller.alter_configs([topic])


@esque.command("ctx", help="Switch clusters.")
@click.argument("context", required=False, default=None, autocompletion=list_contexts)
@pass_state
Expand All @@ -87,7 +106,7 @@ def ctx(state, context):
@click.argument("topic-name", required=True)
@no_verify_option
@pass_state
def create_topic(state: State, topic_name):
def create_topic(state: State, topic_name: str):
if ensure_approval("Are you sure?", no_verify=state.no_verify):
topic_controller = TopicController(state.cluster)
TopicController(state.cluster).create_topics(
Expand Down Expand Up @@ -121,7 +140,7 @@ def apply(state: State, file: str):
}

if len(topic_config_diffs) > 0:
click.echo(get_output_topic_diffs(topic_config_diffs))
click.echo(pretty_topic_diffs(topic_config_diffs))
if ensure_approval(
"Are you sure to change configs?", no_verify=state.no_verify
):
Expand Down Expand Up @@ -255,7 +274,7 @@ def get_consumergroups(state):
@get.command("topics")
@click.argument("topic", required=False, type=click.STRING, autocompletion=list_topics)
@pass_state
def get_topics(state, topic):
def get_topics(state, topic, o):
topics = TopicController(state.cluster).list_topics(search_string=topic)
for topic in topics:
click.echo(topic.name)
Expand Down
2 changes: 1 addition & 1 deletion esque/cli/output.py
Expand Up @@ -124,7 +124,7 @@ def pretty_duration(value: Any, *, multiplier: int = 1) -> str:
return pendulum.duration(milliseconds=value).in_words()


def get_output_topic_diffs(
def pretty_topic_diffs(
topics_config_diff: Dict[str, Dict[str, Tuple[str, str]]]
) -> str:
output = []
Expand Down
28 changes: 23 additions & 5 deletions esque/topic.py
@@ -1,7 +1,8 @@
import re
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Union

import pykafka
import yaml
from confluent_kafka.admin import ConfigResource
from confluent_kafka.cimpl import NewTopic

Expand Down Expand Up @@ -33,6 +34,21 @@ def __init__(
)
self.config = config if config is not None else {}

def as_dict(self) -> Dict[str, Union[int, Dict[str, str]]]:
return {
"num_partitions": self.num_partitions,
"replication_factor": self.replication_factor,
"config": self._retrieve_kafka_config(),
}

def to_yaml(self) -> str:
return yaml.dump(self.as_dict())

def from_yaml(self, data) -> None:
new_values = yaml.safe_load(data)
for attr, value in new_values.items():
setattr(self, attr, value)

@property
def _pykafka_topic(self) -> pykafka.Topic:
if not self._pykafka_topic_instance:
Expand Down Expand Up @@ -77,10 +93,13 @@ def get_offsets(self) -> Dict[int, Tuple[int, int]]:
for partition_id in partitions
}

def _retrieve_kafka_config(self):
conf = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, self.name)
return unpack_confluent_config(conf)

@raise_for_kafka_exception
def config_diff(self) -> Dict[str, Tuple[str, str]]:
conf = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, self.name)
config_list = unpack_confluent_config(conf)
config_list = self._retrieve_kafka_config()
return {
name: [str(value), str(self.config.get(name))]
for name, value in config_list.items()
Expand All @@ -107,8 +126,7 @@ def describe(self):
for partition, partition_meta in t.partitions.items()
]

conf = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, self.name)
conf = unpack_confluent_config(conf)
conf = self._retrieve_kafka_config()

return replicas, {"Config": conf}

Expand Down

0 comments on commit 02c584c

Please sign in to comment.