Skip to content

Commit

Permalink
Merge pull request #298 from mcasado/KAFKA_TRANSPORT
Browse files Browse the repository at this point in the history
Improved kafka test
  • Loading branch information
josegonzalez committed Mar 3, 2015
2 parents bae29fa + eeb7459 commit 63f06bb
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions beaver/tests/test_kafka_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import tempfile
import logging

from kafka import KafkaClient, MultiProcessConsumer

import beaver
from beaver.config import BeaverConfig
Expand Down Expand Up @@ -49,8 +50,14 @@ def test_builtin_kafka(cls):
cls.beaver_config.set('kafka_hosts', cls.server.host + ":" + str(cls.server.port))

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.kafka_transport.KafkaTransport)

data = {}
lines = ['log1\n', 'log2\n']
lines = []
n=100
for i in range(n):
lines.append('log' + str(i) + '\n')
new_lines = []
for line in lines:
message = unicode_dammit(line)
Expand All @@ -60,8 +67,18 @@ def test_builtin_kafka(cls):
data['lines'] = new_lines
data['fields'] = []
transport.callback("test.log", **data)
transport.interrupt()
cls.assertIsInstance(transport, beaver.transports.kafka_transport.KafkaTransport)

messages = cls._consume_messages(cls.server.host, cls.server.port)
cls.assertEqual(n, messages.__len__())
for message in messages:
cls.assertIn('"file": "test.log", "message": "log', message.message.value);
print(message)
print('\n')

transport.interrupt()

def _consume_messages(cls, host, port):
kafka = KafkaClient(cls.server.host + ":" + str(cls.server.port))
consumer = MultiProcessConsumer(kafka, None, cls.beaver_config.get('kafka_topic'), num_procs=5)
return consumer.get_messages(count=100, block=True, timeout=5)

0 comments on commit 63f06bb

Please sign in to comment.