Skip to content

Commit

Permalink
Merge pull request #42 from sambott/avro
Browse files Browse the repository at this point in the history
Add support for Avro Schema Registry
  • Loading branch information
sambott committed Jan 2, 2018
2 parents 2bfd47c + 585fba5 commit a05037a
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 2 deletions.
4 changes: 3 additions & 1 deletion setup.py
Expand Up @@ -12,8 +12,10 @@
readme = readme_file.read()

# Runtime dependencies. Every entry ends with a comma so that adjacent string
# literals can never be implicitly concatenated into a single bogus
# requirement name.
requirements = [
    'javaproperties',
    'confluent-kafka',
    'requests',
    'avro-python3',
]

test_requirements = [
Expand Down
149 changes: 149 additions & 0 deletions tests/processor/serde/mock_schema_registry.py
@@ -0,0 +1,149 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


#
# derived from https://github.com/verisign/python-confluent-schemaregistry.git
#

from confluent_kafka.avro import ClientError


class MockSchemaRegistryClient(object):
    """
    A client that acts as a schema registry locally, keeping everything in
    in-memory dictionaries.

    Schemas are used as dictionary keys, so they must be hashable.
    Compatibility related methods are not implemented at this time.
    """

    def __init__(self, max_schemas_per_subject=1000):
        # Stored for interface parity; no method of this class consults it.
        self.max_schemas_per_subject = max_schemas_per_subject
        # subj => { schema => id }
        self.subject_to_schema_ids = {}
        # id => avro_schema
        self.id_to_schema = {}
        # subj => { schema => version }
        self.subject_to_schema_versions = {}
        # subj => (schema id, schema, version) of the highest version cached
        self.subject_to_latest_schema = {}

        # counters
        self.next_id = 1
        # schema => id, shared across all subjects
        self.schema_to_id = {}

    def _get_next_id(self, schema):
        """Return the id already assigned to ``schema``, or allocate the next one."""
        if schema in self.schema_to_id:
            return self.schema_to_id[schema]
        result = self.next_id
        self.next_id += 1
        self.schema_to_id[schema] = result
        return result

    def _get_next_version(self, subject):
        """
        Return the version number the next schema of ``subject`` will get.

        Versions are zero-based: the number of schemas already stored for the
        subject. An empty entry is created for a previously unseen subject.
        """
        return len(self.subject_to_schema_versions.setdefault(subject, {}))

    def _get_all_versions(self, subject):
        """Return the sorted list of version numbers known for ``subject``."""
        versions = self.subject_to_schema_versions.get(subject, {})
        # The mapping is schema => version, so the version numbers are the
        # values; sorting the mapping itself would sort the schemas instead.
        return sorted(versions.values())

    def _add_to_cache(self, cache, subject, schema, value):
        """Store ``value`` under ``cache[subject][schema]``, creating the subject entry if needed."""
        cache.setdefault(subject, {})[schema] = value

    def _cache_schema(self, schema, schema_id, subject, version):
        """Record ``schema`` in every lookup table and update the subject's latest entry."""
        # don't overwrite anything: if the id is already known, reuse the
        # schema object that was stored first
        schema = self.id_to_schema.setdefault(schema_id, schema)

        self._add_to_cache(self.subject_to_schema_ids,
                           subject, schema, schema_id)

        self._add_to_cache(self.subject_to_schema_versions,
                           subject, schema, version)

        latest = self.subject_to_latest_schema.get(subject)
        if latest is not None:
            _, _, latest_version = latest
            # Keep the existing entry when it is newer than this one.
            if latest_version > version:
                return
        self.subject_to_latest_schema[subject] = (schema_id, schema, version)

    def register(self, subject, avro_schema):
        """
        Register a schema with the registry under the given subject
        and receive a schema id.

        avro_schema must be a parsed schema from the python avro library.
        Multiple instances of the same schema will result in inconsistencies.
        Registering a schema that the subject already holds returns the
        existing id without creating a new version.
        """
        schemas_to_id = self.subject_to_schema_ids.get(subject, {})
        schema_id = schemas_to_id.get(avro_schema, -1)
        if schema_id != -1:
            return schema_id

        # add it
        version = self._get_next_version(subject)
        schema_id = self._get_next_id(avro_schema)

        # cache it
        self._cache_schema(avro_schema, schema_id, subject, version)
        return schema_id

    def get_by_id(self, schema_id):
        """Retrieve a parsed avro schema by id or None if not found"""
        return self.id_to_schema.get(schema_id, None)

    def get_latest_schema(self, subject):
        """
        Return the latest 3-tuple of:
        (the schema id, the parsed avro schema, the schema version)
        for a particular subject.
        If the subject is not found, (None,None,None) is returned.
        """
        return self.subject_to_latest_schema.get(subject, (None, None, None))

    def get_version(self, subject, avro_schema):
        """
        Get the version of a schema for a given subject.
        Returns -1 if not found.
        """
        schemas_to_version = self.subject_to_schema_versions.get(subject, {})
        return schemas_to_version.get(avro_schema, -1)

    def get_id_for_schema(self, subject, avro_schema):
        """
        Get the ID of a parsed schema for a given subject.
        Returns -1 if not found.
        """
        schemas_to_id = self.subject_to_schema_ids.get(subject, {})
        return schemas_to_id.get(avro_schema, -1)

    def test_compatibility(self, subject, avro_schema, version='latest'):
        """Not implemented by the mock; always raises ClientError."""
        raise ClientError("not implemented")

    def update_compatibility(self, level, subject=None):
        """Not implemented by the mock; always raises ClientError."""
        raise ClientError("not implemented")

    def get_compatibility(self, subject=None):
        """Not implemented by the mock; always raises ClientError."""
        raise ClientError("not implemented")
45 changes: 45 additions & 0 deletions tests/processor/serde/test_avro_serde.py
@@ -0,0 +1,45 @@
import io
import struct
from confluent_kafka.avro import loads as avro_loads
from .mock_schema_registry import MockSchemaRegistryClient
from winton_kafka_streams.processor.serialization.serdes import AvroSerde
import winton_kafka_streams.kafka_config as config

string_avro = '{"type": "string"}'


def create_serde(registry, schema):
    """
    Build an AvroSerde configured as a key serde that talks to ``registry``.

    The module-level config is mutated: a placeholder registry URL and the
    given key schema are set on it before the serde is configured. The
    registry client created during configuration is then swapped for
    ``registry`` on both the serializer and the deserializer.
    """
    avro_serde = AvroSerde()
    config.AVRO_SCHEMA_REGISTRY = 'nowhere'
    config.KEY_AVRO_SCHEMA = schema
    avro_serde.configure(config, True)

    # Replace the client built from the placeholder URL with the test registry.
    for side in (avro_serde.serializer, avro_serde.deserializer):
        side._avro_helper._set_serializer(registry)

    avro_serde.test_registry = registry
    return avro_serde


def test_serialize_avro():
    """Check the bytes produced when serializing the string 'data' with a string schema."""
    mock_registry = MockSchemaRegistryClient()
    avro_serde = create_serde(mock_registry, string_avro)

    payload = avro_serde.serializer.serialize('topic', 'data')
    with io.BytesIO(payload) as stream:
        framed = stream.read(10)

    # Layout unpacked here: 1 signed byte, 4-byte big-endian unsigned int,
    # 1 signed byte, then 4 raw bytes.
    magic, schema_id, length, string = struct.unpack('>bIb4s', framed)
    assert magic == 0
    assert schema_id in mock_registry.id_to_schema
    assert length == 8  # (==4) uses variable-length zig-zag encoding
    assert string == b'data'


def test_deserialize_avro():
    """Check that a hand-built message for a registered string schema decodes to 'data'."""
    mock_registry = MockSchemaRegistryClient()
    avro_serde = create_serde(mock_registry, string_avro)
    registered_id = mock_registry.register('topic-value', avro_loads(string_avro))

    # zero byte, 4-byte big-endian schema id, then the encoded string
    wire_bytes = b''.join((b'\0', registered_id.to_bytes(4, 'big'), b'\x08data'))
    decoded = avro_serde.deserializer.deserialize('ignored', wire_bytes)
    assert decoded == 'data'
10 changes: 9 additions & 1 deletion winton_kafka_streams/kafka_config.py
Expand Up @@ -4,7 +4,7 @@
Configuration may either be set inline in your application using:
import kafka_config
kafka_config.BOOTSTRAP_SERVERS = 'localhost:9092'
or as a file in java properties format. The property names are identical to
those used in the Java implementation for ease of sharing between both.
Expand Down Expand Up @@ -273,6 +273,14 @@
KEY_SERIALIZER_INT_SIZE = None
VALUE_SERIALIZER_INT_SIZE = None

# AvroSerde
# Looked up by the Avro serializer/deserializer when it is configured:
#   *AVRO_SCHEMA_REGISTRY - schema registry URL; configuration raises if the
#                           resolved value is None.
#   *AVRO_SCHEMA          - Avro schema definition; it is parsed when set and
#                           serialization raises if none was provided.
# NOTE(review): the KEY_/VALUE_ variants presumably apply to key and value
# serdes respectively, with the unprefixed names as a shared fallback --
# confirm against extract_config_property.
AVRO_SCHEMA_REGISTRY = None
AVRO_SCHEMA = None
KEY_AVRO_SCHEMA_REGISTRY = None
KEY_AVRO_SCHEMA = None
VALUE_AVRO_SCHEMA_REGISTRY = None
VALUE_AVRO_SCHEMA = None


def read_local_config(config_file):
if not os.path.exists(config_file):
Expand Down
68 changes: 68 additions & 0 deletions winton_kafka_streams/processor/serialization/avro.py
@@ -0,0 +1,68 @@
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer
from confluent_kafka.avro import loads as avro_loads

from ._serde import extract_config_property
from ._deserializer import Deserializer
from ._serializer import Serializer


class AvroHelper:
    """
    Avro encode/decode logic shared by AvroSerializer and AvroDeserializer.

    Holds the schema registry client, the message serializer built around it
    and the (optional) parsed schema used when serializing.
    """

    def __init__(self):
        self._is_key = False
        self._schema_registry = None
        self._serializer = None
        self._schema = None

    def _set_serializer(self, schema_registry):
        """Adopt ``schema_registry`` as the registry client and build a MessageSerializer on it."""
        self._schema_registry = schema_registry
        self._serializer = MessageSerializer(registry_client=self._schema_registry)

    def _require_serializer(self):
        """Return the message serializer, failing clearly if none has been set up yet."""
        if self._serializer is None:
            raise RuntimeError("Avro serde is not configured; call configure() first")
        return self._serializer

    def configure(self, configs, is_key):
        """
        Read the Avro settings from ``configs`` and set up the serializer.

        :param configs: configuration object handed to extract_config_property
        :param is_key: whether this helper handles message keys
        :raises Exception: if no schema registry URL is configured
        """
        self._is_key = is_key
        schema_registry_url = extract_config_property(configs, is_key, 'AVRO_SCHEMA_REGISTRY')
        schema = extract_config_property(configs, is_key, 'AVRO_SCHEMA')

        if schema_registry_url is None:
            raise Exception("Missing Avro Schema Registry Url")
        self._set_serializer(CachedSchemaRegistryClient(url=schema_registry_url))

        # Always assign, so a schema left over from an earlier configure()
        # call cannot survive a reconfiguration that provides none.
        self._schema = avro_loads(schema) if schema else None

    def serialize(self, topic, data):
        """
        Encode ``data`` for ``topic`` using the configured schema.

        :raises Exception: if no schema was configured
        :raises RuntimeError: if the helper has not been configured
        """
        if self._schema is None:
            raise Exception("Missing Avro Schema")

        serializer = self._require_serializer()
        return serializer.encode_record_with_schema(topic, self._schema, data, is_key=self._is_key)

    def deserialize(self, topic, data):
        """
        Decode ``data``; ``topic`` is accepted for interface symmetry but not used.

        :raises RuntimeError: if the helper has not been configured
        """
        return self._require_serializer().decode_message(data)


class AvroSerializer(Serializer):
    """Serializer that hands all of its work to an internal AvroHelper."""

    def __init__(self):
        self._avro_helper = AvroHelper()

    def configure(self, configs, is_key):
        """Pass the configuration straight through to the helper."""
        self._avro_helper.configure(configs, is_key)

    def serialize(self, topic, data):
        """Return ``data`` encoded by the helper for ``topic``."""
        encoded = self._avro_helper.serialize(topic, data)
        return encoded

    def close(self):
        """Nothing to release."""


class AvroDeserializer(Deserializer):
    """Deserializer that hands all of its work to an internal AvroHelper."""

    def __init__(self):
        self._avro_helper = AvroHelper()

    def configure(self, configs, is_key):
        """Pass the configuration straight through to the helper."""
        self._avro_helper.configure(configs, is_key)

    def deserialize(self, topic, data):
        """Return the value the helper decodes from ``data``."""
        decoded = self._avro_helper.deserialize(topic, data)
        return decoded

    def close(self):
        """Nothing to release."""
Expand Up @@ -4,6 +4,7 @@
from .integer import IntegerSerde
from .string import StringSerde
from .json import JsonSerde
from .avro import AvroSerde

from ._serdes import serde_from_string
from ._serdes import serde_as_string
18 changes: 18 additions & 0 deletions winton_kafka_streams/processor/serialization/serdes/avro.py
@@ -0,0 +1,18 @@
"""
Avro Serde
"""
from ..avro import AvroSerializer, AvroDeserializer
from ._wrapper_serde import WrapperSerde


class AvroSerde(WrapperSerde):
    """
    Serde pairing an AvroSerializer with an AvroDeserializer, so that
    serialization and deserialization go through Avro and a schema registry.
    """

    def __init__(self):
        # Arguments are evaluated left to right: serializer first, then deserializer.
        super().__init__(AvroSerializer(), AvroDeserializer())

0 comments on commit a05037a

Please sign in to comment.