Skip to content

Commit

Permalink
Merge branch 'master' into add-more-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hfjn committed Sep 18, 2019
2 parents ad179f6 + 7b10ecb commit 7b58c64
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 15 deletions.
12 changes: 10 additions & 2 deletions esque/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,22 @@ def ctx(state, context):
@create.command("topic")
@click.argument("topic-name", required=True)
@no_verify_option
@click.option("-l", "--like", help="Topic to use as template", required=False)
@pass_state
def create_topic(state: State, topic_name: str):
def create_topic(state: State, topic_name: str, like=None):
if not ensure_approval("Are you sure?", no_verify=state.no_verify):
click.echo("Aborted")
return

topic_controller = state.cluster.topic_controller
topic_controller.create_topics([Topic(topic_name)])
if like:
template_config = topic_controller.get_cluster_topic(like)
topic = Topic(
topic_name, template_config.num_partitions, template_config.replication_factor, template_config.config
)
else:
topic = Topic(topic_name)
topic_controller.create_topics([topic])


@edit.command("topic")
Expand Down
4 changes: 2 additions & 2 deletions esque/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from confluent_kafka.admin import AdminClient, ConfigResource

from esque.config import Config
from esque.helpers import ensure_kafka_futures_done, unpack_confluent_config
from esque.helpers import ensure_kafka_future_done, unpack_confluent_config
from esque.topic_controller import TopicController


Expand Down Expand Up @@ -43,6 +43,6 @@ def retrieve_config(self, config_type: ConfigResource.Type, id):
requested_resources = [ConfigResource(config_type, str(id))]
futures = self.confluent_client.describe_configs(requested_resources)
(old_resource, future), = futures.items()
future = ensure_kafka_futures_done([future])
future = ensure_kafka_future_done(future)
result = future.result()
return unpack_confluent_config(result)
5 changes: 5 additions & 0 deletions esque/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class ContextNotDefinedException(Exception):
pass


class FutureTimeoutException(Exception):
def __init__(self, message):
self.message = message


class MessageEmptyException(KafkaException):
def __init__(self):
super().__init__(-185, None)
Expand Down
13 changes: 7 additions & 6 deletions esque/helpers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import functools
from concurrent.futures import Future, wait
from itertools import islice
from typing import List

import click
import pendulum
from confluent_kafka.cimpl import KafkaError, Message

from esque.errors import raise_for_kafka_error
from esque.errors import raise_for_kafka_error, FutureTimeoutException


def invalidate_cache_before(func):
Expand All @@ -29,12 +28,14 @@ def wrapper(self, *args, **kwargs):
return wrapper


def ensure_kafka_futures_done(future: List[Future]) -> Future:
def ensure_kafka_future_done(future: Future, timeout: int = 60 * 5) -> Future:
# Clients, such as confluents AdminClient, may return a done future with an exception
done, failed = wait(future, timeout=15)
assert len(failed) + len(done) == 1
done, not_done = wait({future}, timeout=timeout)

result = next(islice(done, 1)) if len(done) == 1 else next(islice(failed, 1))
if not_done:
raise FutureTimeoutException("Future timed out after {} seconds".format(timeout))

result = next(islice(done, 1))

exception = result.exception()

Expand Down
11 changes: 6 additions & 5 deletions esque/topic_controller.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from itertools import islice
from enum import Enum
from typing import List, TYPE_CHECKING, Union

Expand All @@ -8,7 +9,7 @@

from esque.config import Config
from esque.errors import raise_for_kafka_exception
from esque.helpers import invalidate_cache_after, ensure_kafka_futures_done
from esque.helpers import invalidate_cache_after, ensure_kafka_future_done
from esque.topic import Topic, PartitionInfo, Partition, TopicDiff

if TYPE_CHECKING:
Expand Down Expand Up @@ -71,22 +72,22 @@ def create_topics(self, topics: List[Topic]):
new_topic = NewTopic(
topic.name, num_partitions=partitions, replication_factor=replicas, config=topic.config
)
future_list = self.cluster.confluent_client.create_topics([new_topic])
ensure_kafka_futures_done(list(future_list.values()))
future_list = self.cluster.confluent_client.create_topics([new_topic], operation_timeout=60)
ensure_kafka_future_done(next(islice(future_list.values(), 1)))

@raise_for_kafka_exception
@invalidate_cache_after
def alter_configs(self, topics: List[Topic]):
for topic in topics:
config_resource = ConfigResource(ConfigResource.Type.TOPIC, topic.name, topic.config)
future_list = self.cluster.confluent_client.alter_configs([config_resource])
ensure_kafka_futures_done(list(future_list.values()))
ensure_kafka_future_done(next(islice(future_list.values(), 1)))

@raise_for_kafka_exception
@invalidate_cache_after
def delete_topic(self, topic: Topic):
future = self.cluster.confluent_client.delete_topics([topic.name])[topic.name]
ensure_kafka_futures_done([future])
ensure_kafka_future_done(future)

def get_cluster_topic(self, topic_name: str) -> Topic:
"""Convenience function getting an existing topic based on topic_name"""
Expand Down
5 changes: 5 additions & 0 deletions tests/integration/test_topic_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from esque.topic_controller import TopicController


@pytest.fixture()
def state(test_config):
yield State()


@pytest.fixture()
def topic_controller(cluster):
yield cluster.topic_controller
Expand Down

0 comments on commit 7b58c64

Please sign in to comment.