Skip to content

Commit

Permalink
chore: add tests for aiokafka consumer (#4372)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored and atzoum committed Feb 12, 2024
1 parent 3acd5e6 commit dba6d31
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 0 deletions.
82 changes: 82 additions & 0 deletions services/streammanager/kafka/kafkamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,88 @@ func TestIntegration(t *testing.T) {
})
}

// TestAIOKafka is an integration test verifying that messages produced by this
// package's Kafka producer (with batching enabled and zstd compression) can be
// read back by a Python aiokafka consumer running in a sibling Docker container
// on the same Docker network.
func TestAIOKafka(t *testing.T) {
	// Force batched production with zstd compression for this test only;
	// restore the package-level defaults on cleanup.
	kafkaBatchingEnabled = true
	kafkaCompression = client.CompressionZstd
	t.Cleanup(func() {
		kafkaBatchingEnabled = false
		kafkaCompression = client.CompressionNone
	})
	ctrl := gomock.NewController(t)
	// Swap the package stats for mocks so expectations can be asserted;
	// batchSize must observe exactly one batch containing 3 messages.
	kafkaStats.creationTime = getMockedTimer(t, ctrl, true)
	kafkaStats.produceTime = getMockedTimer(t, ctrl, true)
	kafkaStats.prepareBatchTime = getMockedTimer(t, ctrl, true)
	kafkaStats.publishTime = getMockedTimer(t, ctrl, true)
	kafkaStats.batchSize = mock_stats.NewMockMeasurement(ctrl)
	kafkaStats.batchSize.(*mock_stats.MockMeasurement).EXPECT().Observe(3.0).Times(1)

	pool, err := dockertest.NewPool("")
	require.NoError(t, err)

	// Dedicated, uniquely named network so the broker and the aiokafka
	// consumer container can reach each other by container hostname.
	kafkaNetwork, err := pool.CreateNetwork("kafka_network_" + misc.FastUUID().String())
	require.NoError(t, err)
	t.Cleanup(func() {
		if err := pool.RemoveNetwork(kafkaNetwork); err != nil {
			t.Logf("Error while removing Docker network: %v", err)
		}
	})

	kafkaContainer, err := dockerKafka.Setup(pool, &testCleanup{t},
		dockerKafka.WithBrokers(1),
		dockerKafka.WithNetwork(kafkaNetwork.Network),
	)
	require.NoError(t, err)

	kafkaTopic := "some-topic"

	// The producer connects from the host via the mapped port; the consumer
	// container later connects in-network (see the Exec env below).
	destConfig := map[string]interface{}{
		"topic":    kafkaTopic,
		"hostname": "localhost",
		"port":     kafkaContainer.Ports[0],
	}
	dest := backendconfig.DestinationT{Config: destConfig}

	p, err := NewProducer(&dest, common.Opts{})
	require.NotNilf(t, p, "expected producer to be created, got nil: %v", err)
	require.NoError(t, err)

	// Retry until the broker is ready to accept produce requests. Each message
	// overrides the destination's default topic with "foo-bar".
	var statusCode int
	var returnMessage, errMessage string
	require.Eventually(t, func() bool {
		statusCode, returnMessage, errMessage = p.Produce(json.RawMessage(`[
			{"message":"one","topic":"foo-bar","userId":"1234"},
			{"message":"two","topic":"foo-bar","userId":"1234"},
			{"message":"three","topic":"foo-bar","userId":"1234"}
		]`), destConfig)
		return statusCode == http.StatusOK
	}, 30*time.Second, 100*time.Millisecond)
	// NOTE(review): on batched success both return values carry the same
	// success text — confirm against Produce's contract.
	require.Equal(t, "Kafka: Message delivered in batch", returnMessage)
	require.Equal(t, "Kafka: Message delivered in batch", errMessage)

	// Build and start the Python aiokafka consumer image from testdata; it
	// idles (`tail -f /dev/null`) so the test can drive it via Exec below.
	consumerContainer, err := pool.BuildAndRunWithOptions("./testdata/aiokafka/Dockerfile", &dockertest.RunOptions{
		Name:      fmt.Sprintf("aiokafka-%s", misc.FastUUID().String()),
		NetworkID: kafkaNetwork.Network.ID,
		Cmd:       []string{"tail", "-f", "/dev/null"},
	})
	require.NoError(t, err)
	t.Cleanup(func() {
		if err := pool.Purge(consumerContainer); err != nil {
			t.Logf("Error while purging Docker container: %v", err)
		}
	})

	// Run the consumer in-network against the broker's container address and
	// capture its stdout (a JSON array of the consumed key/value pairs).
	buf := bytes.NewBuffer(nil)
	code, err := consumerContainer.Exec([]string{"python", "consumer.py"}, dockertest.ExecOptions{
		Env:    []string{"KAFKA_BROKER=kafka1:9090", "KAFKA_TOPIC=foo-bar", "EXPECTED_MESSAGE_COUNT=3"},
		StdOut: buf,
		StdErr: os.Stderr,
	})

	require.NoError(t, err)
	require.Equal(t, 0, code)
	// Values are JSON-encoded strings, hence the escaped quotes.
	require.JSONEq(t, `[{"key": "1234", "value": "\"one\""}, {"key": "1234", "value": "\"two\""}, {"key": "1234", "value": "\"three\""}]`, buf.String())
}

func TestNewProducerForAzureEventHubs(t *testing.T) {
t.Run("missing configuration data", func(t *testing.T) {
t.Run("missing topic", func(t *testing.T) {
Expand Down
17 changes: 17 additions & 0 deletions services/streammanager/kafka/testdata/aiokafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Test-support image: a Python aiokafka consumer used by the Go integration
# test to read back messages produced over a shared Docker network.

# Use an official Python runtime as a parent image
# NOTE(review): buster is EOL — consider a supported base tag when next bumped.
FROM python:3.9-slim-buster

# Defaults for consumer.py; the test overrides these at `docker exec` time,
# so the values here are fallbacks only.
ENV KAFKA_BROKER=kafka1:9094
ENV EXPECTED_MESSAGE_COUNT=3

# Set the working directory to /app
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# Keep the container alive; the test drives it via `exec python consumer.py`.
CMD ["tail", "-f", "/dev/null"]
39 changes: 39 additions & 0 deletions services/streammanager/kafka/testdata/aiokafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from aiokafka import AIOKafkaConsumer
import asyncio
import os
import uuid
import sys
import json

async def consume():
    """Consume EXPECTED_MESSAGE_COUNT messages from KAFKA_TOPIC and print them.

    Reads broker/topic/count from the environment, consumes exactly that many
    messages starting from the earliest offset, and writes a JSON array of
    {"key": ..., "value": ...} objects to stdout. Diagnostics go to stderr so
    stdout stays machine-parseable for the calling test.
    """
    kafkaBroker = os.environ.get('KAFKA_BROKER')
    kafkaTopic = os.environ.get('KAFKA_TOPIC')

    # BUG FIX: the fallback must be the string '0', not the int 0 —
    # int(0, 10) raises "TypeError: int() can't convert non-string with
    # explicit base" whenever EXPECTED_MESSAGE_COUNT is unset.
    msg_count = int(os.environ.get('EXPECTED_MESSAGE_COUNT', '0'), 10)

    print(f"Kafka Broker: {kafkaBroker}", file=sys.stderr)
    print(f"Kafka Topic: {kafkaTopic}", file=sys.stderr)
    print(f"Expected Message Count: {msg_count}", file=sys.stderr)

    # Fresh random group id per run so offsets never carry over between runs;
    # earliest reset guarantees messages produced before startup are seen.
    consumer = AIOKafkaConsumer(
        kafkaTopic,
        bootstrap_servers=kafkaBroker,
        group_id="aiokafka-test-" + str(uuid.uuid4()),
        auto_offset_reset="earliest",
    )

    output = list()
    await consumer.start()
    try:
        for _ in range(msg_count):
            msg = await consumer.getone()
            output.append({
                "key": msg.key.decode('utf-8'),
                "value": msg.value.decode('utf-8'),
            })
    finally:
        # Always release the consumer (commits/leaves the group) even if
        # getone() fails or times out.
        await consumer.stop()
    print(json.dumps(output), file=sys.stdout)

asyncio.run(consume())
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cramjam==2.8.0
aiokafka==0.10.0

0 comments on commit dba6d31

Please sign in to comment.