Merge pull request #36 from ah-/change_logging

Implement basic state change logging to kafka topic

ah- committed Jan 2, 2018
2 parents 1b5d75d + 774d62e commit 2bfd47c
Showing 10 changed files with 74 additions and 8 deletions.
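In outline: each write to a wrapped state store is applied to the inner store and mirrored to a per-store Kafka changelog topic, so the store can later be rebuilt by replaying that topic. (This is also why the wordcount example below switches to string-valued counts and configures a BytesSerde: changelog values now travel through the record collector to Kafka.) A minimal, runnable sketch of the decorator pattern, using hypothetical stand-in names; the commit's real classes are ChangeLoggingKeyValueStore and StoreChangeLogger, shown in the diff below:

    # Sketch only: writes hit the inner store and are mirrored to a change log.
    # `LoggingStore` and `log_change` are illustrative, not part of this commit.
    class LoggingStore:
        def __init__(self, inner, log_change):
            self.inner = inner            # e.g. a dict-backed store
            self.log_change = log_change  # e.g. sends to '<app-id>-<store>-changelog'

        def __setitem__(self, key, value):
            self.inner[key] = value
            self.log_change(key, value)   # mirror the write

        def __delitem__(self, key):
            del self.inner[key]
            self.log_change(key, None)    # None acts as a delete marker (tombstone)

    # Usage sketch: collect the change log in a list instead of a Kafka topic.
    changes = []
    store = LoggingStore({}, lambda k, v: changes.append((k, v)))
    store['hello'] = '1'
    del store['hello']
    assert changes == [('hello', '1'), ('hello', None)]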
1 change: 1 addition & 0 deletions examples/wordcount/config.properties
@@ -1,3 +1,4 @@
application.id = wordcount-example
bootstrap.servers = localhost:9092
auto.offset.reset = earliest
+value.serde = winton_kafka_streams.processor.serialization.serdes.BytesSerde
9 changes: 5 additions & 4 deletions examples/wordcount/example.py
@@ -11,7 +11,7 @@
import collections

from winton_kafka_streams.processor import BaseProcessor, TopologyBuilder
-from winton_kafka_streams.state import InMemoryKeyValueStore
+from winton_kafka_streams.state import InMemoryKeyValueStore, ChangeLoggingKeyValueStore
import winton_kafka_streams.kafka_config as kafka_config
import winton_kafka_streams.kafka_streams as kafka_streams

@@ -33,8 +33,8 @@ def process(self, key, value):
        words = value.split()
        log.debug(f'words list ({words})')
        for word in words:
-            count = self.word_count_store.get(word, 0)
-            self.word_count_store[word] = count + 1
+            count = self.word_count_store.get(word, '0')
+            self.word_count_store[word] = str(int(count) + 1)
        self.dirty_words |= set(words)

    def punctuate(self, timestamp):
@@ -48,11 +48,12 @@ def punctuate(self, timestamp):
def run(config_file):
    kafka_config.read_local_config(config_file)

+    count_store = lambda name: ChangeLoggingKeyValueStore(name, InMemoryKeyValueStore)
    with TopologyBuilder() as topology_builder:
        topology_builder. \
            source('input-value', ['wks-wordcount-example-topic']). \
            processor('count', WordCount, 'input-value'). \
-            state_store('counts', InMemoryKeyValueStore, 'count'). \
+            state_store('counts', count_store, 'count'). \
            sink('output-count', 'wks-wordcount-example-count', 'count')

    wks = kafka_streams.KafkaStreams(topology_builder, kafka_config)
8 changes: 7 additions & 1 deletion tests/processor/test_base_processor.py
@@ -3,16 +3,22 @@
custom processor implementations
"""

+import unittest.mock as mock
+
import winton_kafka_streams.processor as wks_processor
from winton_kafka_streams.processor.processor_context import ProcessorContext
+from winton_kafka_streams.processor.task_id import TaskId


def test_createBaseProcessor():
    wks_processor.BaseProcessor()


def test_initialiseBaseProcessor():
-    mock_context = ProcessorContext(None, None, {})
+    mock_task = mock.Mock()
+    mock_task.application_id = 'test_id'
+    mock_task_id = TaskId('test_group', 0)
+    mock_context = ProcessorContext(mock_task_id, mock_task, None, {})
    bp = wks_processor.BaseProcessor()
    bp.initialise('my-name', mock_context)

6 changes: 5 additions & 1 deletion tests/processor/test_sink_processor.py
@@ -5,6 +5,7 @@
import unittest.mock as mock

import winton_kafka_streams.processor as wks_processor
+from winton_kafka_streams.processor.task_id import TaskId

_expected_timestamp = 1234567890

@@ -22,7 +23,10 @@ def test_sinkProcessorProcess():

    with mock.patch('winton_kafka_streams.processor.ProcessorContext.timestamp', new_callable=mock.PropertyMock) as mock_timestamp:
        mock_timestamp.return_value = _expected_timestamp
-        processor_context = wks_processor.ProcessorContext(None, None, {})
+        mock_task = mock.Mock()
+        mock_task.application_id = 'test_id'
+        mock_task_id = TaskId('test_group', 0)
+        processor_context = wks_processor.ProcessorContext(mock_task_id, mock_task, None, {})
        processor_context.recordCollector = mock.MagicMock()

        sink = wks_processor.SinkProcessor('topic1')
9 changes: 8 additions & 1 deletion winton_kafka_streams/processor/_stream_task.py
@@ -69,7 +69,8 @@ def __init__(self, _task_id, _application_id, _partitions, _topology_builder, _c
        self.recordCollector = RecordCollector(self.producer, self.key_serde, self.value_serde)

        self.queue = queue.Queue()
-        self.context = ProcessorContext(self, self.recordCollector, self.topology.state_stores)
+        self.context = ProcessorContext(self.task_id, self,
+                                        self.recordCollector, self.topology.state_stores)

        self.punctuation_queue = PunctuationQueue(self.punctuate)
        # TODO: use the configured timestamp extractor.
@@ -80,8 +81,14 @@ def __init__(self, _task_id, _application_id, _partitions, _topology_builder, _c
        self.commitOffsetNeeded = False
        self.consumedOffsets = {}

+        self._init_state_stores()
        self._init_topology(self.context)

+    def _init_state_stores(self):
+        self.log.debug('Initialising state stores')
+        for store in self.topology.state_stores.values():
+            store.initialise(self.context, store)
+
    def _init_topology(self, context):
        for node in self.topology.nodes.values():
            try:
4 changes: 3 additions & 1 deletion winton_kafka_streams/processor/processor_context.py
@@ -16,10 +16,12 @@ class ProcessorContext(_context.Context):
    values to downstream processors.
    """
-    def __init__(self, _task, _recordCollector, _state_stores):
+    def __init__(self, _task_id, _task, _recordCollector, _state_stores):

        super().__init__(_state_stores)

+        self.application_id = _task.application_id
+        self.task_id = _task_id
        self.task = _task
        self.recordCollector = _recordCollector

1 change: 1 addition & 0 deletions winton_kafka_streams/state/__init__.py
@@ -5,3 +5,4 @@

from .simple import SimpleStore
from .in_memory_key_value_store import InMemoryKeyValueStore
+from .change_logging_key_value_store import ChangeLoggingKeyValueStore
29 changes: 29 additions & 0 deletions winton_kafka_streams/state/change_logging_key_value_store.py
@@ -0,0 +1,29 @@
from .store_change_logger import StoreChangeLogger

class ChangeLoggingKeyValueStore:
    def __init__(self, name, inner):
        self.inner = inner(name)


    def initialise(self, context, root):
        self.inner.initialise(context, root)
        self.change_logger = StoreChangeLogger(self.inner.name, context)


    def __setitem__(self, key, value):
        self.inner.__setitem__(key, value)
        self.change_logger.log_change(key, value)


    def __getitem__(self, key):
        return self.inner.__getitem__(key)


    def get(self, key, default=None):
        return self.inner.get(key, default)


    def __delitem__(self, key):
        v = self.inner.__delitem__(key)
        self.change_logger.log_change(key, None)
        return v
4 changes: 4 additions & 0 deletions winton_kafka_streams/state/in_memory_key_value_store.py
@@ -3,6 +3,10 @@ def __init__(self, name):
        self.name = name
        self.dict = {}

+    def initialise(self, context, root):
+        # TODO: register with context, passing restore callback
+        pass
+
    def __setitem__(self, key, value):
        self.dict[key] = value

11 changes: 11 additions & 0 deletions winton_kafka_streams/state/store_change_logger.py
@@ -0,0 +1,11 @@
class StoreChangeLogger:
    def __init__(self, store_name, context):
        self.topic = f'{context.application_id}-{store_name}-changelog'
        self.context = context
        self.partition = context.task_id.partition
        self.record_collector = context.recordCollector

    def log_change(self, key, value):
        if self.record_collector:
            self.record_collector.send(self.topic, key, value,
                                       self.context.timestamp, partition=self.partition)
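The changelog topic name follows the f'{application_id}-{store_name}-changelog' convention used by Kafka Streams proper, and each record is sent on the task's own partition, presumably so changelog partitions stay aligned with input partitions. The TODO in InMemoryKeyValueStore.initialise points at the natural next step: registering a restore callback so a store can be rebuilt by replaying its changelog. A hypothetical sketch of such a callback (not part of this commit):

    # Hypothetical restore logic, sketching the TODO above: replaying changelog
    # records rebuilds the store, treating None values as deletes (tombstones).
    def restore_from_changelog(store_dict, changelog_records):
        for key, value in changelog_records:
            if value is None:
                store_dict.pop(key, None)  # tombstone: drop the key
            else:
                store_dict[key] = value

    # Replaying a write followed by its tombstone leaves the store empty.
    state = {}
    restore_from_changelog(state, [('hello', '1'), ('hello', None)])
    assert state == {}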
