Skip to content

Commit

Permalink
Fix subject name strategy (#5)
Browse files Browse the repository at this point in the history
* Add tests to subject-name-sctrategy

* Update docs

* Bump version
  • Loading branch information
saabeilin committed Sep 24, 2018
1 parent c075a7f commit d5fad87
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ producer = Producer(
},
key_serializer=AvroStringKeySerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),
value_serializer=AvroSerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL,
value_subject_name_strategy=SubjectNameStrategy.RecordNameStrategy)
subject_name_strategy=SubjectNameStrategy.RecordNameStrategy)
)

```
Expand Down
25 changes: 10 additions & 15 deletions kafkian/serde/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,47 +26,42 @@ class AvroSerializer(Serializer):
def __init__(self,
schema_registry_url: str,
auto_register_schemas: bool = True,
key_subject_name_strategy: SubjectNameStrategy = SubjectNameStrategy.TopicNameStrategy,
value_subject_name_strategy: SubjectNameStrategy = SubjectNameStrategy.TopicRecordNameStrategy,
subject_name_strategy: SubjectNameStrategy = SubjectNameStrategy.RecordNameStrategy,
**kwargs):
super().__init__(**kwargs)
schema_registry_url = schema_registry_url
self.schema_registry = CachedSchemaRegistryClient(schema_registry_url)
self.auto_register_schemas = auto_register_schemas
self.key_subject_name_strategy = key_subject_name_strategy
self.value_subject_name_strategy = value_subject_name_strategy
self.subject_name_strategy = subject_name_strategy
self._serializer_impl = MessageSerializer(self.schema_registry)

def _get_subject(self, topic, schema, strategy, is_key=False):
if strategy == SubjectNameStrategy.TopicNameStrategy:
def _get_subject(self, topic, schema, is_key=False):
if self.subject_name_strategy == SubjectNameStrategy.TopicNameStrategy:
subject = topic
elif strategy == SubjectNameStrategy.RecordNameStrategy:
elif self.subject_name_strategy == SubjectNameStrategy.RecordNameStrategy:
subject = schema.fullname
elif strategy == SubjectNameStrategy.TopicRecordNameStrategy:
subject = '%{}-%{}'.format(topic, schema.fullname)
elif self.subject_name_strategy == SubjectNameStrategy.TopicRecordNameStrategy:
subject = '{}-{}'.format(topic, schema.fullname)
else:
raise ValueError('Unknown SubjectNameStrategy')

subject += '-key' if is_key else '-value'
return subject

def _ensure_schema(self, topic, schema, is_key=False):
subject = self._get_subject(
topic, schema, self.key_subject_name_strategy, is_key)
subject = self._get_subject(topic, schema, is_key)

if self.auto_register_schemas:
schema_id = self.schema_registry.register(subject, schema)
schema = self.schema_registry.get_by_id(schema_id)
else:
schema_id, schema, _ = self.schema_registry.get_latest_schema(
subject)
schema_id, schema, _ = self.schema_registry.get_latest_schema(subject)

return schema_id, schema

def serialize(self, value, topic, is_key=False, **kwargs):
schema_id, _ = self._ensure_schema(topic, value._schema, is_key)
return self._serializer_impl.encode_record_with_schema_id(
schema_id, value, is_key)
return self._serializer_impl.encode_record_with_schema_id(schema_id, value, is_key)


class AvroStringKeySerializer(AvroSerializer):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def readme():
return f.read()

setup(name="kafkian",
version="0.7.1",
version="0.7.2",
description="Opinionated Kafka Python client on top of Confluent python library",
long_description=readme(),
long_description_content_type="text/markdown",
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/test_subject_name_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest
from confluent_kafka import avro

from kafkian.serde.serialization import SubjectNameStrategy, AvroSerializer

value_schema_str = """
{
"namespace": "my.test",
"name": "Value",
"type": "record",
"fields" : [
{
"name" : "name",
"type" : "string"
}
]
}
"""

schema = avro.loads(value_schema_str)

@pytest.mark.parametrize(
'strategy,is_key,subject',
(
(SubjectNameStrategy.TopicNameStrategy, False, 'topic-value'),
(SubjectNameStrategy.TopicNameStrategy, True, 'topic-key'),
(SubjectNameStrategy.RecordNameStrategy, False, 'my.test.Value-value'),
(SubjectNameStrategy.RecordNameStrategy, True, 'my.test.Value-key'),
(SubjectNameStrategy.TopicRecordNameStrategy, False, 'topic-my.test.Value-value'),
(SubjectNameStrategy.TopicRecordNameStrategy, True, 'topic-my.test.Value-key'),
)
)
def test_subject_name_strategy(strategy, is_key, subject):
ser = AvroSerializer('', subject_name_strategy=strategy)
assert ser._get_subject("topic", schema, is_key) == subject

0 comments on commit d5fad87

Please sign in to comment.