Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
smueller18 committed Apr 2, 2017
1 parent 3a2406d commit 999f440
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
31 changes: 21 additions & 10 deletions kafka_connector/avro_loop_producer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# -*- coding: utf-8 -*-

import logging
import time
import requests.exceptions

from avro.schema import SchemaParseException
from confluent_kafka import avro, KafkaError, KafkaException
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

from kafka_connector.timer import Timer, Begin, Unit
Expand All @@ -18,12 +20,10 @@
'log_level': 0,
'api.version.request': True,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1,
'queue.buffering.max.ms': 10,
'message.send.max.retries': 200,
'session.timeout.ms': 10,
'default.topic.config':
{
'message.timeout.ms': 300000,
'produce.offset.report': True
}
}
Expand All @@ -39,12 +39,10 @@ class AvroLoopProducer(AvroProducer):
... 'log_level': 0,
... 'api.version.request': True,
... 'queue.buffering.max.messages': 100000,
... 'queue.buffering.max.ms': 1,
... 'queue.buffering.max.ms': 10,
... 'message.send.max.retries': 200,
... 'session.timeout.ms': 10,
... 'default.topic.config':
... {
... 'message.timeout.ms': 300000,
... 'produce.offset.report': True
... }
... }
Expand All @@ -67,6 +65,8 @@ def __init__(self, bootstrap_servers, schema_registry_url, topic, key_schema, va
:type value_schema: str
:param config:
:type config: dict
:param error_callback: function that handles occurring error events
:type error_callback: lambda err: function(err)
:raise SchemaParseException:
"""
Expand Down Expand Up @@ -124,7 +124,11 @@ def produce(self, key, value, timestamp=None, on_delivery=lambda err, msg: AvroL
if timestamp is not None:
kwargs.update({"timestamp": timestamp})

super().produce(topic=self._topic, key=key, value=value, **kwargs)
try:
super().produce(topic=self._topic, key=key, value=value, **kwargs)
except requests.exceptions.ConnectionError as e:
logger.error(e)
time.sleep(1)

if type(poll_timeout) != bool:
super().poll(timeout=poll_timeout)
Expand All @@ -133,14 +137,21 @@ def loop(self, get_data_function, interval=1, unit=Unit.SECOND, begin=Begin.FULL

get_data_kwargs = get_data_function()

timer = Timer(lambda **kwargs: self.produce(**get_data_kwargs), interval, unit, begin)
self._timer = Timer(lambda **kwargs: self.produce(**get_data_kwargs), interval, unit, begin)
try:
timer.start()
self._timer.start()
except KeyboardInterrupt:
super().flush(0.1)
# todo handle KeyboardInterrupt
return

def stop(self):
"""
Stops the timer if it is running
"""
if self._timer is not None and not self._timer.is_stopped():
self._timer.stop()

@staticmethod
def on_delivery(err, msg):
if err is not None:
Expand Down
14 changes: 9 additions & 5 deletions tests/avro_loop_producer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import datetime as dt
import logging
import os

from kafka_connector.avro_loop_producer import AvroLoopProducer
from kafka_connector.timer import Begin
Expand All @@ -9,12 +10,15 @@
logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT)
logger = logging.getLogger(__name__)

__dirname__ = os.path.dirname(os.path.abspath(__file__))


def get_data():
return {'key': time.time(), 'value': {'name': 'abc '}}
return {'key': time.time(), 'value': {'name': 'abc', 'number': int(time.time())}}


stcs = AvroLoopProducer("localhost:9092", "http://localhost:8081", "testtopic",
"schema/key_schema.avsc", "schema/value_schema.avsc")
stcs.loop(get_data)
# stcs.loop(get_data, begin=[dt.time(19, 4, 20), dt.time(19, 4, 40)])
producer = AvroLoopProducer("localhost:9092", "http://localhost:8081", "testtopic",
__dirname__ + "/schema/key_schema.avsc",
__dirname__ + "/schema/value_schema.avsc")
producer.loop(get_data)
# producer.loop(get_data, begin=[dt.time(19, 4, 20), dt.time(19, 4, 40)])

0 comments on commit 999f440

Please sign in to comment.