
Transfer command #30

Merged: 101 commits, Jul 22, 2019

Commits
8449c22
Add consumer and producer method to write to and read from file
Bibob7 Jul 3, 2019
76a73e3
Merge remote-tracking branch 'origin/master' into transfer-command
Bibob7 Jul 4, 2019
d835438
Add schema registry to config
Bibob7 Jul 5, 2019
05d919e
Merge remote-tracking branch 'origin/master' into transfer-command
Bibob7 Jul 7, 2019
064876f
Add serializer for avro messages
Bibob7 Jul 7, 2019
df51bf1
Set Serializer as super class for all serializer
Bibob7 Jul 7, 2019
801072d
Refactor Producer and Consumer for PlainText and Avro
Bibob7 Jul 8, 2019
d2a936e
Fix coding styles
Bibob7 Jul 8, 2019
7005747
Remove unused imports
Bibob7 Jul 8, 2019
f433564
Add create methods for Producer and Consumer
Bibob7 Jul 8, 2019
4b65ca6
Do not use test topic anymore
Bibob7 Jul 8, 2019
edfa860
Revert config change for testing
Bibob7 Jul 8, 2019
c209d12
Extract methods in commands
Bibob7 Jul 8, 2019
3751aff
Remove unused method
Bibob7 Jul 8, 2019
fafe570
Better process handling of edge cases
Bibob7 Jul 8, 2019
49134b3
Refactor write_to_file method
Bibob7 Jul 8, 2019
14dba5c
Add more output and fix bugs
Bibob7 Jul 8, 2019
8e1777b
Fix coding styles
Bibob7 Jul 9, 2019
b08b0a3
Remove unused method in SchemaRegistryClient
Bibob7 Jul 10, 2019
2b04309
Adjust max line length to 119
Bibob7 Jul 10, 2019
80fa96b
Add flag to keep message file
Bibob7 Jul 10, 2019
1f5578e
Remove factory methods
Bibob7 Jul 10, 2019
cef7be6
Super producer and consumer implement consume and produce
Bibob7 Jul 10, 2019
b1f82bd
Merge remote-tracking branch 'origin/master' into transfer-command
Bibob7 Jul 12, 2019
0af62e1
Update version
Bibob7 Jul 12, 2019
e632360
Update Pipfile.lock
Bibob7 Jul 12, 2019
2fa4884
Update pip files
Bibob7 Jul 12, 2019
87f3788
Add avro-python3
Bibob7 Jul 12, 2019
e4002e9
Subscribe to topic instead of assign to partition 0
Bibob7 Jul 12, 2019
cb022ff
Change to list of topics
Bibob7 Jul 12, 2019
f7c08d4
Make FileWriter and Reader handleable for with
Bibob7 Jul 12, 2019
040ce59
Make FileWriter and FileReader usable with "with".
Bibob7 Jul 13, 2019
a605ad2
Use abc library
Bibob7 Jul 13, 2019
58e5c95
Update travis.yml and setup.py
Bibob7 Jul 13, 2019
9adc0a1
Remove assertions for methods
Bibob7 Jul 13, 2019
2bac25f
Update fixture file_consumer
Bibob7 Jul 13, 2019
87e15da
Update tests
Bibob7 Jul 13, 2019
c55c0f2
Update tests for consume and produce to and from file
Bibob7 Jul 13, 2019
1bc83e7
Fix styling
Bibob7 Jul 13, 2019
583b369
Check written files in tests and bug fixes
Bibob7 Jul 14, 2019
f14e1ec
Fix schema_registry uri
Bibob7 Jul 14, 2019
2070333
Add schema registry to travis
Bibob7 Jul 14, 2019
ee03928
Use confluent package with schema registry for travis
Bibob7 Jul 14, 2019
f22a585
Fix travis.yml
Bibob7 Jul 14, 2019
accb8ce
Fix travis.yml
Bibob7 Jul 14, 2019
dbf6698
Fix travis.yml
Bibob7 Jul 14, 2019
739dc3a
Use newer version of confluent package
Bibob7 Jul 14, 2019
849b6f3
Use higher time out for kafka broker start
Bibob7 Jul 14, 2019
16f031e
Fix ci
Bibob7 Jul 14, 2019
9ce200a
Fix ci
Bibob7 Jul 14, 2019
6b7412f
Fix ci
Bibob7 Jul 14, 2019
5daee5a
Fix ci
Bibob7 Jul 14, 2019
523701f
Update Pipfiles
Bibob7 Jul 14, 2019
b620dd0
Fix docker-compose.test.yml
Bibob7 Jul 14, 2019
85052e8
Add tests and update ci config
Bibob7 Jul 14, 2019
fa310b9
Try alternative ci config
Bibob7 Jul 14, 2019
b9f5931
Revert "Try alternative ci config"
Bibob7 Jul 14, 2019
7d01ea8
Try alternative ci config
Bibob7 Jul 14, 2019
8b9de18
Increase waiting time for ci environment startup
Bibob7 Jul 15, 2019
f146ee7
Update schema registry config for ci
Bibob7 Jul 15, 2019
782c7e1
Revert "Update schema registry config for ci"
Bibob7 Jul 15, 2019
a5a3c07
Update tests to check content of messages
Bibob7 Jul 15, 2019
1d7912f
Merge methods
Bibob7 Jul 15, 2019
ad54d1f
Update extract_schema_id
Bibob7 Jul 15, 2019
deff96c
Use named tuples
Bibob7 Jul 15, 2019
0653c85
Content of DecodedAvroMessage could be anything
Bibob7 Jul 15, 2019
c02b3f2
Update travis ci config
Bibob7 Jul 15, 2019
4974655
Use .avsc ending for avro files
Bibob7 Jul 15, 2019
51f7935
Let KafkaMessage do not rely on str key and value
Bibob7 Jul 15, 2019
95d9994
Update test to check consumed and produced content
Bibob7 Jul 15, 2019
adaa8ae
Update travis.yml
Bibob7 Jul 15, 2019
146bdd6
Update travis.yml
Bibob7 Jul 15, 2019
a33b18b
Fix coding style
Bibob7 Jul 15, 2019
ce7acc6
Optimize tests
Bibob7 Jul 15, 2019
4697e50
Enable partition wise storing of messages
Bibob7 Jul 16, 2019
d03e98f
Add tests for topics with multiple partitions
Bibob7 Jul 16, 2019
081281f
Update topic fixtures
Bibob7 Jul 16, 2019
dbce4d3
Update conftest.py
Bibob7 Jul 16, 2019
7e37a28
Randomly produce messages in partitions
Bibob7 Jul 16, 2019
37bc8c7
Fix tests
Bibob7 Jul 16, 2019
ffdcc18
Remove useless assertion
Bibob7 Jul 16, 2019
905da82
Fix tests
Bibob7 Jul 16, 2019
74fd5a8
Fix tests
Bibob7 Jul 16, 2019
4891849
Fix tests
Bibob7 Jul 16, 2019
60f47d9
Update repo config and fix test
Bibob7 Jul 16, 2019
a9fa37b
Use topic_id fixture
Bibob7 Jul 17, 2019
e334200
Remove unnecessary usage of keys()
Bibob7 Jul 19, 2019
2f7d8d3
Refactor clients.py
Bibob7 Jul 19, 2019
426888f
Remove line-length in travis.yml for black
Bibob7 Jul 19, 2019
4ebcfe8
Fix Readme
Bibob7 Jul 19, 2019
2e5adf1
Simplify consume_single_message
Bibob7 Jul 19, 2019
e113441
Simplify IoHandler
Bibob7 Jul 19, 2019
ee4fb43
Remove unnecessary format
Bibob7 Jul 19, 2019
97fa771
Remove unused import
Bibob7 Jul 19, 2019
35f6c28
Merge remote-tracking branch 'origin/master' into transfer-command
Bibob7 Jul 19, 2019
2b857aa
Add help description for --last/--first
Bibob7 Jul 19, 2019
a3dfbb9
Re-add retry mechanism for consume_single_message
Bibob7 Jul 22, 2019
1bd1c77
Catch MessageEmptyException and return counter for Consumer
Bibob7 Jul 22, 2019
87f1fb6
Add more detailed description if no messages were consumed.
Bibob7 Jul 22, 2019
51ec6a7
Add queue limit and flush before it is reached
Bibob7 Jul 22, 2019
d270ad1
Reduce internal queue limit
Bibob7 Jul 22, 2019
2 changes: 2 additions & 0 deletions config/sample_config.cfg
@@ -5,8 +5,10 @@ current = docker
bootstrap_hosts = localhost
bootstrap_port = 9092
security_protocol = LOCAL
schema_registry = localhost:8081

[Context.docker]
bootstrap_hosts = kafka
bootstrap_port = 9093
security_protocol = PLAINTEXT
schema_registry = schema-registry:8081
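This diff adds a `schema_registry` key to each context section of the sample config. As a hedged sketch of how such a value could be read back (the section name and values are copied verbatim from the docker context above; the actual esque config loader may differ), Python's stdlib `configparser` handles the format directly:

```python
import configparser

# Fragment taken from the [Context.docker] section in the diff above.
sample = """
[Context.docker]
bootstrap_hosts = kafka
bootstrap_port = 9093
security_protocol = PLAINTEXT
schema_registry = schema-registry:8081
"""

parser = configparser.ConfigParser()
parser.read_string(sample)

# Look up the newly added schema registry endpoint for a given context.
print(parser.get("Context.docker", "schema_registry"))  # → schema-registry:8081
```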
40 changes: 40 additions & 0 deletions docker-compose.yml
@@ -38,4 +38,44 @@ services:
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0


schema_registry_source:
image: confluentinc/cp-schema-registry:5.2.2
container_name: schema_registry_source
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9092
links:
- zookeeper
- kafka
ports:
- 8081:8081

schema_registry_target:
image: confluentinc/cp-schema-registry:5.2.2
container_name: schema_registry_target
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8082
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9092
links:
- zookeeper
- kafka
ports:
- 8082:8081

esque:
build: .
volumes:
- .:/esque
environment:
ESQUE_TEST_ENV: "ci"
depends_on:
- kafka
command: >
-c
"(until (kafkacat -b kafka:9093 -X debug=all -L); do sleep 5s; done) \
&& pytest tests/ --integration"

124 changes: 124 additions & 0 deletions esque/avromessage.py
@@ -0,0 +1,124 @@
import json
import pathlib
import pickle
import struct
from io import BytesIO
from typing import Optional, Tuple, Dict, BinaryIO, Iterable
import itertools as it

import fastavro
from confluent_kafka.cimpl import Message
from confluent_kafka.avro import loads as load_schema

from esque.message import FileWriter, FileReader, KafkaMessage
from esque.schemaregistry import SchemaRegistryClient


class DecodedAvroMessage:
def __init__(
self,
key: Optional[Dict],
value: Optional[Dict],
key_schema_id: int,
value_schema_id: int,
):
self.key = key
self.value = value
self.key_schema_id = key_schema_id
self.value_schema_id = value_schema_id


class AvroFileWriter(FileWriter):
def __init__(
self, working_dir: pathlib.Path, schema_registry_client: SchemaRegistryClient
):
self.working_dir = working_dir
self.schema_registry_client = schema_registry_client
self.current_key_schema_id = None
self.current_value_schema_id = None
self.schema_dir_name = None
self.schema_version = it.count(1)

def write_message_to_file(self, message: Message, file: BinaryIO):
key_schema_id, decoded_key = self.decode_bytes(message.key())
value_schema_id, decoded_value = self.decode_bytes(message.value())
decoded_message = DecodedAvroMessage(
decoded_key, decoded_value, key_schema_id, value_schema_id
)

if self.schema_changed(decoded_message) or self.schema_dir_name is None:
self.schema_dir_name = (
f"{next(self.schema_version):04}_{key_schema_id}_{value_schema_id}"
)
self.current_key_schema_id = key_schema_id
self.current_value_schema_id = value_schema_id
self._dump_schemata(key_schema_id, value_schema_id)

serializable_message = {
"key": decoded_key,
"value": decoded_value,
"schema_directory_name": self.schema_dir_name,
}
pickle.dump(serializable_message, file)

def _dump_schemata(self, key_schema_id, value_schema_id):
directory = self.working_dir / self.schema_dir_name
directory.mkdir()
(directory / "key_schema.avsc").write_text(
json.dumps(
self.schema_registry_client.get_schema_from_id(
key_schema_id
).original_schema
)
)
(directory / "value_schema.avsc").write_text(
json.dumps(
self.schema_registry_client.get_schema_from_id(
value_schema_id
).original_schema
)
)

def decode_bytes(self, raw_data: Optional[bytes]) -> Tuple[int, Optional[Dict]]:
if raw_data is None:
return -1, None

with BytesIO(raw_data) as fake_stream:
schema_id = extract_schema_id(fake_stream.read(5))
parsed_schema = self.schema_registry_client.get_schema_from_id(
schema_id
).parsed_schema
record = fastavro.schemaless_reader(fake_stream, parsed_schema)
return schema_id, record

def schema_changed(self, decoded_message: DecodedAvroMessage) -> bool:
return (
self.current_value_schema_id != decoded_message.value_schema_id
and decoded_message.value is not None
) or self.current_key_schema_id != decoded_message.key_schema_id


class AvroFileReader(FileReader):
def __init__(self, working_dir: pathlib.Path):
self.working_dir = working_dir

def read_from_file(self, file: BinaryIO) -> Iterable[KafkaMessage]:
while True:
try:
record = pickle.load(file)
except EOFError:
return

schema_directory = self.working_dir / record["schema_directory_name"]

key_schema = load_schema((schema_directory / "key_schema.avsc").read_text())
value_schema = load_schema(
(schema_directory / "value_schema.avsc").read_text()
)

yield KafkaMessage(record["key"], record["value"], key_schema, value_schema)


def extract_schema_id(message: bytes) -> int:
_, schema_id = struct.unpack(">bI", message[:5])
return schema_id
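The five-byte header that `extract_schema_id` unpacks follows Confluent's Avro wire format: a single magic byte (0) followed by a big-endian 4-byte schema id, with the schemaless Avro payload after it. A minimal round-trip sketch (the id 42 and the payload bytes are arbitrary sample values):

```python
import struct

def extract_schema_id(message: bytes) -> int:
    # ">bI" = big-endian signed byte (magic) + unsigned 4-byte int (schema id).
    _, schema_id = struct.unpack(">bI", message[:5])
    return schema_id

# Build a message the way a Confluent Avro producer frames it:
# magic byte 0, then the schema id, then the encoded record.
header = struct.pack(">bI", 0, 42)
message = header + b"avro-encoded payload"

print(extract_schema_id(message))  # → 42
```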
158 changes: 149 additions & 9 deletions esque/cli/commands.py
@@ -1,18 +1,34 @@
import pathlib
import time
from pathlib import Path
from time import sleep

import click
from click import version_option

import yaml
from click import version_option

from esque.__version__ import __version__
from esque.broker import Broker
from esque.cli.helpers import ensure_approval
from esque.cli.helpers import ensure_approval, DeleteOnFinished
from esque.cli.options import State, no_verify_option, pass_state
from esque.cli.output import bold, pretty, pretty_topic_diffs, get_output_new_topics
from esque.clients import Consumer, Producer
from esque.cli.output import (
bold,
pretty,
pretty_topic_diffs,
get_output_new_topics,
blue_bold,
green_bold,
)
from esque.clients import (
FileConsumer,
FileProducer,
AvroFileProducer,
AvroFileConsumer,
PingConsumer,
PingProducer,
)
from esque.cluster import Cluster
from esque.config import PING_TOPIC, Config
from esque.config import PING_TOPIC, Config, PING_GROUP_ID
from esque.consumergroup import ConsumerGroupController
from esque.errors import (
ConsumerGroupDoesNotExistException,
@@ -280,6 +296,130 @@ def get_topics(state, topic, o):
click.echo(topic.name)


@esque.command(
"transfer", help="Transfer messages of a topic from one environment to another."
)
@click.argument("topic", required=True)
@click.option(
"-f",
"--from",
"from_context",
help="Source Context",
type=click.STRING,
required=True,
)
@click.option(
"-t",
"--to",
"to_context",
help="Destination context",
type=click.STRING,
required=True,
)
@click.option(
"-n", "--numbers", help="Number of messages", type=click.INT, required=True
)
@click.option("--last/--first", default=False)
@click.option(
"-a",
"--avro",
help="Set this flag if the topic contains avro data",
default=False,
is_flag=True,
)
@pass_state
def transfer(
state: State,
topic: str,
from_context: str,
to_context: str,
numbers: int,
last: bool,
avro: bool,
):
current_timestamp_milliseconds = int(round(time.time() * 1000))
temp_name = topic + "_" + str(current_timestamp_milliseconds)
group_id = "group_for_" + temp_name
directory_name = "temp-file_" + temp_name
base_dir = Path(directory_name)
state.config.context_switch(from_context)

with DeleteOnFinished(base_dir) as working_dir:
number_consumed_messages = _consume_to_file(
working_dir, topic, group_id, from_context, numbers, avro, last
)

if number_consumed_messages == 0:
click.echo(
click.style(
"Execution stopped, because no messages consumed.", fg="red"
)
)
return

click.echo(
"\nReady to produce to context "
+ blue_bold(to_context)
+ " and target topic "
+ blue_bold(topic)
)

if not ensure_approval("Do you want to proceed?\n", no_verify=state.no_verify):
return

state.config.context_switch(to_context)
_produce_from_file(topic, to_context, working_dir, avro)


def _produce_from_file(
topic: str, to_context: str, working_dir: pathlib.Path, avro: bool
):
if avro:
producer = AvroFileProducer.create(working_dir)
else:
producer = FileProducer.create(working_dir)
click.echo(
"\nStart producing to topic "
+ blue_bold(topic)
+ " in target context "
+ blue_bold(to_context)
)
number_produced_messages = producer.produce_from_file(topic)
click.echo(
green_bold(str(number_produced_messages))
+ " messages successfully produced to context "
+ green_bold(to_context)
+ " and topic "
+ green_bold(topic)
+ "."
)


def _consume_to_file(
working_dir: pathlib.Path,
topic: str,
group_id: str,
from_context: str,
numbers: int,
avro: bool,
last: bool,
) -> int:
if avro:
consumer = AvroFileConsumer.create(group_id, topic, working_dir, last)
else:
consumer = FileConsumer.create(group_id, topic, working_dir, last)
click.echo(
"\nStart consuming from topic "
+ blue_bold(topic)
+ " in source context "
+ blue_bold(from_context)
)
number_consumed_messages = consumer.consume_to_file(int(numbers))
click.echo(blue_bold(str(number_consumed_messages)) + " messages consumed.")

return number_consumed_messages


@esque.command("ping", help="Tests the connection to the kafka cluster.")
@click.option("-t", "--times", help="Number of pings.", default=10)
@click.option("-w", "--wait", help="Seconds to wait between pings.", default=1)
@@ -293,13 +433,13 @@ def ping(state, times, wait):
except TopicAlreadyExistsException:
click.echo("Topic already exists.")

producer = Producer()
consumer = Consumer()
producer = PingProducer()
consumer = PingConsumer(PING_GROUP_ID, PING_TOPIC, True)

click.echo(f"Ping with {state.cluster.bootstrap_servers}")

for i in range(times):
producer.produce_ping()
producer.produce_ping(PING_TOPIC)
_, delta = consumer.consume_ping()
deltas.append(delta)
click.echo(f"m_seq={i} time={delta:.2f}ms")
16 changes: 16 additions & 0 deletions esque/cli/helpers.py
@@ -1,7 +1,23 @@
import pathlib
import shutil

import click


def ensure_approval(question: str, *, no_verify: bool = False) -> bool:
if no_verify:
return True
return click.confirm(question)


class DeleteOnFinished:
def __init__(self, dir_: pathlib.Path):
self._dir = dir_
self._dir.mkdir(parents=True)

def __enter__(self) -> pathlib.Path:
return self._dir

def __exit__(self, exc_type, exc_val, exc_tb):
if self._dir.exists():
shutil.rmtree(self._dir)
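`DeleteOnFinished` is a small context manager: it creates the working directory on construction and removes it when the `with` block exits, whether or not an exception occurred, which is what lets the transfer command clean up its temporary message files. A usage sketch (the class body is copied from the diff above; the directory name is a hypothetical example):

```python
import pathlib
import shutil

class DeleteOnFinished:
    def __init__(self, dir_: pathlib.Path):
        self._dir = dir_
        self._dir.mkdir(parents=True)  # fails fast if the directory already exists

    def __enter__(self) -> pathlib.Path:
        return self._dir

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs on normal exit and on exceptions alike.
        if self._dir.exists():
            shutil.rmtree(self._dir)

with DeleteOnFinished(pathlib.Path("temp-file_example")) as working_dir:
    # Write partition files, consume messages, etc.
    (working_dir / "partition_0").mkdir()

print(pathlib.Path("temp-file_example").exists())  # → False
```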