diff --git a/.flake8 b/.flake8 index 52c1e80b..cc0c6d75 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,5 @@ [flake8] ignore = E203, E266, E501, W503 -max-line-length = 80 +max-line-length = 119 max-complexity = 18 select = B,C,E,F,W,T4,B9 \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 6dd56648..3a4787e2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,14 +2,20 @@ dist: xenial language: python cache: pip env: -- ESQUE_ENV=dev TEST_CMD="pytest tests/ --integration --cov=esque --local" +- > + ESQUE_ENV=dev TEST_CMD="pytest tests/ --integration --cov=esque --local" + BROKER_URL="localhost:9092" + ZOOKEEPER_URL="localhost:2181" + SCHEMA_REGISTRY_URL="localhost:8081" before_install: -- wget https://mirror.netcologne.de/apache.org/kafka/2.2.0/kafka_2.12-2.2.0.tgz -O kafka.tgz -- mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1 -- nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &" -- nohup bash -c "cd kafka && bin/kafka-server-start.sh config/server.properties &" -- scripts/wait-for-it.sh localhost:9092 -- scripts/wait-for-it.sh localhost:2181 +- wget http://packages.confluent.io/archive/5.2/confluent-community-5.2.1-2.12.tar.gz -O confluent-community.tgz +- mkdir -p confluent-community && tar xzf confluent-community.tgz -C confluent-community --strip-components 1 +- confluent-community/bin/zookeeper-server-start -daemon confluent-community/etc/kafka/zookeeper.properties +- bash -c "scripts/wait-for-it.sh ${ZOOKEEPER_URL} -t 60" +- confluent-community/bin/kafka-server-start -daemon confluent-community/etc/kafka/server.properties +- bash -c "scripts/wait-for-it.sh ${BROKER_URL} -t 60" +- confluent-community/bin/schema-registry-start -daemon confluent-community/etc/schema-registry/schema-registry.properties +- bash -c "scripts/wait-for-it.sh ${SCHEMA_REGISTRY_URL} -t 60" install: - pip install coverage coveralls flake8 pipenv - pipenv install --system --dev --deploy @@ -26,11 +32,13 @@ notifications: matrix: include: - name: black + before_install: [] python: 3.7 env: - TEST_CMD="black --check --verbose ." 
- name: flake8 python: 3.7 + before_install: [] env: - TEST_CMD="flake8 esque/" - name: '3.6' diff --git a/Pipfile.lock b/Pipfile.lock index 3b05e3dd..5d06845f 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -38,6 +38,12 @@ ], "version": "==19.1.0" }, + "avro-python3": { + "hashes": [ + "sha256:6163c1507621f5bdafab3370df7f82166c3c7ba53763a5637313639dcef02ae9" + ], + "version": "==1.9.0" + }, "black": { "hashes": [ "sha256:09a9dcb7c46ed496a9850b76e4e825d6049ecd38b611f1224857a79bd985a8cf", @@ -45,6 +51,20 @@ ], "version": "==19.3b0" }, + "certifi": { + "hashes": [ + "sha256:046832c04d4e752f37383b628bc601a7ea7211496b4638f6514d0e5b9acc4939", + "sha256:945e3ba63a0b9f577b1395204e13c3a231f9bc0223888be653286534e5873695" + ], + "version": "==2019.6.16" + }, + "chardet": { + "hashes": [ + "sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", + "sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691" + ], + "version": "==3.0.4" + }, "click": { "hashes": [ "sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13", @@ -122,12 +142,42 @@ ], "version": "==0.3" }, + "fastavro": { + "hashes": [ + "sha256:02e5e2a1a6cd8b94703071a3afa1968ddb8f0dadc889ef56160773ac6029e066", + "sha256:14b35cf46ff86d26ecb998afe98c984c3520a2191f5db67e624a2aab7dc22808", + "sha256:231332c5ef48c06e44bef6e993abea76182efb88134cfdad72cb02b0a01bc75f", + "sha256:3bb0d71087a3e52c76deb179bb6c4363ed6f8c4e671b445e605bf7652f551b32", + "sha256:6888f376183a5f28ad9acbd1ed57ba260c9e4d10eef5c29aa90a89c9ad88fe9d", + "sha256:6b6d9c338ff95e113ac53661dcc005cf65641336c259125bb448c3ccd5b33017", + "sha256:88bc5bfe8cb2c39d79805b9af06165d6ecccbd70fb27753ab65ecc77b65bbc08", + "sha256:8bd421c541c196f16053beffbd5e494bdbc6425fe3de9920ddc4208083e86552", + "sha256:9d015ff0dc109daa65f0f5735ed49cf699ce5e68ba99651aab4d1f3351971d0a", + "sha256:a0ad23c8e9b213169d7d4cfac5485d3844f2f4c43a7ad950e46fd3c39972d0e1", + "sha256:a862dcb93f81bd14d03b54506139f04066a96956d06c75c9fd46059443d44f31", + "sha256:abb3cdb8e395d31e90e304090df1f32ca81ed7652eb72eea4730aea0c7d6da5f", + "sha256:b94bb7aa7d0901b6a672b34dcec2eec4b83d3df1a3f09802b16d3d933e0caefd", + "sha256:cd70878dd240a4e1c127e489a707de60afc8ffa8602f994a109ed16f077a30af", + "sha256:d5d716b8f01f206313d2fbb9c82e1e533eeb8cbeb0c8a98db44a971a5faa8143", + "sha256:d6a307c2af45900a06917b41222fe89af6d39dc20d1dff88295095273786de53", + "sha256:dfd0bafd563593e22e285eea63894b8f0d0d7a47b3743ec76d36b0de5d0e4c09", + "sha256:f8af6f955c05ae7f73e409cb73e504e6af3a247219f94520dc3ddbcb3d97e484" + ], + "version": "==0.22.3" + }, "flake8": { "hashes": [ - "sha256:859996073f341f2670741b51ec1e67a01da142831aa1fdc6242dbf88dffbe661", - "sha256:a796a115208f5c03b18f332f7c11729812c8c3ded6c46319c59b53efd3819da8" + "sha256:19241c1cbc971b9962473e4438a2ca19749a7dd002dd1a946eaba171b4114548", + "sha256:8e9dfa3cecb2400b3738a42c54c3043e821682b9c840b0448c0503f781130696" ], - "version": "==3.7.7" + "version": "==3.7.8" + }, + "idna": { + "hashes": [ + "sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407", + "sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c" + ], + "version": "==2.8" }, "importlib-metadata": { "hashes": [ @@ -160,11 +210,10 @@ }, "more-itertools": { "hashes": [ - "sha256:2112d2ca570bb7c3e53ea1a35cd5df42bb0fd10c45f0fb97178679c3c03d64c7", - "sha256:c3e4748ba1aad8dba30a4886b0b1a2004f9a863837b8654e7059eebf727afa5a" + "sha256:3ad685ff8512bf6dc5a8b82ebf73543999b657eded8c11803d9ba6b648986f4d", + 
"sha256:8bb43d1f51ecef60d81854af61a3a880555a14643691cc4b64a6ee269c78f09a" ], - "markers": "python_version > '2.7'", - "version": "==7.0.0" + "version": "==7.1.0" }, "packaging": { "hashes": [ @@ -175,15 +224,14 @@ }, "pendulum": { "hashes": [ - "sha256:0f43d963b27e92b04047ce8352e4c277db99f20d0b513df7d0ceafe674a2f727", - "sha256:14e60d26d7400980123dbb6e3f2a90b70d7c18c63742ffe5bd6d6a643f8c6ef1", - "sha256:5035a4e17504814a679f138374269cc7cc514aeac7ba6d9dc020abc224f25dbc", - "sha256:8c0b3d655c1e9205d4dacf42fffc929cde3b19b5fb544a7f7561e6896eb8a000", - "sha256:bfc7b33ae193a204ec0bec12ad0d2d3300cd7e51d91d992da525ba3b28f0d265", - "sha256:cd70b75800439794e1ad8dbfa24838845e171918df81fa98b68d0d5a6f9b8bf2", - "sha256:cf535d36c063575d4752af36df928882b2e0e31541b4482c97d63752785f9fcb" + "sha256:1cde6e3c6310fb882c98f373795f807cb2bd6af01f34d2857e6e283b5ee91e09", + "sha256:485aef2089defee88607d37d5bc238934d0b90993d7bf9ceb36e481af41e9c66", + "sha256:57801754e05f30e8a7e4d24734c9fad82c6c3ec489151555f0fc66bb32ba6d6d", + "sha256:7ee344bc87cb425b04717b90d14ffde14c1dd64eaa73060b3772edcf57f3e866", + "sha256:c460f4d8dc41ec3c4377ac1807678cd72fe5e973cc2943c104ffdeaac32dacb7", + "sha256:d3078e007315a959989c41cee5cfd63cfeeca21dd3d8295f4bc24199489e9b6c" ], - "version": "==2.0.4" + "version": "==2.0.5" }, "pluggy": { "hashes": [ @@ -228,10 +276,10 @@ }, "pytest": { "hashes": [ - "sha256:4a784f1d4f2ef198fe9b7aef793e9fa1a3b2f84e822d9b3a64a181293a572d45", - "sha256:926855726d8ae8371803f7b2e6ec0a69953d9c6311fa7c3b6c1b929ff92d27da" + "sha256:6ef6d06de77ce2961156013e9dff62f1b2688aa04d0dc244299fe7d67e09370d", + "sha256:a736fed91c12681a7b34617c8fcefe39ea04599ca72c608751c31d89579a3f77" ], - "version": "==4.6.3" + "version": "==5.0.1" }, "pytest-cov": { "hashes": [ @@ -256,10 +304,10 @@ }, "pytzdata": { "hashes": [ - "sha256:778db26940e38cf6547d6574f49375570f7d697970461de531c56cf8400958a3", - "sha256:f0469062f799c66480fcc7eae69a8270dc83f0e6522c0e70db882d6bd708d378" + "sha256:c0c8316eaf6c25ba45816390a1a45c39790767069b3275c5f7de3ddf773eb810", + "sha256:e8a91952afd853642a49f0713caac3e15a5306855ff4a47af4ddec5b7dd23a09" ], - "version": "==2019.1" + "version": "==2019.2" }, "pyyaml": { "hashes": [ @@ -277,6 +325,13 @@ ], "version": "==5.1.1" }, + "requests": { + "hashes": [ + "sha256:11e007a8a2aa0323f5a921e9e6a2d7e4e67d9877e85773fba9ba6419025cbeb4", + "sha256:9cf5292fcd0f598c671cfc1e0d7d1a7f13bb8085e9a590f48c010551dc6c4b31" + ], + "version": "==2.22.0" + }, "six": { "hashes": [ "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", @@ -297,6 +352,13 @@ ], "version": "==0.10.0" }, + "urllib3": { + "hashes": [ + "sha256:b246607a25ac80bedac05c6f282e3cdaf3afb65420fd024ac94435cabe6e18d1", + "sha256:dbe59173209418ae49d485b87d1681aefa36252ee85884c31346debd19463232" + ], + "version": "==1.25.3" + }, "virtualenv": { "hashes": [ "sha256:b7335cddd9260a3dd214b73a2521ffc09647bde3e9457fcca31dc3be3999d04a", @@ -313,10 +375,10 @@ }, "zipp": { "hashes": [ - "sha256:8c1019c6aad13642199fbe458275ad6a84907634cc9f0989877ccc4a2840139d", - "sha256:ca943a7e809cc12257001ccfb99e3563da9af99d52f261725e96dfe0f9275bc3" + "sha256:4970c3758f4e89a7857a973b1e2a5d75bcdc47794442f2e2dd4fe8e0466e809a", + "sha256:8a5712cfd3bb4248015eb3b0b3c54a5f6ee3f2425963ef2a0125b8bc40aafaec" ], - "version": "==0.5.1" + "version": "==0.5.2" } } } diff --git a/README.md b/README.md index ee7ecb97..2f8c15dd 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,17 @@ While this `docker-compose` stack is up, you can run the tests from the CLI via Alternatively, you can also 
run the entire test suite, without needing to setup the development environment, in docker compose via `docker-compose -f docker-compose.yml -f docker-compose.test.yml` +### Pre Commit Hooks + +To install pre commit hooks run: + +``` +pip install pre-commit +pre-commit install +pre-commit install-hooks +``` + + ## Alternatives - [LinkedIn KafkaTools](https://github.com/linkedin/kafka-tools) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index b7e4631f..4d3ebc44 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -8,7 +8,8 @@ services: ESQUE_TEST_ENV: "ci" depends_on: - kafka + - schema_registry command: > -c "(until (kafkacat -b kafka:9093 -X debug=all -L); do sleep 5s; done) \ - && pytest tests/ --integration" \ No newline at end of file + && python3 -u -m pytest -v -x tests/ --integration" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index dbafd86a..4d92cc6e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,5 +37,15 @@ services: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - - + schema_registry: + image: confluentinc/cp-schema-registry:5.2.2 + container_name: schema_registry + environment: + - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 + - SCHEMA_REGISTRY_HOST_NAME=schema-registry + - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 + - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9093 + depends_on: + - kafka + ports: + - 8081:8081 \ No newline at end of file diff --git a/esque/avromessage.py b/esque/avromessage.py new file mode 100644 index 00000000..fb9f9198 --- /dev/null +++ b/esque/avromessage.py @@ -0,0 +1,109 @@ +import json +import pathlib +import pickle +import struct +from io import BytesIO +from typing import Optional, Tuple, Dict, Iterable, NamedTuple, Any +import itertools as it + +import fastavro +from confluent_kafka.cimpl import Message +from confluent_kafka.avro import loads as load_schema + +from esque.message import FileWriter, FileReader, KafkaMessage +from esque.schemaregistry import SchemaRegistryClient + + +class DecodedAvroMessage(NamedTuple): + key: Any + value: Any + partition: int + key_schema_id: int + value_schema_id: int + + +class AvroFileWriter(FileWriter): + def __init__(self, directory: pathlib.Path, schema_registry_client: SchemaRegistryClient): + super().__init__(directory) + self.directory = directory + self.schema_registry_client = schema_registry_client + self.current_key_schema_id = None + self.current_value_schema_id = None + self.schema_dir_name = None + self.schema_version = it.count(1) + self.open_mode = "wb+" + + def write_message_to_file(self, message: Message): + key_schema_id, decoded_key = self.decode_bytes(message.key()) + value_schema_id, decoded_value = self.decode_bytes(message.value()) + decoded_message = DecodedAvroMessage( + decoded_key, decoded_value, message.partition(), key_schema_id, value_schema_id + ) + + if self.schema_changed(decoded_message) or self.schema_dir_name is None: + self.schema_dir_name = f"{next(self.schema_version):04}_{key_schema_id}_{value_schema_id}" + self.current_key_schema_id = key_schema_id + self.current_value_schema_id = value_schema_id + self._dump_schemata(key_schema_id, value_schema_id) + + serializable_message = { + "key": decoded_message.key, + "value": decoded_message.value, + "partition": decoded_message.partition, + "schema_directory_name": self.schema_dir_name, + } + pickle.dump(serializable_message, self.file) + + def _dump_schemata(self, 
key_schema_id, value_schema_id): + directory = self.directory / self.schema_dir_name + directory.mkdir() + (directory / "key_schema.avsc").write_text( + json.dumps(self.schema_registry_client.get_schema_from_id(key_schema_id).original_schema), encoding="utf-8" + ) + (directory / "value_schema.avsc").write_text( + json.dumps(self.schema_registry_client.get_schema_from_id(value_schema_id).original_schema), + encoding="utf-8", + ) + + def decode_bytes(self, raw_data: Optional[bytes]) -> Tuple[int, Optional[Dict]]: + if raw_data is None: + return -1, None + + with BytesIO(raw_data) as fake_stream: + schema_id = extract_schema_id(fake_stream.read(5)) + parsed_schema = self.schema_registry_client.get_schema_from_id(schema_id).parsed_schema + record = fastavro.schemaless_reader(fake_stream, parsed_schema) + return schema_id, record + + def schema_changed(self, decoded_message: DecodedAvroMessage) -> bool: + return ( + self.current_value_schema_id != decoded_message.value_schema_id and decoded_message.value is not None + ) or self.current_key_schema_id != decoded_message.key_schema_id + + +class AvroFileReader(FileReader): + def __init__(self, directory: pathlib.Path): + super().__init__(directory) + self.open_mode = "rb" + + def read_from_file(self) -> Iterable[KafkaMessage]: + while True: + try: + record = pickle.load(self.file) + except EOFError: + return + + schema_directory = self.directory / record["schema_directory_name"] + + key_schema = load_schema((schema_directory / "key_schema.avsc").read_text(encoding="utf-8")) + value_schema = load_schema((schema_directory / "value_schema.avsc").read_text(encoding="utf-8")) + + yield KafkaMessage( + json.dumps(record["key"]), json.dumps(record["value"]), record["partition"], key_schema, value_schema + ) + + +def extract_schema_id(message: bytes) -> int: + magic_byte, schema_id = struct.unpack(">bI", message[:5]) + assert magic_byte == 0, f"Wrong magic byte ({magic_byte}), no AVRO message." 
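For context on the header handling in `decode_bytes` and `extract_schema_id` above: Confluent-serialized Avro messages start with a 5-byte preamble, a zero magic byte followed by the schema registry ID as a big-endian 32-bit integer, and the Avro body follows. A minimal, self-contained round-trip sketch (no Kafka or registry needed; the schema and record are made up for illustration):

```
import struct
from io import BytesIO

import fastavro

# Illustrative schema and record, not taken from the repository.
schema = fastavro.parse_schema(
    {"type": "record", "name": "Example", "fields": [{"name": "id", "type": "string"}]}
)

# Encode: 0x00 magic byte + 4-byte big-endian schema id, then the Avro body.
buffer = BytesIO()
buffer.write(struct.pack(">bI", 0, 42))  # pretend the registry assigned id 42
fastavro.schemaless_writer(buffer, schema, {"id": "abc"})
raw = buffer.getvalue()

# Decode: the same steps AvroFileWriter.decode_bytes / extract_schema_id perform.
stream = BytesIO(raw)
magic_byte, schema_id = struct.unpack(">bI", stream.read(5))
assert magic_byte == 0 and schema_id == 42
record = fastavro.schemaless_reader(stream, schema)
assert record == {"id": "abc"}
```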
+ return schema_id diff --git a/esque/cli/commands.py b/esque/cli/commands.py index c587c08a..6136e7ec 100644 --- a/esque/cli/commands.py +++ b/esque/cli/commands.py @@ -1,24 +1,22 @@ +import pathlib +import time +from pathlib import Path from time import sleep import click -from click import version_option - import yaml +from click import version_option from esque.__version__ import __version__ from esque.broker import Broker -from esque.cli.helpers import ensure_approval +from esque.cli.helpers import ensure_approval, HandleFileOnFinished from esque.cli.options import State, no_verify_option, pass_state -from esque.cli.output import bold, pretty, pretty_topic_diffs, get_output_new_topics -from esque.clients import Consumer, Producer +from esque.cli.output import bold, pretty, pretty_topic_diffs, get_output_new_topics, blue_bold, green_bold +from esque.clients import FileConsumer, FileProducer, AvroFileProducer, AvroFileConsumer, PingConsumer, PingProducer from esque.cluster import Cluster -from esque.config import PING_TOPIC, Config +from esque.config import PING_TOPIC, Config, PING_GROUP_ID from esque.consumergroup import ConsumerGroupController -from esque.errors import ( - ConsumerGroupDoesNotExistException, - ContextNotDefinedException, - TopicAlreadyExistsException, -) +from esque.errors import ConsumerGroupDoesNotExistException, ContextNotDefinedException, TopicAlreadyExistsException from esque.topic import TopicController @@ -56,19 +54,12 @@ def edit(): # TODO: Figure out how to pass the state object def list_topics(ctx, args, incomplete): cluster = Cluster() - return [ - topic["name"] - for topic in TopicController(cluster).list_topics(search_string=incomplete) - ] + return [topic["name"] for topic in TopicController(cluster).list_topics(search_string=incomplete)] def list_contexts(ctx, args, incomplete): config = Config() - return [ - context - for context in config.available_contexts - if context.startswith(incomplete) - ] + return [context for context in config.available_contexts if context.startswith(incomplete)] @edit.command("topic") @@ -109,9 +100,7 @@ def ctx(state, context): def create_topic(state: State, topic_name: str): if ensure_approval("Are you sure?", no_verify=state.no_verify): topic_controller = TopicController(state.cluster) - TopicController(state.cluster).create_topics( - [(topic_controller.get_topic(topic_name))] - ) + TopicController(state.cluster).create_topics([(topic_controller.get_topic(topic_name))]) @esque.command("apply", help="Apply a configuration") @@ -132,29 +121,16 @@ def apply(state: State, file: str): ) ) editable_topics = topic_controller.filter_existing_topics(topics) - topics_to_be_changed = [ - topic for topic in editable_topics if topic.config_diff() != {} - ] - topic_config_diffs = { - topic.name: topic.config_diff() for topic in topics_to_be_changed - } + topics_to_be_changed = [topic for topic in editable_topics if topic.config_diff() != {}] + topic_config_diffs = {topic.name: topic.config_diff() for topic in topics_to_be_changed} if len(topic_config_diffs) > 0: click.echo(pretty_topic_diffs(topic_config_diffs)) - if ensure_approval( - "Are you sure to change configs?", no_verify=state.no_verify - ): + if ensure_approval("Are you sure to change configs?", no_verify=state.no_verify): topic_controller.alter_configs(topics_to_be_changed) click.echo( click.style( - pretty( - { - "Successfully changed topics": [ - topic.name for topic in topics_to_be_changed - ] - } - ), - fg="green", + pretty({"Successfully changed topics": [topic.name for 
topic in topics_to_be_changed]}), fg="green" ) ) else: @@ -163,30 +139,17 @@ def apply(state: State, file: str): new_topics = [topic for topic in topics if topic not in editable_topics] if len(new_topics) > 0: click.echo(get_output_new_topics(new_topics)) - if ensure_approval( - "Are you sure to create the new topics?", no_verify=state.no_verify - ): + if ensure_approval("Are you sure to create the new topics?", no_verify=state.no_verify): topic_controller.create_topics(new_topics) click.echo( - click.style( - pretty( - { - "Successfully created topics": [ - topic.name for topic in new_topics - ] - } - ), - fg="green", - ) + click.style(pretty({"Successfully created topics": [topic.name for topic in new_topics]}), fg="green") ) else: click.echo("No new topics to create.") @delete.command("topic") -@click.argument( - "topic-name", required=True, type=click.STRING, autocompletion=list_topics -) +@click.argument("topic-name", required=True, type=click.STRING, autocompletion=list_topics) @no_verify_option @pass_state def delete_topic(state: State, topic_name: str): @@ -198,9 +161,7 @@ def delete_topic(state: State, topic_name: str): @describe.command("topic") -@click.argument( - "topic-name", required=True, type=click.STRING, autocompletion=list_topics -) +@click.argument("topic-name", required=True, type=click.STRING, autocompletion=list_topics) @pass_state def describe_topic(state, topic_name): partitions, config = TopicController(state.cluster).get_topic(topic_name).describe() @@ -214,17 +175,13 @@ def describe_topic(state, topic_name): @get.command("offsets") -@click.argument( - "topic-name", required=False, type=click.STRING, autocompletion=list_topics -) +@click.argument("topic-name", required=False, type=click.STRING, autocompletion=list_topics) @pass_state def get_offsets(state, topic_name): # TODO: Gathering of all offsets takes super long topics = TopicController(state.cluster).list_topics(search_string=topic_name) - offsets = { - topic.name: max([v for v in topic.get_offsets().values()]) for topic in topics - } + offsets = {topic.name: max([v for v in topic.get_offsets().values()]) for topic in topics} click.echo(pretty(offsets)) @@ -239,15 +196,11 @@ def describe_broker(state, broker_id): @describe.command("consumergroup") @click.argument("consumer-id", required=False) -@click.option( - "-v", "--verbose", help="More detailed information.", default=False, is_flag=True -) +@click.option("-v", "--verbose", help="More detailed information.", default=False, is_flag=True) @pass_state def describe_consumergroup(state, consumer_id, verbose): try: - consumer_group = ConsumerGroupController(state.cluster).get_consumergroup( - consumer_id - ) + consumer_group = ConsumerGroupController(state.cluster).get_consumergroup(consumer_id) consumer_group_desc = consumer_group.describe(verbose=verbose) click.echo(pretty(consumer_group_desc, break_lists=True)) @@ -274,12 +227,85 @@ def get_consumergroups(state): @get.command("topics") @click.argument("topic", required=False, type=click.STRING, autocompletion=list_topics) @pass_state -def get_topics(state, topic, o): +def get_topics(state, topic): topics = TopicController(state.cluster).list_topics(search_string=topic) for topic in topics: click.echo(topic.name) +@esque.command("transfer", help="Transfer messages of a topic from one environment to another.") +@click.argument("topic", required=True) +@click.option("-f", "--from", "from_context", help="Source Context", type=click.STRING, required=True) +@click.option("-t", "--to", "to_context", 
help="Destination context", type=click.STRING, required=True) +@click.option("-n", "--numbers", help="Number of messages", type=click.INT, required=True) +@click.option("--last/--first", help="Start consuming from the earliest or latest offset in the topic.", default=False) +@click.option("-a", "--avro", help="Set this flag if the topic contains avro data", default=False, is_flag=True) +@click.option( + "-k", + "--keep", + "keep_file", + help="Set this flag if the file with consumed messages should be kept.", + default=False, + is_flag=True, +) +@pass_state +def transfer( + state: State, topic: str, from_context: str, to_context: str, numbers: int, last: bool, avro: bool, keep_file: bool +): + current_timestamp_milliseconds = int(round(time.time() * 1000)) + unique_name = topic + "_" + str(current_timestamp_milliseconds) + group_id = "group_for_" + unique_name + directory_name = "message_" + unique_name + base_dir = Path(directory_name) + state.config.context_switch(from_context) + + with HandleFileOnFinished(base_dir, keep_file) as working_dir: + number_consumed_messages = _consume_to_file(working_dir, topic, group_id, from_context, numbers, avro, last) + + if number_consumed_messages == 0: + click.echo(click.style("Execution stopped, because no messages consumed.", fg="red")) + return + + click.echo("\nReady to produce to context " + blue_bold(to_context) + " and target topic " + blue_bold(topic)) + + if not ensure_approval("Do you want to proceed?\n", no_verify=state.no_verify): + return + + state.config.context_switch(to_context) + _produce_from_file(topic, to_context, working_dir, avro) + + +def _produce_from_file(topic: str, to_context: str, working_dir: pathlib.Path, avro: bool): + if avro: + producer = AvroFileProducer(working_dir) + else: + producer = FileProducer(working_dir) + click.echo("\nStart producing to topic " + blue_bold(topic) + " in target context " + blue_bold(to_context)) + number_produced_messages = producer.produce(topic) + click.echo( + green_bold(str(number_produced_messages)) + + " messages successfully produced to context " + + green_bold(to_context) + + " and topic " + + green_bold(topic) + + "." 
+ ) + + +def _consume_to_file( + working_dir: pathlib.Path, topic: str, group_id: str, from_context: str, numbers: int, avro: bool, last: bool +) -> int: + if avro: + consumer = AvroFileConsumer(group_id, topic, working_dir, last) + else: + consumer = FileConsumer(group_id, topic, working_dir, last) + click.echo("\nStart consuming from topic " + blue_bold(topic) + " in source context " + blue_bold(from_context)) + number_consumed_messages = consumer.consume(int(numbers)) + click.echo(blue_bold(str(number_consumed_messages)) + " messages consumed.") + + return number_consumed_messages + + @esque.command("ping", help="Tests the connection to the kafka cluster.") @click.option("-t", "--times", help="Number of pings.", default=10) @click.option("-w", "--wait", help="Seconds to wait between pings.", default=1) @@ -293,14 +319,14 @@ def ping(state, times, wait): except TopicAlreadyExistsException: click.echo("Topic already exists.") - producer = Producer() - consumer = Consumer() + producer = PingProducer() + consumer = PingConsumer(PING_GROUP_ID, PING_TOPIC, True) click.echo(f"Ping with {state.cluster.bootstrap_servers}") for i in range(times): - producer.produce_ping() - _, delta = consumer.consume_ping() + producer.produce(PING_TOPIC) + _, delta = consumer.consume() deltas.append(delta) click.echo(f"m_seq={i} time={delta:.2f}ms") sleep(wait) @@ -310,6 +336,4 @@ def ping(state, times, wait): topic_controller.delete_topic(topic_controller.get_topic(PING_TOPIC)) click.echo("--- statistics ---") click.echo(f"{len(deltas)} messages sent/received") - click.echo( - f"min/avg/max = {min(deltas):.2f}/{(sum(deltas)/len(deltas)):.2f}/{max(deltas):.2f} ms" - ) + click.echo(f"min/avg/max = {min(deltas):.2f}/{(sum(deltas)/len(deltas)):.2f}/{max(deltas):.2f} ms") diff --git a/esque/cli/helpers.py b/esque/cli/helpers.py index d0ef2fd8..bb798c7a 100644 --- a/esque/cli/helpers.py +++ b/esque/cli/helpers.py @@ -1,3 +1,6 @@ +import pathlib +import shutil + import click @@ -5,3 +8,17 @@ def ensure_approval(question: str, *, no_verify: bool = False) -> bool: if no_verify: return True return click.confirm(question) + + +class HandleFileOnFinished: + def __init__(self, dir_: pathlib.Path, keep_file: bool): + self.keep_file = keep_file + self._dir = dir_ + self._dir.mkdir(parents=True) + + def __enter__(self) -> pathlib.Path: + return self._dir + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self.keep_file and self._dir.exists(): + shutil.rmtree(self._dir) diff --git a/esque/cli/options.py b/esque/cli/options.py index d631c238..a2636901 100644 --- a/esque/cli/options.py +++ b/esque/cli/options.py @@ -34,9 +34,7 @@ def cluster(self): self._cluster = Cluster() return self._cluster except NoBrokersAvailableError: - raise ClickException( - f"Could not reach Kafka Brokers {self.config.bootstrap_servers}" - ) + raise ClickException(f"Could not reach Kafka Brokers {self.config.bootstrap_servers}") pass_state = make_pass_decorator(State, ensure=True) diff --git a/esque/cli/output.py b/esque/cli/output.py index 3e660bb8..2cfc0b4f 100644 --- a/esque/cli/output.py +++ b/esque/cli/output.py @@ -39,9 +39,7 @@ def pretty_list(l: List[Any], *, break_lists=False, list_separator: str = ", ") break_lists = True if break_lists: - sub_elements = ( - "\n ".join(elem.splitlines(keepends=False)) for elem in list_output - ) + sub_elements = ("\n ".join(elem.splitlines(keepends=False)) for elem in list_output) return "- " + "\n- ".join(sub_elements) else: return list_separator.join(list_output) @@ -124,21 +122,13 @@ def 
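The new `transfer` command above ties `_consume_to_file` and `_produce_from_file` together behind a Click interface. A hedged usage sketch via Click's test runner; the topic name and the `dev`/`prod` context names are placeholders, and both clusters (plus a schema registry when `--avro` is used) must actually be reachable for this to do anything:

```
from click.testing import CliRunner

from esque.cli.commands import esque

runner = CliRunner()
# Copy 10 messages of "my-topic" from the "dev" context to the "prod" context,
# keeping the intermediate dump directory on disk (-k). The input answers the
# "Do you want to proceed?" confirmation.
result = runner.invoke(
    esque,
    ["transfer", "my-topic", "--from", "dev", "--to", "prod", "-n", "10", "-k"],
    input="y\n",
)
print(result.output)
```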
pretty_duration(value: Any, *, multiplier: int = 1) -> str: return pendulum.duration(milliseconds=value).in_words() -def pretty_topic_diffs( - topics_config_diff: Dict[str, Dict[str, Tuple[str, str]]] -) -> str: +def pretty_topic_diffs(topics_config_diff: Dict[str, Dict[str, Tuple[str, str]]]) -> str: output = [] for name, diff in topics_config_diff.items(): config_diff_attributes = {} for attribute, value in diff.items(): config_diff_attributes[attribute] = value[0] + " -> " + value[1] - output.append( - { - click.style(name, bold=True, fg="yellow"): { - "Config Diff": config_diff_attributes - } - } - ) + output.append({click.style(name, bold=True, fg="yellow"): {"Config Diff": config_diff_attributes}}) return pretty({"Topics to change": output}) @@ -151,9 +141,7 @@ def get_output_new_topics(new_topics: List[Topic]) -> str: "replication_factor: ": topic.replication_factor, "config": topic.config, } - new_topic_configs.append( - {click.style(topic.name, bold=True, fg="green"): new_topic_config} - ) + new_topic_configs.append({click.style(topic.name, bold=True, fg="green"): new_topic_config}) return pretty({"New topics to create": new_topic_configs}) @@ -180,7 +168,11 @@ def bold(s: str) -> str: def blue_bold(s: str) -> str: - return click.style(s, fg="blue", bold=True) + return bold(click.style(s, fg="blue")) + + +def green_bold(s: str) -> str: + return bold(click.style(s, fg="green")) STYLE_MAPPING = { diff --git a/esque/clients.py b/esque/clients.py index af69cdc2..a0295ace 100644 --- a/esque/clients.py +++ b/esque/clients.py @@ -1,23 +1,34 @@ +import json +import pathlib +from contextlib import ExitStack +from glob import glob from typing import Optional, Tuple import click import confluent_kafka import pendulum -from confluent_kafka import TopicPartition +from confluent_kafka import Message +from confluent_kafka.avro import AvroProducer -from esque.config import Config, PING_GROUP_ID, PING_TOPIC -from esque.errors import raise_for_kafka_error, raise_for_message +from esque.avromessage import AvroFileReader, AvroFileWriter +from esque.config import Config +from esque.errors import raise_for_kafka_error, raise_for_message, MessageEmptyException from esque.helpers import delivery_callback, delta_t +from esque.message import KafkaMessage, PlainTextFileReader, PlainTextFileWriter, FileReader, FileWriter +from esque.schemaregistry import SchemaRegistryClient +from abc import ABC, abstractmethod -DEFAULT_RETENTION_MS = 7 * 24 * 60 * 60 * 1000 +class AbstractConsumer(ABC): + def __init__(self, group_id: str, topic_name: str, last: bool): + offset_reset = "earliest" + if last: + offset_reset = "latest" -class Consumer: - def __init__(self): self._config = Config().create_confluent_config() self._config.update( { - "group.id": PING_GROUP_ID, + "group.id": group_id, "error_cb": raise_for_kafka_error, # We need to commit offsets manually once we"re sure it got saved # to the sink @@ -25,42 +36,152 @@ def __init__(self): "enable.partition.eof": False, # We need this to start at the last committed offset instead of the # latest when subscribing for the first time - "default.topic.config": {"auto.offset.reset": "latest"}, + "default.topic.config": {"auto.offset.reset": offset_reset}, } ) self._consumer = confluent_kafka.Consumer(self._config) - self._assign_exact_partitions(PING_TOPIC) + self._subscribe(topic_name) + + def _subscribe(self, topic: str) -> None: + self._consumer.subscribe([topic]) + + @abstractmethod + def consume(self, amount: int) -> int: + pass - def consume_ping(self) -> 
Optional[Tuple[str, int]]: - msg = self._consumer.consume(timeout=10)[0] + def _consume_single_message(self, timeout=10) -> Optional[Message]: + message = self._consumer.poll(timeout=timeout) + raise_for_message(message) + return message - raise_for_message(msg) - msg_sent_at = pendulum.from_timestamp(float(msg.value())) +class PingConsumer(AbstractConsumer): + def consume(self, amount: int) -> Optional[Tuple[str, int]]: + message = self._consume_single_message() + + msg_sent_at = pendulum.from_timestamp(float(message.value())) delta_sent = pendulum.now() - msg_sent_at - return msg.key(), delta_sent.microseconds / 1000 + return message.key(), delta_sent.microseconds / 1000 + + +class FileConsumer(AbstractConsumer): + def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool): + super().__init__(group_id, topic_name, last) + self.working_dir = working_dir + offset_reset = "earliest" + if last: + offset_reset = "latest" + + self._config.update({"default.topic.config": {"auto.offset.reset": offset_reset}}) + self._consumer = confluent_kafka.Consumer(self._config) + self._subscribe(topic_name) + + def consume(self, amount: int) -> int: + counter = 0 + file_writers = {} + with ExitStack() as stack: + while counter < amount: + try: + message = self._consume_single_message() + except MessageEmptyException: + return counter + + if message.partition() not in file_writers: + partition = message.partition() + file_writer = self.get_file_writer(partition) + stack.enter_context(file_writer) + file_writers[partition] = file_writer + + file_writer = file_writers[partition] + file_writer.write_message_to_file(message) + counter += 1 - def _assign_exact_partitions(self, topic: str) -> None: - self._consumer.assign([TopicPartition(topic=topic, partition=0, offset=0)]) + return counter + def get_file_writer(self, partition: int) -> FileWriter: + return PlainTextFileWriter((self.working_dir / f"partition_{partition}")) -class Producer: + +class AvroFileConsumer(FileConsumer): + def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool): + super().__init__(group_id, topic_name, working_dir, last) + self.schema_registry_client = SchemaRegistryClient(Config().schema_registry) + + def get_file_writer(self, partition: int) -> FileWriter: + return AvroFileWriter((self.working_dir / f"partition_{partition}"), self.schema_registry_client) + + +class Producer(ABC): def __init__(self): self._config = Config().create_confluent_config() - self._config.update( - {"on_delivery": delivery_callback, "error_cb": raise_for_kafka_error} - ) + self._config.update({"on_delivery": delivery_callback, "error_cb": raise_for_kafka_error}) + + @abstractmethod + def produce(self, topic_name: str) -> int: + pass + + +class PingProducer(Producer): + def __init__(self): + super().__init__() self._producer = confluent_kafka.Producer(self._config) - def produce_ping(self): + def produce(self, topic_name: str) -> int: start = pendulum.now() - self._producer.produce( - topic=PING_TOPIC, key=str(0), value=str(pendulum.now().timestamp()) - ) + self._producer.produce(topic=topic_name, key=str(0), value=str(pendulum.now().timestamp())) + while True: + left_messages = self._producer.flush(1) + if left_messages == 0: + break + click.echo(f"{delta_t(start)} | Still {left_messages} messages left, flushing...") + return 1 + + +class FileProducer(Producer): + def __init__(self, working_dir: pathlib.Path): + super().__init__() + self._producer = confluent_kafka.Producer(self._config) + 
self.working_dir = working_dir + + def produce(self, topic_name: str) -> int: + path_list = glob(str(self.working_dir / "partition_*")) + counter = 0 + for partition_path in path_list: + with self.get_file_reader(pathlib.Path(partition_path)) as file_reader: + for message in file_reader.read_from_file(): + self.produce_message(topic_name, message) + counter += 1 + while True: left_messages = self._producer.flush(1) if left_messages == 0: break - click.echo( - f"{delta_t(start)} | Still {left_messages} messages left, flushing..." - ) + click.echo(f"Still {left_messages} messages left, flushing...") + + return counter + + def get_file_reader(self, directory: pathlib.Path) -> FileReader: + return PlainTextFileReader(directory) + + def produce_message(self, topic_name: str, message: KafkaMessage): + self._producer.produce(topic=topic_name, key=message.key, value=message.value, partition=message.partition) + + +class AvroFileProducer(FileProducer): + def __init__(self, working_dir: pathlib.Path): + super().__init__(working_dir) + self._config.update({"schema.registry.url": Config().schema_registry}) + self._producer = AvroProducer(self._config) + + def get_file_reader(self, directory: pathlib.Path) -> FileReader: + return AvroFileReader(directory) + + def produce_message(self, topic_name: str, message: KafkaMessage): + self._producer.produce( + topic=topic_name, + key=json.loads(message.key), + value=json.loads(message.value), + key_schema=message.key_schema, + value_schema=message.value_schema, + partition=message.partition, + ) diff --git a/esque/cluster.py b/esque/cluster.py index 26a7ba10..22dc44d2 100644 --- a/esque/cluster.py +++ b/esque/cluster.py @@ -24,10 +24,7 @@ def bootstrap_servers(self): def brokers(self): metadata = self.confluent_client.list_topics(timeout=1) return sorted( - [ - {"id": broker.id, "host": broker.host, "port": broker.port} - for broker in metadata.brokers.values() - ], + [{"id": broker.id, "host": broker.host, "port": broker.port} for broker in metadata.brokers.values()], key=operator.itemgetter("id"), ) diff --git a/esque/config.py b/esque/config.py index a1e02a9c..8b09a77f 100644 --- a/esque/config.py +++ b/esque/config.py @@ -41,13 +41,7 @@ def __init__(self): @property def available_contexts(self): - return sorted( - [ - key.split(".")[1] - for key in self._cfg.keys() - if key.startswith("Context.") - ] - ) + return sorted([key.split(".")[1] for key in self._cfg.keys() if key.startswith("Context.")]) @property def current_context(self): @@ -60,8 +54,7 @@ def _current_section(self): @property def current_context_dict(self) -> Dict[str, Any]: return { - option: self._cfg.get(self._current_section, option) - for option in self._cfg.options(self._current_section) + option: self._cfg.get(self._current_section, option) for option in self._cfg.options(self._current_section) } @property @@ -82,23 +75,21 @@ def bootstrap_hosts(self) -> List[str]: config_dict = self.current_context_dict return config_dict["bootstrap_hosts"].split(",") + @property + def schema_registry(self) -> str: + config_dict = self.current_context_dict + return config_dict["schema_registry"] + @property def bootstrap_servers(self): if self.bootstrap_domain: - return [ - f"{host_name}.{self.bootstrap_domain}:{self.bootstrap_port}" - for host_name in self.bootstrap_hosts - ] - return [ - f"{host_name}:{self.bootstrap_port}" for host_name in self.bootstrap_hosts - ] + return [f"{host_name}.{self.bootstrap_domain}:{self.bootstrap_port}" for host_name in self.bootstrap_hosts] + return 
[f"{host_name}:{self.bootstrap_port}" for host_name in self.bootstrap_hosts] def context_switch(self, context: str): click.echo((f"Switched to context: {context}")) if context not in self.available_contexts: - raise ContextNotDefinedException( - f"{context} not defined in {config_path()}" - ) + raise ContextNotDefinedException(f"{context} not defined in {config_path()}") self._update_config("Context", "current", context) def _update_config(self, section: str, key: str, value: str): @@ -110,37 +101,19 @@ def create_pykafka_config(self) -> Dict[str, str]: return {"hosts": ",".join(self.bootstrap_servers)} def create_confluent_config( - self, - *, - debug: bool = False, - ssl: bool = False, - auth: Optional[Tuple[str, str]] = None, + self, *, debug: bool = False, ssl: bool = False, auth: Optional[Tuple[str, str]] = None ) -> Dict[str, str]: - base_config = { - "bootstrap.servers": ",".join(self.bootstrap_servers), - "security.protocol": "PLAINTEXT", - } + base_config = {"bootstrap.servers": ",".join(self.bootstrap_servers), "security.protocol": "PLAINTEXT"} config = base_config.copy() if debug: config.update({"debug": "all", "log_level": "2"}) if ssl: - config.update( - { - "ssl.ca.location": "/etc/ssl/certs/GlobalSign_Root_CA.pem", - "security.protocol": "SSL", - } - ) + config.update({"ssl.ca.location": "/etc/ssl/certs/GlobalSign_Root_CA.pem", "security.protocol": "SSL"}) if auth: user, pw = auth - config.update( - { - "sasl.mechanisms": "SCRAM-SHA-512", - "sasl.username": user, - "sasl.password": pw, - } - ) + config.update({"sasl.mechanisms": "SCRAM-SHA-512", "sasl.username": user, "sasl.password": pw}) config["security.protocol"] = "SASL_" + config["security.protocol"] return config diff --git a/esque/config/sample_config.cfg b/esque/config/sample_config.cfg index 4d134d11..d9244248 100644 --- a/esque/config/sample_config.cfg +++ b/esque/config/sample_config.cfg @@ -5,8 +5,10 @@ current = docker bootstrap_hosts = localhost bootstrap_port = 9092 security_protocol = LOCAL +schema_registry = http://localhost:8081 [Context.docker] bootstrap_hosts = kafka bootstrap_port = 9093 security_protocol = PLAINTEXT +schema_registry = http://schema_registry:8081 diff --git a/esque/consumergroup.py b/esque/consumergroup.py index 6831bb06..b8e363a6 100644 --- a/esque/consumergroup.py +++ b/esque/consumergroup.py @@ -24,8 +24,7 @@ def _pykafka_group_coordinator(self) -> pykafka.Broker: consumer_id = self.id.encode("UTF-8") if not self._pykafka_group_coordinator_instance: self._pykafka_group_coordinator_instance: pykafka.Broker = cast( - pykafka.Broker, - self.cluster.pykafka_client.cluster.get_group_coordinator(consumer_id), + pykafka.Broker, self.cluster.pykafka_client.cluster.get_group_coordinator(consumer_id) ) return self._pykafka_group_coordinator_instance @@ -40,10 +39,7 @@ def describe(self, *, verbose=False): topic_assignment = self._get_member_assignment(meta["members"]) consumer_offsets = self.get_consumer_offsets( - self._pykafka_group_coordinator, - consumer_id, - topic_assignment, - verbose=verbose, + self._pykafka_group_coordinator, consumer_id, topic_assignment, verbose=verbose ) return { @@ -54,9 +50,7 @@ def describe(self, *, verbose=False): } raise ConsumerGroupDoesNotExistException() - def get_consumer_offsets( - self, group_coordinator, consumer_id, topic_assignment, verbose - ): + def get_consumer_offsets(self, group_coordinator, consumer_id, topic_assignment, verbose): consumer_offsets = self._unpack_offset_response( group_coordinator.fetch_consumer_group_offsets(consumer_id, 
preqs=[]) ) @@ -69,8 +63,7 @@ def get_consumer_offsets( "consumer_offset": consumer_offset, "topic_low_watermark": topic_offsets[partition_id][0], "topic_high_watermark": topic_offsets[partition_id][1], - "consumer_lag": topic_offsets[partition_id][1] - - consumer_offset, + "consumer_lag": topic_offsets[partition_id][1] - consumer_offset, } return consumer_offsets for topic in consumer_offsets.keys(): @@ -84,10 +77,7 @@ def get_consumer_offsets( for partition_id, consumer_offset in consumer_offsets[topic].items(): current_offset = consumer_offset old_min, old_max = new_consumer_offsets["consumer_offset"] - new_consumer_offsets["consumer_offset"] = ( - min(old_min, current_offset), - max(old_max, current_offset), - ) + new_consumer_offsets["consumer_offset"] = (min(old_min, current_offset), max(old_max, current_offset)) old_min, old_max = new_consumer_offsets["topic_low_watermark"] new_consumer_offsets["topic_low_watermark"] = ( @@ -109,9 +99,7 @@ def get_consumer_offsets( return new_consumer_offsets - def _get_member_assignment( - self, member_assignment: Dict[str, Any] - ) -> List[PartitionOffsetFetchRequest]: + def _get_member_assignment(self, member_assignment: Dict[str, Any]) -> List[PartitionOffsetFetchRequest]: """ Creates a list of style [PartitionOffsetFetchRequest('topic', partition_id)] """ @@ -126,15 +114,12 @@ def _unpack_offset_response(self, resp: OffsetFetchResponseV1) -> Dict[str, Any] return { topic_name: { - partition_id: partition_data._asdict()["offset"] - for partition_id, partition_data in partitions.items() + partition_id: partition_data._asdict()["offset"] for partition_id, partition_data in partitions.items() } for topic_name, partitions in resp.topics.items() } - def _unpack_consumer_group_response( - self, resp: DescribeGroupResponse - ) -> Dict[str, Any]: + def _unpack_consumer_group_response(self, resp: DescribeGroupResponse) -> Dict[str, Any]: return { "group_id": resp.group_id, "protocol": resp.protocol, @@ -148,9 +133,7 @@ def _unpack_consumer_group_response( "client_host": member.client_host, "member_metadata": { # "version": member.member_metadata.version, - "subscription": [ - topic for topic in member.member_metadata.topic_names - ], + "subscription": [topic for topic in member.member_metadata.topic_names], # "client": member.member_metadata.user_data, }, "member_assignment": { @@ -174,13 +157,7 @@ def get_consumergroup(self, consumer_id) -> ConsumerGroup: return ConsumerGroup(consumer_id, self.cluster) def list_consumer_groups(self) -> List[str]: - brokers: Dict[ - int, pykafka.broker.Broker - ] = self.cluster.pykafka_client.cluster.brokers + brokers: Dict[int, pykafka.broker.Broker] = self.cluster.pykafka_client.cluster.brokers return list( - set( - group.decode("UTF-8") - for _, broker in brokers.items() - for group in broker.list_groups().groups - ) + set(group.decode("UTF-8") for _, broker in brokers.items() for group in broker.list_groups().groups) ) diff --git a/esque/errors.py b/esque/errors.py index 81a690f0..0d90b427 100644 --- a/esque/errors.py +++ b/esque/errors.py @@ -49,7 +49,8 @@ class ContextNotDefinedException(Exception): class MessageEmptyException(KafkaException): - pass + def __init__(self): + super().__init__(-185, None) class TopicAlreadyExistsException(KafkaException): @@ -68,10 +69,7 @@ class TopicDoesNotExistException(Exception): pass -ERROR_LOOKUP: Dict[int, Type[KafkaException]] = { - 36: TopicAlreadyExistsException, - -191: EndOfPartitionReachedException, -} +ERROR_LOOKUP: Dict[int, Type[KafkaException]] = {36: 
TopicAlreadyExistsException, -191: EndOfPartitionReachedException} # BROKER_NOT_AVAILABLE = 8 # CLUSTER_AUTHORIZATION_FAILED = 31 diff --git a/esque/message.py b/esque/message.py new file mode 100644 index 00000000..8d0503ab --- /dev/null +++ b/esque/message.py @@ -0,0 +1,83 @@ +import json +import pathlib +from typing import Iterable, NamedTuple, Any + +from confluent_kafka.cimpl import Message + + +class DecodedMessage(NamedTuple): + key: str + value: str + partition: int + + +class KafkaMessage(NamedTuple): + key: Any + value: Any + partition: int + key_schema: str = None + value_schema: str = None + + +class IOHandler: + def __init__(self, directory: pathlib.Path): + self.directory = directory + self.file_name = "data" + self.open_mode = "w+" + + def __enter__(self): + if not self.directory.exists(): + self.directory.mkdir() + self.file = (self.directory / self.file_name).open(self.open_mode) + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.file.close() + + +class FileWriter(IOHandler): + def __init__(self, directory: pathlib.Path): + super().__init__(directory) + self.open_mode = "w+" + + def write_message_to_file(self, message: Message): + pass + + +class FileReader(IOHandler): + def __init__(self, directory: pathlib.Path): + super().__init__(directory) + self.open_mode = "r" + + def read_from_file(self) -> Iterable[KafkaMessage]: + pass + + +class PlainTextFileWriter(FileWriter): + def write_message_to_file(self, message: Message): + decoded_message = decode_message(message) + serializable_message = { + "key": decoded_message.key, + "value": decoded_message.value, + "partition": decoded_message.partition, + } + self.file.write(json.dumps(serializable_message) + "\n") + + +class PlainTextFileReader(FileReader): + def read_from_file(self) -> Iterable[KafkaMessage]: + for line in self.file: + try: + record = json.loads(line) + except EOFError: + return + + yield KafkaMessage(record["key"], record["value"], record["partition"]) + + +def decode_message(message: Message) -> DecodedMessage: + decoded_key = message.key().decode("utf-8") + decoded_value = message.value().decode("utf-8") + + return DecodedMessage(decoded_key, decoded_value, message.partition()) diff --git a/esque/schemaregistry.py b/esque/schemaregistry.py new file mode 100644 index 00000000..9f0f5933 --- /dev/null +++ b/esque/schemaregistry.py @@ -0,0 +1,22 @@ +import json +from collections import namedtuple +from functools import lru_cache +from typing import Dict + +import fastavro +import requests + +SchemaPair = namedtuple("SchemaPair", ["original_schema", "parsed_schema"]) + + +class SchemaRegistryClient: + def __init__(self, schema_registry_uri: str): + self.schema_registry_uri = schema_registry_uri + + @lru_cache(maxsize=100) + def get_schema_from_id(self, schema_id: int) -> SchemaPair: + url = f"{self.schema_registry_uri}/schemas/ids/{schema_id}" + response = requests.get(url) + response.raise_for_status() + schema: Dict = json.loads(response.json()["schema"]) + return SchemaPair(schema, fastavro.schema.parse_schema(schema)) diff --git a/esque/topic.py b/esque/topic.py index 9a31127a..0e245c42 100644 --- a/esque/topic.py +++ b/esque/topic.py @@ -8,11 +8,7 @@ from esque.cluster import Cluster from esque.errors import TopicDoesNotExistException, raise_for_kafka_exception -from esque.helpers import ( - ensure_kafka_futures_done, - invalidate_cache_after, - unpack_confluent_config, -) +from esque.helpers import ensure_kafka_futures_done, invalidate_cache_after, unpack_confluent_config class 
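As a quick illustration of the plain-text dump format used by `FileConsumer`/`FileProducer`: each partition gets its own `partition_<n>` directory containing a `data` file with one JSON object (`key`, `value`, `partition`) per line. A self-contained round trip through `PlainTextFileWriter` and `PlainTextFileReader`, using a small stand-in for the confluent `Message` object (which cannot be instantiated from pure Python):

```
import pathlib
import tempfile

from esque.message import KafkaMessage, PlainTextFileReader, PlainTextFileWriter


class FakeMessage:
    """Stand-in for confluent_kafka's Message, exposing the same accessors."""

    def key(self) -> bytes:
        return b"some-key"

    def value(self) -> bytes:
        return b"some-value"

    def partition(self) -> int:
        return 0


directory = pathlib.Path(tempfile.mkdtemp()) / "partition_0"

# Writing appends one JSON object per line to <directory>/data ...
with PlainTextFileWriter(directory) as writer:
    writer.write_message_to_file(FakeMessage())

# ... and reading yields KafkaMessage tuples again.
with PlainTextFileReader(directory) as reader:
    messages = list(reader.read_from_file())

assert messages == [KafkaMessage("some-key", "some-value", 0)]
```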
Topic: @@ -29,9 +25,7 @@ def __init__( self._pykafka_topic_instance = None self._confluent_topic_instance = None self.num_partitions = num_partitions if num_partitions is not None else 1 - self.replication_factor = ( - replication_factor if replication_factor is not None else 1 - ) + self.replication_factor = replication_factor if replication_factor is not None else 1 self.config = config if config is not None else {} def as_dict(self) -> Dict[str, Union[int, Dict[str, str]]]: @@ -52,9 +46,7 @@ def from_yaml(self, data) -> None: @property def _pykafka_topic(self) -> pykafka.Topic: if not self._pykafka_topic_instance: - self._pykafka_topic_instance = self.cluster.pykafka_client.cluster.topics[ - self.name - ] + self._pykafka_topic_instance = self.cluster.pykafka_client.cluster.topics[self.name] return self._pykafka_topic_instance @property @@ -141,9 +133,7 @@ def __init__(self, cluster: Cluster): self.cluster: Cluster = cluster @raise_for_kafka_exception - def list_topics( - self, *, search_string: str = None, sort=True, hide_internal=True - ) -> List[Topic]: + def list_topics(self, *, search_string: str = None, sort=True, hide_internal=True) -> List[Topic]: self.cluster.confluent_client.poll(timeout=1) topics = self.cluster.confluent_client.list_topics().topics topics = [self.get_topic(t.topic) for t in topics.values()] @@ -180,9 +170,7 @@ def create_topics(self, topics: List[Topic]): @invalidate_cache_after def alter_configs(self, topics: List[Topic]): for topic in topics: - config_resource = ConfigResource( - ConfigResource.Type.TOPIC, topic.name, topic.config - ) + config_resource = ConfigResource(ConfigResource.Type.TOPIC, topic.name, topic.config) future_list = self.cluster.confluent_client.alter_configs([config_resource]) ensure_kafka_futures_done(list(future_list.values())) @@ -199,6 +187,4 @@ def get_topic( replication_factor: int = None, config: Dict[str, str] = None, ) -> Topic: - return Topic( - topic_name, self.cluster, num_partitions, replication_factor, config - ) + return Topic(topic_name, self.cluster, num_partitions, replication_factor, config) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..3af9d6c6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.black] +line-length = 119 +target_version = ['py36'] +include = '\.pyi?$' \ No newline at end of file diff --git a/scripts/wait-for-it.sh b/scripts/wait-for-it.sh index 6bf80c51..fcd761e8 100755 --- a/scripts/wait-for-it.sh +++ b/scripts/wait-for-it.sh @@ -137,7 +137,7 @@ if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then usage fi -WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-60} WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} diff --git a/setup.py b/setup.py index f4c92668..dc77aef9 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,9 @@ "pykafka", "pendulum", "pyyaml", + "requests", + "fastavro>=0.22.3", + "avro-python3", ] @@ -58,10 +61,7 @@ def run(self): python_requires=">=3.6", setup_requires=[], install_requires=required, - extras_require={ - "test": ["pytest", "pytest-mock", "pytest-cov"], - "dev": ["black", "flake8"], - }, + extras_require={"test": ["pytest", "pytest-mock", "pytest-cov"], "dev": ["black", "flake8"]}, include_package_data=True, license="MIT", classifiers=[ diff --git a/tests/conftest.py b/tests/conftest.py index 613d052d..b054e191 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,12 +2,13 @@ from concurrent.futures import Future from 
pathlib import Path from string import ascii_letters +from typing import Iterable, Tuple, Callable import confluent_kafka import pytest from confluent_kafka.admin import AdminClient, NewTopic -from confluent_kafka.cimpl import TopicPartition -from pykafka import Producer +from confluent_kafka.avro import AvroProducer +from confluent_kafka.cimpl import TopicPartition, Producer from pykafka.exceptions import NoBrokersAvailableError from esque.cluster import Cluster @@ -18,12 +19,7 @@ def pytest_addoption(parser): - parser.addoption( - "--integration", - action="store_true", - default=False, - help="run integration tests", - ) + parser.addoption("--integration", action="store_true", default=False, help="run integration tests") parser.addoption( "--local", action="store_true", @@ -67,36 +63,60 @@ def topic_id(confluent_admin_client) -> str: @pytest.fixture() -def topic_object(cluster, topic): +def topic_object(cluster, topic: str): yield TopicController(cluster).get_topic(topic) @pytest.fixture() -def changed_topic_object(cluster, topic): +def changed_topic_object(cluster, topic: str): yield TopicController(cluster).get_topic(topic, 1, 3, {"cleanup.policy": "compact"}) @pytest.fixture() -def topic(confluent_admin_client: AdminClient, topic_id: str) -> str: - """ - Creates a kafka topic consisting of a random 5 character string. +def topic(topic_factory: Callable[[int, str], Tuple[str, int]]) -> Iterable[str]: + topic_id = "".join(random.choices(ascii_letters, k=5)) + for topic, _ in topic_factory(1, topic_id): + yield topic + + +@pytest.fixture() +def source_topic( + num_partitions: int, topic_factory: Callable[[int, str], Tuple[str, int]] +) -> Iterable[Tuple[str, int]]: + topic_id = "".join(random.choices(ascii_letters, k=5)) + yield from topic_factory(num_partitions, topic_id) - :return: Topic (str) - """ - future: Future = confluent_admin_client.create_topics( - [NewTopic(topic_id, num_partitions=1, replication_factor=1)] - )[topic_id] - while not future.done() or future.cancelled(): - if future.result(): - raise RuntimeError - confluent_admin_client.poll(timeout=1) - yield topic_id +@pytest.fixture() +def target_topic( + num_partitions: int, topic_factory: Callable[[int, str], Tuple[str, int]] +) -> Iterable[Tuple[str, int]]: + topic_id = "".join(random.choices(ascii_letters, k=5)) + yield from topic_factory(num_partitions, topic_id) + + +@pytest.fixture(params=[1, 10], ids=["num_partitions=1", "num_partitions=10"]) +def num_partitions(request) -> int: + return request.param + + +@pytest.fixture() +def topic_factory(confluent_admin_client: AdminClient) -> Callable[[int, str], Iterable[Tuple[str, int]]]: + def factory(partitions: int, topic_id: str) -> Iterable[Tuple[str, int]]: + future: Future = confluent_admin_client.create_topics( + [NewTopic(topic_id, num_partitions=partitions, replication_factor=1)] + )[topic_id] + while not future.done() or future.cancelled(): + if future.result(): + raise RuntimeError + confluent_admin_client.poll(timeout=1) + + yield (topic_id, partitions) - topics = confluent_admin_client.list_topics(timeout=5).topics.keys() - if topic_id in topics: confluent_admin_client.delete_topics([topic_id]).popitem() + return factory + @pytest.fixture() def confluent_admin_client(test_config: Config) -> AdminClient: @@ -106,13 +126,15 @@ def confluent_admin_client(test_config: Config) -> AdminClient: @pytest.fixture() -def producer(topic_object: Topic): - # Send messages synchronously so we can be sure offset has been commited in tests. 
- yield Producer( - topic_object.cluster.pykafka_client.cluster, - topic_object._pykafka_topic, - sync=True, - ) +def producer(test_config: Config): + yield Producer(test_config.create_confluent_config()) + + +@pytest.fixture() +def avro_producer(test_config: Config): + producer_config = test_config.create_confluent_config() + producer_config.update({"schema.registry.url": Config().schema_registry}) + yield AvroProducer(producer_config) @pytest.fixture() @@ -121,9 +143,7 @@ def consumergroup_controller(cluster: Cluster): @pytest.fixture() -def consumergroup_instance( - partly_read_consumer_group: str, consumergroup_controller: ConsumerGroupController -): +def consumergroup_instance(partly_read_consumer_group: str, consumergroup_controller: ConsumerGroupController): yield consumergroup_controller.get_consumergroup(partly_read_consumer_group) @@ -156,14 +176,14 @@ def consumer(topic_object: Topic, consumer_group): @pytest.fixture() def filled_topic(producer, topic_object): for _ in range(10): - producer.produce("".join(random.choices(ascii_letters, k=5)).encode("utf-8")) + random_value = "".join(random.choices(ascii_letters, k=5)).encode("utf-8") + producer.produce(topic=topic_object.name, key=random_value, value=random_value) + producer.flush() yield topic_object @pytest.fixture() -def partly_read_consumer_group( - consumer: confluent_kafka.Consumer, filled_topic, consumer_group -): +def partly_read_consumer_group(consumer: confluent_kafka.Consumer, filled_topic, consumer_group): for i in range(5): msg = consumer.consume(timeout=10)[0] consumer.commit(msg, asynchronous=False) diff --git a/tests/integration/test_clients.py b/tests/integration/test_clients.py new file mode 100644 index 00000000..c9af4537 --- /dev/null +++ b/tests/integration/test_clients.py @@ -0,0 +1,159 @@ +import json +import pathlib +import random +from contextlib import ExitStack +from glob import glob +from typing import Iterable, List, Tuple +from string import ascii_letters + +import pytest +from confluent_kafka.cimpl import Producer as ConfluenceProducer + +from esque.avromessage import AvroFileReader +from esque.clients import FileConsumer, AvroFileConsumer, FileProducer, AvroFileProducer +from esque.message import PlainTextFileReader, KafkaMessage +from confluent_kafka.avro import loads as load_schema, AvroProducer + + +@pytest.mark.integration +def test_plain_text_consume_to_file( + consumer_group, producer: ConfluenceProducer, source_topic: Tuple[str, int], tmpdir_factory +): + source_topic_id, _ = source_topic + working_dir = tmpdir_factory.mktemp("working_directory") + produced_messages = produce_test_messages(producer, source_topic) + file_consumer = FileConsumer(consumer_group, source_topic_id, working_dir, False) + number_of_consumer_messages = file_consumer.consume(10) + + consumed_messages = get_consumed_messages(working_dir, False) + + assert number_of_consumer_messages == 10 + assert produced_messages == consumed_messages + + +@pytest.mark.integration +def test_avro_consume_to_file( + consumer_group, avro_producer: AvroProducer, source_topic: Tuple[str, int], tmpdir_factory +): + source_topic_id, _ = source_topic + working_dir = tmpdir_factory.mktemp("working_directory") + produced_messages = produce_test_messages_with_avro(avro_producer, source_topic) + file_consumer = AvroFileConsumer(consumer_group, source_topic_id, working_dir, False) + number_of_consumer_messages = file_consumer.consume(10) + + consumed_messages = get_consumed_messages(working_dir, True) + + assert number_of_consumer_messages == 10 
+    assert produced_messages == consumed_messages
+
+
+@pytest.mark.integration
+def test_plain_text_consume_and_produce(
+    consumer_group,
+    producer: ConfluenceProducer,
+    source_topic: Tuple[str, int],
+    target_topic: Tuple[str, int],
+    tmpdir_factory,
+):
+    source_topic_id, _ = source_topic
+    target_topic_id, _ = target_topic
+    working_dir = tmpdir_factory.mktemp("working_directory")
+    produced_messages = produce_test_messages(producer, source_topic)
+    file_consumer = FileConsumer(consumer_group, source_topic_id, working_dir, False)
+    file_consumer.consume(10)
+
+    producer = FileProducer(working_dir)
+    producer.produce(target_topic_id)
+
+    # Check assertions:
+    assertion_check_directory = tmpdir_factory.mktemp("assertion_check_directory")
+    file_consumer = FileConsumer(
+        (consumer_group + "assertion_check"), target_topic_id, assertion_check_directory, False
+    )
+    file_consumer.consume(10)
+
+    consumed_messages = get_consumed_messages(assertion_check_directory, False)
+
+    assert produced_messages == consumed_messages
+
+
+@pytest.mark.integration
+def test_avro_consume_and_produce(
+    consumer_group,
+    avro_producer: AvroProducer,
+    source_topic: Tuple[str, int],
+    target_topic: Tuple[str, int],
+    tmpdir_factory,
+):
+    source_topic_id, _ = source_topic
+    target_topic_id, _ = target_topic
+    working_dir = tmpdir_factory.mktemp("working_directory")
+    produced_messages = produce_test_messages_with_avro(avro_producer, source_topic)
+    file_consumer = AvroFileConsumer(consumer_group, source_topic_id, working_dir, False)
+    file_consumer.consume(10)
+
+    producer = AvroFileProducer(working_dir)
+    producer.produce(target_topic_id)
+
+    # Check assertions:
+    assertion_check_directory = tmpdir_factory.mktemp("assertion_check_directory")
+    file_consumer = AvroFileConsumer(
+        (consumer_group + "assertion_check"), target_topic_id, assertion_check_directory, False
+    )
+    file_consumer.consume(10)
+
+    consumed_messages = get_consumed_messages(assertion_check_directory, True)
+
+    assert produced_messages == consumed_messages
+
+
+def produce_test_messages(producer: ConfluenceProducer, topic: Tuple[str, int]) -> Iterable[KafkaMessage]:
+    topic_name, num_partitions = topic
+    messages = []
+    for i in range(10):
+        partition = random.randrange(0, num_partitions)
+        random_value = "".join(random.choices(ascii_letters, k=5))
+        message = KafkaMessage(str(i), random_value, partition)
+        messages.append(message)
+        producer.produce(topic=topic_name, key=message.key, value=message.value, partition=message.partition)
+        producer.flush()
+    return messages
+
+
+def produce_test_messages_with_avro(avro_producer: AvroProducer, topic: Tuple[str, int]) -> Iterable[KafkaMessage]:
+    topic_name, num_partitions = topic
+    with open("tests/test_samples/key_schema.avsc", "r") as file:
+        key_schema = load_schema(file.read())
+    with open("tests/test_samples/value_schema.avsc", "r") as file:
+        value_schema = load_schema(file.read())
+    messages = []
+    for i in range(10):
+        partition = random.randrange(0, num_partitions)
+        key = {"id": str(i)}
+        value = {"first": "Firstname", "last": "Lastname"}
+        messages.append(KafkaMessage(json.dumps(key), json.dumps(value), partition, key_schema, value_schema))
+        avro_producer.produce(
+            topic=topic_name,
+            key=key,
+            value=value,
+            key_schema=key_schema,
+            value_schema=value_schema,
+            partition=partition,
+        )
+        avro_producer.flush()
+    return messages
+
+
+def get_consumed_messages(directory, avro: bool) -> List[KafkaMessage]:
+    consumed_messages = []
+    path_list = glob(str(directory / "partition_*"))
+    with ExitStack() as stack:
+        for partition_path in path_list:
+            if avro:
+                file_reader = AvroFileReader(pathlib.Path(partition_path))
+            else:
+                file_reader = PlainTextFileReader(pathlib.Path(partition_path))
+            stack.enter_context(file_reader)
+            for message in file_reader.read_from_file():
+                consumed_messages.append(message)
+    return sorted(consumed_messages, key=(lambda msg: msg.key))
diff --git a/tests/integration/test_consumergroup_controller.py b/tests/integration/test_consumergroup_controller.py
index 3729b5d6..26536db1 100644
--- a/tests/integration/test_consumergroup_controller.py
+++ b/tests/integration/test_consumergroup_controller.py
@@ -4,16 +4,12 @@
 
 
 @pytest.mark.integration
-def test_get_consumer_group(
-    partly_read_consumer_group: str, consumergroup_controller: ConsumerGroupController
-):
+def test_get_consumer_group(partly_read_consumer_group: str, consumergroup_controller: ConsumerGroupController):
     instance = consumergroup_controller.get_consumergroup(partly_read_consumer_group)
     assert isinstance(instance, ConsumerGroup)
 
 
 @pytest.mark.integration
-def test_list_consumer_groups(
-    partly_read_consumer_group: str, consumergroup_controller: ConsumerGroupController
-):
+def test_list_consumer_groups(partly_read_consumer_group: str, consumergroup_controller: ConsumerGroupController):
     groups = consumergroup_controller.list_consumer_groups()
     assert partly_read_consumer_group in groups
diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py
index 46f2cf76..3030172e 100644
--- a/tests/integration/test_topic_controller.py
+++ b/tests/integration/test_topic_controller.py
@@ -11,15 +11,11 @@ def topic_controller(cluster):
 
 @pytest.mark.integration
 def test_topic_creation_works(
-    topic_controller: TopicController,
-    confluent_admin_client: confluent_kafka.admin.AdminClient,
-    topic_id: str,
+    topic_controller: TopicController, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
 ):
     topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
     assert topic_id not in topics
-    topic_controller.create_topics(
-        [topic_controller.get_topic(topic_id, replication_factor=1)]
-    )
+    topic_controller.create_topics([topic_controller.get_topic(topic_id, replication_factor=1)])
     # invalidate cache
     confluent_admin_client.poll(timeout=1)
     topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
@@ -27,27 +23,21 @@ def test_topic_creation_works(
 
 
 @pytest.mark.integration
-def test_alter_topic_config_works(topic_controller: TopicController, topic_id: str):
-    initial_topic = topic_controller.get_topic(
-        topic_id, config={"cleanup.policy": "delete"}
-    )
+def test_alter_topic_config_works(topic_controller: TopicController, topic: str):
+    initial_topic = topic_controller.get_topic(topic, config={"cleanup.policy": "delete"})
     topic_controller.create_topics([initial_topic])
     replicas, config = initial_topic.describe()
     assert config.get("Config").get("cleanup.policy") == "delete"
-    change_topic = topic_controller.get_topic(
-        topic_id, config={"cleanup.policy": "compact"}
-    )
+    change_topic = topic_controller.get_topic(topic, config={"cleanup.policy": "compact"})
     topic_controller.alter_configs([change_topic])
-    after_changes_applied_topic = topic_controller.get_topic(topic_id)
+    after_changes_applied_topic = topic_controller.get_topic(topic)
     replicas, final_config = after_changes_applied_topic.describe()
     assert final_config.get("Config").get("cleanup.policy") == "compact"
 
 
 @pytest.mark.integration
 def test_topic_deletion_works(
-    topic_controller: TopicController,
-    confluent_admin_client: confluent_kafka.admin.AdminClient,
-    topic: str,
+    topic_controller: TopicController, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str
 ):
     topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
     assert topic in topics
diff --git a/tests/test_samples/key_schema.avsc b/tests/test_samples/key_schema.avsc
new file mode 100644
index 00000000..ea5741f1
--- /dev/null
+++ b/tests/test_samples/key_schema.avsc
@@ -0,0 +1,8 @@
+{
+    "type": "record",
+    "namespace": "com.example",
+    "name": "Identifier",
+    "fields": [
+        { "name": "id", "type": "string" }
+    ]
+}
\ No newline at end of file
diff --git a/tests/test_samples/value_schema.avsc b/tests/test_samples/value_schema.avsc
new file mode 100644
index 00000000..16567e60
--- /dev/null
+++ b/tests/test_samples/value_schema.avsc
@@ -0,0 +1,9 @@
+{
+    "type": "record",
+    "namespace": "com.example",
+    "name": "FullName",
+    "fields": [
+        { "name": "first", "type": "string" },
+        { "name": "last", "type": "string" }
+    ]
+}
\ No newline at end of file
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index 3a8b82f0..41e94e4d 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -55,11 +55,7 @@ def test_current_context(config: Config):
 
 
 def test_current_context_dict(config: Config):
-    expected = {
-        "bootstrap_hosts": "localhost",
-        "bootstrap_port": "9091",
-        "security_protocol": "PLAINTEXT",
-    }
+    expected = {"bootstrap_hosts": "localhost", "bootstrap_port": "9091", "security_protocol": "PLAINTEXT"}
    assert config.current_context_dict == expected
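
For reference, the Avro path exercised by the avro_producer fixture and produce_test_messages_with_avro boils down to the following standalone usage of confluent_kafka.avro. This is a minimal sketch, not code from this changeset: the broker address, the schema registry URL, and the topic name "example-avro-topic" are placeholder assumptions, and only the two schema files committed under tests/test_samples/ are taken from the diff.

    # Sketch only. Assumes a broker on localhost:9092, a schema registry on
    # http://localhost:8081, and a topic "example-avro-topic" -- all placeholders.
    from confluent_kafka.avro import AvroProducer, loads as load_schema

    # The same schemas added in this diff under tests/test_samples/.
    with open("tests/test_samples/key_schema.avsc") as f:
        key_schema = load_schema(f.read())
    with open("tests/test_samples/value_schema.avsc") as f:
        value_schema = load_schema(f.read())

    producer = AvroProducer(
        {"bootstrap.servers": "localhost:9092", "schema.registry.url": "http://localhost:8081"},
        default_key_schema=key_schema,
        default_value_schema=value_schema,
    )

    # Keys and values are plain dicts matching the Avro schemas; AvroProducer
    # registers the schemas with the registry and serializes records on produce().
    for i in range(10):
        producer.produce(topic="example-avro-topic", key={"id": str(i)}, value={"first": "Firstname", "last": "Lastname"})
    producer.flush()

The avro_producer fixture in conftest.py builds the same kind of producer from esque's own Config (create_confluent_config() plus the schema.registry.url key) rather than hard-coding connection details as this sketch does.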