From d5fad87e5dea6bad77c9c7159ad18dce8e5d4f85 Mon Sep 17 00:00:00 2001 From: Sergei Beilin Date: Mon, 24 Sep 2018 22:04:18 +0200 Subject: [PATCH] Fix subject name strategy (#5) * Add tests to subject-name-sctrategy * Update docs * Bump version --- README.md | 2 +- kafkian/serde/serialization.py | 25 +++++++---------- setup.py | 2 +- tests/unit/test_subject_name_strategy.py | 35 ++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 17 deletions(-) create mode 100644 tests/unit/test_subject_name_strategy.py diff --git a/README.md b/README.md index 2024d48..3955b08 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ producer = Producer( }, key_serializer=AvroStringKeySerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL), value_serializer=AvroSerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL, - value_subject_name_strategy=SubjectNameStrategy.RecordNameStrategy) + subject_name_strategy=SubjectNameStrategy.RecordNameStrategy) ) ``` diff --git a/kafkian/serde/serialization.py b/kafkian/serde/serialization.py index 61704a8..33de230 100644 --- a/kafkian/serde/serialization.py +++ b/kafkian/serde/serialization.py @@ -26,24 +26,22 @@ class AvroSerializer(Serializer): def __init__(self, schema_registry_url: str, auto_register_schemas: bool = True, - key_subject_name_strategy: SubjectNameStrategy = SubjectNameStrategy.TopicNameStrategy, - value_subject_name_strategy: SubjectNameStrategy = SubjectNameStrategy.TopicRecordNameStrategy, + subject_name_strategy: SubjectNameStrategy = SubjectNameStrategy.RecordNameStrategy, **kwargs): super().__init__(**kwargs) schema_registry_url = schema_registry_url self.schema_registry = CachedSchemaRegistryClient(schema_registry_url) self.auto_register_schemas = auto_register_schemas - self.key_subject_name_strategy = key_subject_name_strategy - self.value_subject_name_strategy = value_subject_name_strategy + self.subject_name_strategy = subject_name_strategy self._serializer_impl = MessageSerializer(self.schema_registry) - def _get_subject(self, topic, schema, strategy, is_key=False): - if strategy == SubjectNameStrategy.TopicNameStrategy: + def _get_subject(self, topic, schema, is_key=False): + if self.subject_name_strategy == SubjectNameStrategy.TopicNameStrategy: subject = topic - elif strategy == SubjectNameStrategy.RecordNameStrategy: + elif self.subject_name_strategy == SubjectNameStrategy.RecordNameStrategy: subject = schema.fullname - elif strategy == SubjectNameStrategy.TopicRecordNameStrategy: - subject = '%{}-%{}'.format(topic, schema.fullname) + elif self.subject_name_strategy == SubjectNameStrategy.TopicRecordNameStrategy: + subject = '{}-{}'.format(topic, schema.fullname) else: raise ValueError('Unknown SubjectNameStrategy') @@ -51,22 +49,19 @@ def _get_subject(self, topic, schema, strategy, is_key=False): return subject def _ensure_schema(self, topic, schema, is_key=False): - subject = self._get_subject( - topic, schema, self.key_subject_name_strategy, is_key) + subject = self._get_subject(topic, schema, is_key) if self.auto_register_schemas: schema_id = self.schema_registry.register(subject, schema) schema = self.schema_registry.get_by_id(schema_id) else: - schema_id, schema, _ = self.schema_registry.get_latest_schema( - subject) + schema_id, schema, _ = self.schema_registry.get_latest_schema(subject) return schema_id, schema def serialize(self, value, topic, is_key=False, **kwargs): schema_id, _ = self._ensure_schema(topic, value._schema, is_key) - return self._serializer_impl.encode_record_with_schema_id( - schema_id, value, is_key) + return self._serializer_impl.encode_record_with_schema_id(schema_id, value, is_key) class AvroStringKeySerializer(AvroSerializer): diff --git a/setup.py b/setup.py index fbf4627..86891b8 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ def readme(): return f.read() setup(name="kafkian", - version="0.7.1", + version="0.7.2", description="Opinionated Kafka Python client on top of Confluent python library", long_description=readme(), long_description_content_type="text/markdown", diff --git a/tests/unit/test_subject_name_strategy.py b/tests/unit/test_subject_name_strategy.py new file mode 100644 index 0000000..92979cb --- /dev/null +++ b/tests/unit/test_subject_name_strategy.py @@ -0,0 +1,35 @@ +import pytest +from confluent_kafka import avro + +from kafkian.serde.serialization import SubjectNameStrategy, AvroSerializer + +value_schema_str = """ +{ + "namespace": "my.test", + "name": "Value", + "type": "record", + "fields" : [ + { + "name" : "name", + "type" : "string" + } + ] +} +""" + +schema = avro.loads(value_schema_str) + +@pytest.mark.parametrize( + 'strategy,is_key,subject', + ( + (SubjectNameStrategy.TopicNameStrategy, False, 'topic-value'), + (SubjectNameStrategy.TopicNameStrategy, True, 'topic-key'), + (SubjectNameStrategy.RecordNameStrategy, False, 'my.test.Value-value'), + (SubjectNameStrategy.RecordNameStrategy, True, 'my.test.Value-key'), + (SubjectNameStrategy.TopicRecordNameStrategy, False, 'topic-my.test.Value-value'), + (SubjectNameStrategy.TopicRecordNameStrategy, True, 'topic-my.test.Value-key'), + ) +) +def test_subject_name_strategy(strategy, is_key, subject): + ser = AvroSerializer('', subject_name_strategy=strategy) + assert ser._get_subject("topic", schema, is_key) == subject