-
Notifications
You must be signed in to change notification settings - Fork 297
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: add tests for aiokafka consumer (#4372)
- Loading branch information
Showing
4 changed files
with
140 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Use an official Python runtime as a parent image | ||
FROM python:3.9-slim-buster | ||
|
||
ENV KAFKA_BROKER=kafka1:9094 | ||
ENV EXPECTED_MESSAGE_COUNT=3 | ||
|
||
# Set the working directory to /app | ||
WORKDIR /app | ||
|
||
# Copy the current directory contents into the container at /app | ||
COPY . /app | ||
|
||
# Install any needed packages specified in requirements.txt | ||
RUN pip install --trusted-host pypi.python.org -r requirements.txt | ||
|
||
# Run app.py when the container launches | ||
CMD ["tail", "-f", "/dev/null"] |
39 changes: 39 additions & 0 deletions
39
services/streammanager/kafka/testdata/aiokafka/consumer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from aiokafka import AIOKafkaConsumer | ||
import asyncio | ||
import os | ||
import uuid | ||
import sys | ||
import json | ||
|
||
async def consume(): | ||
|
||
kafkaBroker = os.environ.get('KAFKA_BROKER') | ||
kafkaTopic = os.environ.get('KAFKA_TOPIC') | ||
|
||
msg_count = int(os.environ.get('EXPECTED_MESSAGE_COUNT', 0), 10) | ||
|
||
print(f"Kafka Broker: {kafkaBroker}", file=sys.stderr) | ||
print(f"Kafka Topic: {kafkaTopic}", file=sys.stderr) | ||
print(f"Expected Message Count: {msg_count}", file=sys.stderr) | ||
|
||
consumer = AIOKafkaConsumer( | ||
kafkaTopic, | ||
bootstrap_servers=kafkaBroker, | ||
group_id="aiokafka-test-"+uuid.uuid4().__str__(), | ||
auto_offset_reset="earliest", | ||
) | ||
# print(await consumer.topics(), file=sys.stderr) | ||
|
||
output = list() | ||
await consumer.start() | ||
for _ in range(msg_count): | ||
msg = await consumer.getone() | ||
output.append({ | ||
"key": msg.key.decode('utf-8'), | ||
"value": msg.value.decode('utf-8'), | ||
}) | ||
|
||
await consumer.stop() | ||
print(json.dumps(output), file=sys.stdout) | ||
|
||
asyncio.run(consume()) |
2 changes: 2 additions & 0 deletions
2
services/streammanager/kafka/testdata/aiokafka/requirements.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
cramjam==2.8.0 | ||
aiokafka==0.10.0 |