Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show consumergroup in describe topic #89

Merged
merged 13 commits into from Oct 30, 2019
23 changes: 22 additions & 1 deletion esque/cli/commands.py
Expand Up @@ -20,6 +20,7 @@
pretty_new_topic_configs,
pretty_topic_diffs,
pretty_unchanged_topic_configs,
red_bold,
)
from esque.clients.consumer import AvroFileConsumer, FileConsumer, PingConsumer
from esque.clients.producer import AvroFileProducer, FileProducer, PingProducer
Expand Down Expand Up @@ -240,10 +241,19 @@ def apply(state: State, file: str):
@click.argument(
"topic-name", callback=fallback_to_stdin, required=False, type=click.STRING, autocompletion=list_topics
)
@click.option(
"--consumers",
"-C",
required=False,
is_flag=True,
default=False,
help=f"Will output the consumergroups reading from this topic. "
f"{red_bold('Beware! This can be a really expensive operation.')}",
)
@output_format_option
@error_handler
@pass_state
def describe_topic(state: State, topic_name: str, output_format: str):
def describe_topic(state: State, topic_name: str, consumers: bool, output_format: str):
topic = state.cluster.topic_controller.get_cluster_topic(topic_name)

output_dict = {
Expand All @@ -252,6 +262,17 @@ def describe_topic(state: State, topic_name: str, output_format: str):
"config": topic.config,
}

if consumers:
consumergroup_controller = ConsumerGroupController(state.cluster)
groups = consumergroup_controller.list_consumer_groups()

consumergroups = [
group_name
for group_name in groups
if topic_name in consumergroup_controller.get_consumergroup(group_name).topics
]

output_dict["consumergroups"] = consumergroups
click.echo(format_output(output_dict, output_format))

hfjn marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
4 changes: 4 additions & 0 deletions esque/cli/output.py
Expand Up @@ -194,6 +194,10 @@ def green_bold(s: str) -> str:
return bold(click.style(s, fg="green"))


def red_bold(s: str) -> str:
return bold(click.style(s, fg="red"))


STYLE_MAPPING = {
"topic": green_bold,
"cleanup.policy": bold,
Expand Down
30 changes: 27 additions & 3 deletions esque/resources/consumergroup.py
@@ -1,3 +1,4 @@
import struct
from typing import Any, Dict, List, Optional, cast

import pykafka
Expand Down Expand Up @@ -26,6 +27,29 @@ def _pykafka_group_coordinator(self) -> pykafka.Broker:

return self._pykafka_group_coordinator_instance

@property
def topics(self):
consumer_id = self.id.encode("UTF-8")

# Get Consumers who already have an offset
consumer_offsets = self._unpack_offset_response(
self._pykafka_group_coordinator.fetch_consumer_group_offsets(consumer_id, preqs=[])
)
topic_with_offsets = set(topic.decode("UTF-8") for topic in consumer_offsets.keys())

topics_with_members = set()
# Get Consumers which have a member
try:
resp = self._pykafka_group_coordinator.describe_groups([consumer_id])
meta = self._unpack_consumer_group_response(resp.groups[consumer_id])
topics_with_members = set(
member["member_metadata"]["subscription"][0].decode("UTF-8") for member in meta["members"]
)
except struct.error:
pass

return list(topic_with_offsets | topics_with_members)

def describe(self, *, verbose=False):
consumer_id = self.id.encode("UTF-8")
if consumer_id not in self._pykafka_group_coordinator.list_groups().groups:
Expand All @@ -35,7 +59,7 @@ def describe(self, *, verbose=False):
assert len(resp.groups) == 1

meta = self._unpack_consumer_group_response(resp.groups[consumer_id])
consumer_offsets = self._get_consumer_offsets(self._pykafka_group_coordinator, consumer_id, verbose=verbose)
consumer_offsets = self._get_consumer_offsets(consumer_id, verbose=verbose)

return {
"group_id": consumer_id,
Expand All @@ -44,9 +68,9 @@ def describe(self, *, verbose=False):
"meta": meta,
}

def _get_consumer_offsets(self, group_coordinator, consumer_id, verbose: bool):
def _get_consumer_offsets(self, consumer_id, verbose: bool):
consumer_offsets = self._unpack_offset_response(
group_coordinator.fetch_consumer_group_offsets(consumer_id, preqs=[])
self._pykafka_group_coordinator.fetch_consumer_group_offsets(consumer_id, preqs=[])
)

if verbose:
Expand Down
21 changes: 19 additions & 2 deletions tests/integration/commands/test_describe.py
@@ -1,10 +1,11 @@
from typing import Union, Callable
from typing import Callable, Union

import pytest
from click import MissingParameter
from click.testing import CliRunner

from esque.cli.commands import describe_topic, describe_broker
from esque.cli.commands import describe_broker, describe_topic
from esque.resources.topic import Topic
from tests.conftest import parameterized_output_formats

VARIOUS_IMPORTANT_BROKER_OPTIONS = [
Expand Down Expand Up @@ -109,3 +110,19 @@ def check_described_broker(described_broker: Union[str, dict]):
keys = described_broker.keys()
for option in VARIOUS_IMPORTANT_BROKER_OPTIONS:
assert option in keys


@pytest.mark.integration
hfjn marked this conversation as resolved.
Show resolved Hide resolved
@parameterized_output_formats
def test_describe_topic_consumergroup_in_output(
non_interactive_cli_runner: CliRunner,
filled_topic: Topic,
partly_read_consumer_group: str,
output_format: str,
loader: Callable,
):
result = non_interactive_cli_runner.invoke(describe_topic, ["-o", output_format, "-C", filled_topic.name])
assert result.exit_code == 0
output_dict = loader(result.output)

assert partly_read_consumer_group in output_dict.get("consumergroups", None)