Skip to content

Commit

Permalink
Merge cf9d803 into b925f1d
Browse files Browse the repository at this point in the history
  • Loading branch information
Madison Bahmer committed Sep 27, 2018
2 parents b925f1d + cf9d803 commit e85cfe9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
31 changes: 13 additions & 18 deletions rest/rest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,26 +545,21 @@ def _feed_to_kafka(self, json_item):
:param json_item: The json item to send
:returns: A boolean indicating whther the data was sent successfully or not
"""
@MethodTimer.timeout(self.settings['KAFKA_FEED_TIMEOUT'], False)
def _feed(json_item):
try:
self.logger.debug("Sending json to kafka at " +
str(self.settings['KAFKA_PRODUCER_TOPIC']))
future = self.producer.send(self.settings['KAFKA_PRODUCER_TOPIC'],
json_item)
future.add_callback(self._kafka_success)
future.add_errback(self._kafka_failure)

self.producer.flush()

return True
self.logger.debug("Sending json to kafka at " +
str(self.settings['KAFKA_PRODUCER_TOPIC']))
future = self.producer.send(self.settings['KAFKA_PRODUCER_TOPIC'],
json_item)
future.add_callback(self._kafka_success)
future.add_errback(self._kafka_failure)

except Exception as e:
self.logger.error("Lost connection to Kafka")
self._spawn_kafka_connection_thread()
return False
try:
record_metadata = future.get(timeout=self.settings['KAFKA_FEED_TIMEOUT'])
except KafkaError:
self.logger.error("Lost connection to Kafka")
self._spawn_kafka_connection_thread()
return False

return _feed(json_item)
return True

# Routes --------------------

Expand Down
7 changes: 6 additions & 1 deletion rest/tests/test_rest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from kafka.common import OffsetOutOfRangeError
from kafka.conn import ConnectionStates
from kafka.common import KafkaError
from redis.exceptions import ConnectionError

class Override(RestService):
Expand Down Expand Up @@ -337,7 +338,11 @@ def test_feed_to_kafka(self):
# test bad
self.rest_service._spawn_kafka_connection_thread = MagicMock()
self.rest_service.logger.error = MagicMock()
self.rest_service.producer.send = MagicMock(side_effect=Exception)

bad_future = MagicMock()
bad_future.get = MagicMock(side_effect=KafkaError)

self.rest_service.producer.send = MagicMock(return_value=bad_future)
self.assertFalse(self.rest_service._feed_to_kafka({}))
self.assertTrue(self.rest_service.logger.error.called)
self.assertTrue(self.rest_service._spawn_kafka_connection_thread.called)
Expand Down

0 comments on commit e85cfe9

Please sign in to comment.