From 005b5319cb78f6225ea872a48113f38ff7c45c62 Mon Sep 17 00:00:00 2001 From: Pavel Batanov Date: Mon, 1 Mar 2021 20:05:16 +0300 Subject: [PATCH] Support RDKafka 4 --- composer.json | 3 ++- src/Extension/KafkaModule.php | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index ff4d4fc..b960efe 100644 --- a/composer.json +++ b/composer.json @@ -12,7 +12,8 @@ ], "require": { "php": ">=7.1", - "codeception/codeception": "~2.5" + "ext-rdkafka": "^3 || ^4", + "codeception/codeception": "^2.5 | ^3.0 | ^4.0" }, "autoload": { "psr-4": { diff --git a/src/Extension/KafkaModule.php b/src/Extension/KafkaModule.php index 52fe53d..30106bd 100644 --- a/src/Extension/KafkaModule.php +++ b/src/Extension/KafkaModule.php @@ -6,6 +6,7 @@ use Codeception\Module; use Exception; +use Lamoda\Codeception\Extension\MessageSerializer\ArrayMessageSerializer; use Lamoda\Codeception\Extension\MessageSerializer\MessageSerializerInterface; use RdKafka\Conf; use RdKafka\Consumer; @@ -17,6 +18,7 @@ class KafkaModule extends Module { protected const DEFAULT_PARTITION = 0; + protected const FLUSH_TIMEOUT_MS = 3000; /** * @var MessageSerializerInterface @@ -83,6 +85,10 @@ public function putMessageInTopic(string $topicName, string $message, ?int $part $topic = $producer->newTopic($topicName, $this->topicConf); $topic->produce($partition ?? static::DEFAULT_PARTITION, 0, $message); + + if (method_exists($producer, 'flush')) { + $producer->flush(self::FLUSH_TIMEOUT_MS); + } } public function putMessageListInTopic(string $topicName, array $messages, ?int $partition = null): void