
Commit

Just moved some stuff around because it felt more logical (#52)
* Just moved some stuff around because it felt more logical

* Fixed move bug

* Fixed tests

* Move config.py to __init__.py

* Moved consumer group controller
hfjn committed Sep 19, 2019
1 parent 7b10ecb commit 7129c8a
Showing 28 changed files with 279 additions and 261 deletions.
19 changes: 10 additions & 9 deletions esque/cli/commands.py
@@ -9,24 +9,25 @@
from click import version_option

from esque.__version__ import __version__
from esque.broker import Broker
from esque.cli.helpers import ensure_approval, HandleFileOnFinished
from esque.cli.helpers import HandleFileOnFinished, ensure_approval
from esque.cli.options import State, no_verify_option, pass_state
from esque.cli.output import (
blue_bold,
bold,
green_bold,
pretty,
pretty_topic_diffs,
pretty_new_topic_configs,
blue_bold,
green_bold,
pretty_topic_diffs,
pretty_unchanged_topic_configs,
)
from esque.clients import FileConsumer, FileProducer, AvroFileProducer, AvroFileConsumer, PingConsumer, PingProducer
from esque.clients.consumer import AvroFileConsumer, FileConsumer, PingConsumer
from esque.clients.producer import AvroFileProducer, FileProducer, PingProducer
from esque.cluster import Cluster
from esque.config import PING_TOPIC, Config, PING_GROUP_ID, config_dir, sample_config_path, config_path
from esque.consumergroup import ConsumerGroupController
from esque.topic import Topic
from esque.config import PING_TOPIC, PING_GROUP_ID, config_dir, config_path, sample_config_path, Config
from esque.errors import ConsumerGroupDoesNotExistException, ContextNotDefinedException, TopicAlreadyExistsException
from esque.resources.broker import Broker
from esque.controller.consumergroup_controller import ConsumerGroupController
from esque.resources.topic import Topic


@click.group(help="esque - an operational kafka tool.", invoke_without_command=True)
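
For orientation, the moved modules used by commands.py now resolve to the following import paths; this sketch only restates imports already visible in the diff above and adds no new API:

from esque.resources.broker import Broker
from esque.resources.topic import Topic
from esque.controller.consumergroup_controller import ConsumerGroupController
from esque.clients.consumer import AvroFileConsumer, FileConsumer, PingConsumer
from esque.clients.producer import AvroFileProducer, FileProducer, PingProducer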
File renamed without changes.
2 changes: 1 addition & 1 deletion esque/cli/options.py
@@ -7,7 +7,7 @@

from esque.cli.helpers import ensure_approval
from esque.cluster import Cluster
from esque.config import Config, config_dir, config_path, sample_config_path
from esque.config import config_dir, config_path, sample_config_path, Config
from esque.errors import ConfigNotExistsException


4 changes: 2 additions & 2 deletions esque/cli/output.py
@@ -1,11 +1,11 @@
from collections import OrderedDict
from functools import partial
from typing import Any, List, MutableMapping, Dict
from typing import Any, Dict, List, MutableMapping

import click
import pendulum

from esque.topic import Topic, TopicDiff
from esque.resources.topic import Topic, TopicDiff

MILLISECONDS_PER_YEAR = 1000 * 3600 * 24 * 365

200 changes: 0 additions & 200 deletions esque/clients.py

This file was deleted.

Empty file added esque/clients/__init__.py
Empty file.
106 changes: 106 additions & 0 deletions esque/clients/consumer.py
@@ -0,0 +1,106 @@
import pathlib
from abc import ABC, abstractmethod
from contextlib import ExitStack
from typing import Optional, Tuple

import confluent_kafka
import pendulum
from confluent_kafka.cimpl import Message

from esque.clients.schemaregistry import SchemaRegistryClient
from esque.config import Config
from esque.errors import MessageEmptyException, raise_for_kafka_error, raise_for_message
from esque.messages.avromessage import AvroFileWriter
from esque.messages.message import FileWriter, PlainTextFileWriter


class AbstractConsumer(ABC):
def __init__(self, group_id: str, topic_name: str, last: bool):
offset_reset = "earliest"
if last:
offset_reset = "latest"

self._config = Config().create_confluent_config()
self._config.update(
{
"group.id": group_id,
"error_cb": raise_for_kafka_error,
                # We need to commit offsets manually once we're sure it got saved
# to the sink
"enable.auto.commit": True,
"enable.partition.eof": False,
# We need this to start at the last committed offset instead of the
# latest when subscribing for the first time
"default.topic.config": {"auto.offset.reset": offset_reset},
}
)
self._consumer = confluent_kafka.Consumer(self._config)
self._subscribe(topic_name)

def _subscribe(self, topic: str) -> None:
self._consumer.subscribe([topic])

@abstractmethod
def consume(self, amount: int) -> int:
pass

def _consume_single_message(self, timeout=30) -> Optional[Message]:
message = self._consumer.poll(timeout=timeout)
raise_for_message(message)
return message


class PingConsumer(AbstractConsumer):
def consume(self, amount: int) -> Optional[Tuple[str, int]]:
message = self._consume_single_message()

msg_sent_at = pendulum.from_timestamp(float(message.value()))
delta_sent = pendulum.now() - msg_sent_at
return message.key(), delta_sent.microseconds / 1000


class FileConsumer(AbstractConsumer):
def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
super().__init__(group_id, topic_name, last)
self.working_dir = working_dir
offset_reset = "earliest"
if last:
offset_reset = "latest"

self._config.update({"default.topic.config": {"auto.offset.reset": offset_reset}})
self._consumer = confluent_kafka.Consumer(self._config)
self._subscribe(topic_name)

def consume(self, amount: int) -> int:
counter = 0
file_writers = {}
with ExitStack() as stack:
while counter < amount:
try:
message = self._consume_single_message()
except MessageEmptyException:
return counter

                # Determine the partition for this message up front so the lookup
                # below always targets the writer for the right partition file.
                partition = message.partition()
                if partition not in file_writers:
                    file_writer = self.get_file_writer(partition)
                    stack.enter_context(file_writer)
                    file_writers[partition] = file_writer

                file_writer = file_writers[partition]
                file_writer.write_message_to_file(message)
                counter += 1

return counter

def get_file_writer(self, partition: int) -> FileWriter:
return PlainTextFileWriter((self.working_dir / f"partition_{partition}"))


class AvroFileConsumer(FileConsumer):
def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
super().__init__(group_id, topic_name, working_dir, last)
self.schema_registry_client = SchemaRegistryClient(Config().schema_registry)

def get_file_writer(self, partition: int) -> FileWriter:
return AvroFileWriter((self.working_dir / f"partition_{partition}"), self.schema_registry_client)
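
A minimal usage sketch of the relocated consumers, based only on the constructor and consume() signatures shown above; the group, topic, and directory names are placeholders, and a valid esque configuration plus an existing working directory are assumed:

from pathlib import Path

from esque.clients.consumer import AvroFileConsumer, FileConsumer

# Placeholder names for illustration only.
working_dir = Path("message_dump")
consumer = FileConsumer(group_id="demo-group", topic_name="demo-topic", working_dir=working_dir, last=False)
written = consumer.consume(amount=10)  # writes up to 10 messages into message_dump/partition_<n>

# Same interface, but messages are decoded via the schema registry configured for esque.
avro_consumer = AvroFileConsumer("demo-group", "demo-topic", Path("avro_dump"), last=False)
avro_consumer.consume(amount=10)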