
Commit

Merge 1bd1c77 into af1b1c6
Bibob7 committed Jul 22, 2019
2 parents af1b1c6 + 1bd1c77 commit 1f5e458
Showing 30 changed files with 910 additions and 337 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -1,5 +1,5 @@
 [flake8]
 ignore = E203, E266, E501, W503
-max-line-length = 80
+max-line-length = 119
 max-complexity = 18
 select = B,C,E,F,W,T4,B9
22 changes: 15 additions & 7 deletions .travis.yml
@@ -2,14 +2,20 @@ dist: xenial
 language: python
 cache: pip
 env:
-  - ESQUE_ENV=dev TEST_CMD="pytest tests/ --integration --cov=esque --local"
+  - >
+    ESQUE_ENV=dev TEST_CMD="pytest tests/ --integration --cov=esque --local"
+    BROKER_URL="localhost:9092"
+    ZOOKEEPER_URL="localhost:2181"
+    SCHEMA_REGISTRY_URL="localhost:8081"
 before_install:
-  - wget https://mirror.netcologne.de/apache.org/kafka/2.2.0/kafka_2.12-2.2.0.tgz -O kafka.tgz
-  - mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
-  - nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &"
-  - nohup bash -c "cd kafka && bin/kafka-server-start.sh config/server.properties &"
-  - scripts/wait-for-it.sh localhost:9092
-  - scripts/wait-for-it.sh localhost:2181
+  - wget http://packages.confluent.io/archive/5.2/confluent-community-5.2.1-2.12.tar.gz -O confluent-community.tgz
+  - mkdir -p confluent-community && tar xzf confluent-community.tgz -C confluent-community --strip-components 1
+  - confluent-community/bin/zookeeper-server-start -daemon confluent-community/etc/kafka/zookeeper.properties
+  - bash -c "scripts/wait-for-it.sh ${ZOOKEEPER_URL} -t 60"
+  - confluent-community/bin/kafka-server-start -daemon confluent-community/etc/kafka/server.properties
+  - bash -c "scripts/wait-for-it.sh ${BROKER_URL} -t 60"
+  - confluent-community/bin/schema-registry-start -daemon confluent-community/etc/schema-registry/schema-registry.properties
+  - bash -c "scripts/wait-for-it.sh ${SCHEMA_REGISTRY_URL} -t 60"
 install:
   - pip install coverage coveralls flake8 pipenv
   - pipenv install --system --dev --deploy
@@ -26,11 +26,13 @@ notifications:
 matrix:
   include:
     - name: black
+      before_install: []
       python: 3.7
       env:
         - TEST_CMD="black --check --verbose ."
     - name: flake8
       python: 3.7
+      before_install: []
       env:
         - TEST_CMD="flake8 esque/"
     - name: '3.6'
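
For context, the `scripts/wait-for-it.sh` calls above simply block until a TCP endpoint accepts connections. A rough Python equivalent of that check (illustrative only, not part of this change):

```python
import socket
import time


def wait_for(host: str, port: int, timeout: float = 60.0) -> None:
    """Poll until a TCP connection to host:port succeeds, or raise after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=5):
                return
        except OSError:
            if time.monotonic() > deadline:
                raise TimeoutError(f"{host}:{port} did not come up within {timeout}s")
            time.sleep(1)


# e.g. wait_for("localhost", 2181) before starting the Kafka broker
```
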
110 changes: 86 additions & 24 deletions Pipfile.lock

Some generated files are not rendered by default.

11 changes: 11 additions & 0 deletions README.md
@@ -134,6 +134,17 @@ While this `docker-compose` stack is up, you can run the tests from the CLI via
Alternatively, you can run the entire test suite in Docker Compose, without needing to set up the development environment, via `docker-compose -f docker-compose.yml -f docker-compose.test.yml`.


### Pre-commit Hooks

To install the pre-commit hooks, run:

```
pip install pre-commit
pre-commit install
pre-commit install-hooks
```
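
Once installed, the hooks run automatically on every `git commit`; you can also run them against the whole repository with `pre-commit run --all-files`.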


## Alternatives

- [LinkedIn KafkaTools](https://github.com/linkedin/kafka-tools)
3 changes: 2 additions & 1 deletion docker-compose.test.yml
@@ -8,7 +8,8 @@ services:
       ESQUE_TEST_ENV: "ci"
     depends_on:
       - kafka
+      - schema_registry
     command: >
       -c
       "(until (kafkacat -b kafka:9093 -X debug=all -L); do sleep 5s; done) \
-      && pytest tests/ --integration"
+      && python3 -u -m pytest -v -x tests/ --integration"
14 changes: 12 additions & 2 deletions docker-compose.yml
@@ -37,5 +37,15 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
-
-
+  schema_registry:
+    image: confluentinc/cp-schema-registry:5.2.2
+    container_name: schema_registry
+    environment:
+      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
+      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
+      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
+      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9093
+    depends_on:
+      - kafka
+    ports:
+      - 8081:8081
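
With this service up, the registry's REST API is reachable on port 8081. A minimal smoke test, sketched under the assumption that the stack from this compose file is running and `requests` is installed:

```python
import requests

# List the registered subjects; an empty list is expected on a fresh registry.
response = requests.get("http://localhost:8081/subjects", timeout=5)
response.raise_for_status()
print(response.json())
```
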
109 changes: 109 additions & 0 deletions esque/avromessage.py
@@ -0,0 +1,109 @@
import json
import pathlib
import pickle
import struct
from io import BytesIO
from typing import Optional, Tuple, Dict, Iterable, NamedTuple, Any
import itertools as it

import fastavro
from confluent_kafka.cimpl import Message
from confluent_kafka.avro import loads as load_schema

from esque.message import FileWriter, FileReader, KafkaMessage
from esque.schemaregistry import SchemaRegistryClient


class DecodedAvroMessage(NamedTuple):
    key: Any
    value: Any
    partition: int
    key_schema_id: int
    value_schema_id: int


class AvroFileWriter(FileWriter):
    def __init__(self, directory: pathlib.Path, schema_registry_client: SchemaRegistryClient):
        super().__init__(directory)
        self.directory = directory
        self.schema_registry_client = schema_registry_client
        self.current_key_schema_id = None
        self.current_value_schema_id = None
        self.schema_dir_name = None
        self.schema_version = it.count(1)
        self.open_mode = "wb+"

    def write_message_to_file(self, message: Message):
        key_schema_id, decoded_key = self.decode_bytes(message.key())
        value_schema_id, decoded_value = self.decode_bytes(message.value())
        decoded_message = DecodedAvroMessage(
            decoded_key, decoded_value, message.partition(), key_schema_id, value_schema_id
        )

        # A change in either schema starts a new, numbered schema directory so that
        # every pickled record can point at the exact schemata it was written with.
        if self.schema_changed(decoded_message) or self.schema_dir_name is None:
            self.schema_dir_name = f"{next(self.schema_version):04}_{key_schema_id}_{value_schema_id}"
            self.current_key_schema_id = key_schema_id
            self.current_value_schema_id = value_schema_id
            self._dump_schemata(key_schema_id, value_schema_id)

        serializable_message = {
            "key": decoded_message.key,
            "value": decoded_message.value,
            "partition": decoded_message.partition,
            "schema_directory_name": self.schema_dir_name,
        }
        pickle.dump(serializable_message, self.file)

    def _dump_schemata(self, key_schema_id, value_schema_id):
        directory = self.directory / self.schema_dir_name
        directory.mkdir()
        (directory / "key_schema.avsc").write_text(
            json.dumps(self.schema_registry_client.get_schema_from_id(key_schema_id).original_schema), encoding="utf-8"
        )
        (directory / "value_schema.avsc").write_text(
            json.dumps(self.schema_registry_client.get_schema_from_id(value_schema_id).original_schema),
            encoding="utf-8",
        )

    def decode_bytes(self, raw_data: Optional[bytes]) -> Tuple[int, Optional[Dict]]:
        if raw_data is None:
            return -1, None

        # Confluent wire format: one magic byte (0) and a 4-byte big-endian schema id
        # precede the schemaless Avro payload.
        with BytesIO(raw_data) as fake_stream:
            schema_id = extract_schema_id(fake_stream.read(5))
            parsed_schema = self.schema_registry_client.get_schema_from_id(schema_id).parsed_schema
            record = fastavro.schemaless_reader(fake_stream, parsed_schema)
            return schema_id, record

    def schema_changed(self, decoded_message: DecodedAvroMessage) -> bool:
        return (
            self.current_value_schema_id != decoded_message.value_schema_id and decoded_message.value is not None
        ) or self.current_key_schema_id != decoded_message.key_schema_id


class AvroFileReader(FileReader):
    def __init__(self, directory: pathlib.Path):
        super().__init__(directory)
        self.open_mode = "rb"

    def read_from_file(self) -> Iterable[KafkaMessage]:
        # Unpickle records until EOF; each record references the schema directory
        # it was written with, so key and value schemata can be reloaded.
        while True:
            try:
                record = pickle.load(self.file)
            except EOFError:
                return

            schema_directory = self.directory / record["schema_directory_name"]

            key_schema = load_schema((schema_directory / "key_schema.avsc").read_text(encoding="utf-8"))
            value_schema = load_schema((schema_directory / "value_schema.avsc").read_text(encoding="utf-8"))

            yield KafkaMessage(
                json.dumps(record["key"]), json.dumps(record["value"]), record["partition"], key_schema, value_schema
            )


def extract_schema_id(message: bytes) -> int:
    magic_byte, schema_id = struct.unpack(">bI", message[:5])
    assert magic_byte == 0, f"Wrong magic byte ({magic_byte}), no AVRO message."
    return schema_id
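
For illustration, a minimal round trip of the five-byte header that `extract_schema_id` parses — the schema id 42 and the payload bytes are made up, and the import assumes the `esque` package is installed:

```python
import struct

from esque.avromessage import extract_schema_id

# Confluent wire-format header: magic byte 0, then the schema id as a big-endian unsigned int.
header = struct.pack(">bI", 0, 42)
assert extract_schema_id(header + b"<avro payload>") == 42
```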
