Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
php-version: 7.3
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down Expand Up @@ -68,7 +68,7 @@ jobs:
php-version: 7.3
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down Expand Up @@ -110,7 +110,7 @@ jobs:
php-version: 7.3
coverage: pcov
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
runs-on: ubuntu-18.04
strategy:
matrix:
php: [7.3]
php: [7.3, 7.4]
env: [
'DEPENDENCIES=--prefer-lowest',
'',
Expand All @@ -164,7 +164,7 @@ jobs:
php-version: ${{ matrix.php }}
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Get Composer Cache Directory
id: composer-cache
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/infection.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
php-version: 7.3
coverage: pcov
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Install Dependencies
run: composer install --prefer-dist --no-progress --no-suggest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/shepherd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
php-version: 7.3
coverage: none
tools: pecl
extensions: json, mbstring, rdkafka-3.1.2
extensions: json, mbstring, rdkafka

- name: Install dependencies
run: composer install --prefer-dist --no-progress --no-suggest
Expand Down
9 changes: 4 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
"require": {
"php": "^7.2",
"ext-pcntl": "*",
"ext-rdkafka": "*",
"psr/log": "^1.1",
"symfony/polyfill-php73": "^1.11"
"ext-rdkafka": "^4",
"psr/log": "^1.1"
},
"require-dev": {
"doctrine/coding-standard": "^7.0",
"infection/infection": "^0.16.3",
"kwn/php-rdkafka-stubs": "^1.0",
"kwn/php-rdkafka-stubs": "^2.0",
"phpstan/extension-installer": "^1.0",
"phpstan/phpstan": "0.12.25",
"phpstan/phpstan-phpunit": "0.12.8",
"phpstan/phpstan-strict-rules": "0.12.2",
"phpunit/phpunit": "^9.1",
"psalm/plugin-phpunit": "^0.10.0",
"simpod/php-coveralls-mirror": "^3.0",
"vimeo/psalm": "dev-master#47cf69d as 3.11.4"
"vimeo/psalm": "^3.12"
},
"suggest": {
"kwn/php-rdkafka-stubs": "Support and autocompletion for RDKafka in IDE | require as dev dependency"
Expand Down
4 changes: 2 additions & 2 deletions src/Clients/Consumer/KafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function (RdKafkaConsumer $kafka, int $err, string $reason) : void {
}
);

$rebalanceCb =
$rebalanceCallback =
/** @param array<string, TopicPartition>|null $partitions */
function (RdKafkaConsumer $kafka, int $err, ?array $partitions = null) : void {
switch ($err) {
Expand Down Expand Up @@ -77,7 +77,7 @@ static function (TopicPartition $partition) : string {
$kafka->assign();
}
};
$config->getConf()->setRebalanceCb($rebalanceCb);
$config->getConf()->setRebalanceCb($rebalanceCallback);

parent::__construct($config->getConf());
}
Expand Down
64 changes: 56 additions & 8 deletions src/Clients/Producer/KafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,78 @@

namespace SimPod\Kafka\Clients\Producer;

use InvalidArgumentException;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
use RuntimeException;
use function sprintf;
use const RD_KAFKA_PARTITION_UA;
use const RD_KAFKA_RESP_ERR_NO_ERROR;

class KafkaProducer extends Producer
{
private const RD_KAFKA_MSG_F_COPY = 0;

public function __construct(ProducerConfig $config)
/** @var callable(KafkaProducer):void|null */
private $exitCallback;

/** @param callable(KafkaProducer):void|null $exitCallback */
public function __construct(ProducerConfig $config, ?callable $exitCallback = null)
{
$this->exitCallback = $exitCallback;

parent::__construct($config->getConf());
}

public function produce(ProducerRecord $record) : void
public function __destruct()
{
$topic = $this->newTopic($record->topic);
/** @psalm-suppress UndefinedMethod https://github.com/vimeo/psalm/issues/3406 */
$topic->produce($record->partition ?? RD_KAFKA_PARTITION_UA, self::RD_KAFKA_MSG_F_COPY, $record->value, $record->key);
if ($this->exitCallback === null) {
return;
}

($this->exitCallback)($this);
}

/** @param array<string, string>|null $headers */
public function produce(
string $topicName,
?int $partition,
string $value,
?string $key = null,
?array $headers = null,
?int $timestampMs = null
) : void {
if ($partition < 0) {
throw new InvalidArgumentException(
sprintf('Invalid partition: %d. Partition number should always be non-negative or null.', $partition)
);
}

/** @psalm-var ProducerTopic $topic Psalm thinks this is a Topic https://github.com/vimeo/psalm/issues/3406 */
$topic = $this->newTopic($topicName);
$topic->producev(
$partition ?? RD_KAFKA_PARTITION_UA,
self::RD_KAFKA_MSG_F_COPY,
$value,
$key,
$headers,
$timestampMs
);
$this->poll(0);
}

public function flush() : void
public function flushMessages(int $timeoutMs = 10000) : void
{
while ($this->getOutQLen() > 0) {
$this->poll(1);
$result = null;
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $this->flush($timeoutMs);
if ($result === RD_KAFKA_RESP_ERR_NO_ERROR) {
break;
}
}

if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
throw new RuntimeException('Was unable to flush, messages might be lost!');
}
}
}
35 changes: 0 additions & 35 deletions src/Clients/Producer/ProducerRecord.php

This file was deleted.

16 changes: 7 additions & 9 deletions tests/Clients/Consumer/TestProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use SimPod\Kafka\Clients\Producer\KafkaProducer;
use SimPod\Kafka\Clients\Producer\ProducerConfig;
use SimPod\Kafka\Clients\Producer\ProducerRecord;
use function gethostname;

final class TestProducer
Expand All @@ -16,18 +15,17 @@ final class TestProducer

public function __construct()
{
$this->producer = new KafkaProducer($this->getConfig());
$this->producer = new KafkaProducer(
$this->getConfig(),
static function (KafkaProducer $producer) : void {
$producer->flushMessages(5000);
}
);
}

public function run(string $payload) : void
{
$record = new ProducerRecord(KafkaBatchConsumerTest::TOPIC, null, $payload);
$this->producer->produce($record);
}

public function flush() : void
{
$this->producer->flush();
$this->producer->produce(KafkaBatchConsumerTest::TOPIC, null, $payload);
}

private function getConfig() : ProducerConfig
Expand Down