Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
Implement functional tests
Browse files Browse the repository at this point in the history
Running the basic sync/async producer and consumer against the
supported Kafka servers (+0.9 because of the consumers).
  • Loading branch information
lcobucci committed Dec 10, 2017
1 parent 43728a7 commit 5f81328
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 1 deletion.
71 changes: 70 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ cache:

before_install:
- mv ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini{,.disabled} || echo "xdebug not available"
- composer self-update

install: travis_retry composer install --no-interaction

Expand Down Expand Up @@ -53,6 +52,76 @@ jobs:
- if [[ ! $(php -m | grep -si xdebug) ]]; then echo "xdebug required for coverage"; exit 1; fi
- ./vendor/bin/infection --min-msi=41 --min-covered-msi=77 --threads=4

- stage: Functional tests
services:
- docker
addons:
apt:
packages:
- docker-ce
env: KAFKA_VERSION=0.9.0.1
before_install:
- docker-compose up -d kafka-$KAFKA_VERSION
- mv ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini{,.disabled} || echo "xdebug not available"
script:
- sleep 10 # add some arbitrary time to ensure that kafka is running fine
- docker-compose exec kafka-$KAFKA_VERSION /opt/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 3 --replication-factor 1 --topic test
- docker-compose run -e KAFKA_BROKERS=kafka-$KAFKA_VERSION:9092 -e KAFKA_VERSION -e KAFKA_TOPIC=test test-runner

- stage: Functional tests
services:
- docker
addons:
apt:
packages:
- docker-ce
env: KAFKA_VERSION=0.10.2.1
before_install:
- docker-compose up -d kafka-$KAFKA_VERSION
- mv ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini{,.disabled} || echo "xdebug not available"
script:
- sleep 10 # add some arbitrary time to ensure that kafka is running fine
- docker-compose exec kafka-$KAFKA_VERSION /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 3 --replication-factor 1 --topic test
- docker-compose run -e KAFKA_BROKERS=kafka-$KAFKA_VERSION:9092 -e KAFKA_VERSION -e KAFKA_TOPIC=test test-runner
after_script:
- docker-compose down

- stage: Functional tests
services:
- docker
addons:
apt:
packages:
- docker-ce
env: KAFKA_VERSION=0.11.0.1
before_install:
- docker-compose up -d kafka-$KAFKA_VERSION
- mv ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini{,.disabled} || echo "xdebug not available"
script:
- sleep 10 # add some arbitrary time to ensure that kafka is running fine
- docker-compose exec kafka-$KAFKA_VERSION /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 3 --replication-factor 1 --topic test
- docker-compose run -e KAFKA_BROKERS=kafka-$KAFKA_VERSION:9092 -e KAFKA_VERSION -e KAFKA_TOPIC=test test-runner
after_script:
- docker-compose down

- stage: Functional tests
services:
- docker
addons:
apt:
packages:
- docker-ce
env: KAFKA_VERSION=1.0.0
before_install:
- docker-compose up -d kafka-$KAFKA_VERSION
- mv ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/xdebug.ini{,.disabled} || echo "xdebug not available"
script:
- sleep 10 # add some arbitrary time to ensure that kafka is running fine
- docker-compose exec kafka-$KAFKA_VERSION /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 3 --replication-factor 1 --topic test
- docker-compose run -e KAFKA_BROKERS=kafka-$KAFKA_VERSION:9092 -e KAFKA_VERSION -e KAFKA_TOPIC=test test-runner
after_script:
- docker-compose down

notifications:
email:
- swansoft_team@groups.163.com
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM php:7.1-alpine

WORKDIR /opt/kafka-php

CMD ["./vendor/bin/phpunit", "--testsuite", "functional"]
52 changes: 52 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
version: '3.2'

services:
zookeeper:
image: zookeeper:3.4

kafka-0.9.0.1:
image: wurstmeister/kafka:0.9.0.1
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka-0.9.0.1
KAFKA_ADVERTISED_PORT: 9092
ports:
- "9092"

kafka-0.10.2.1:
image: wurstmeister/kafka:0.10.2.1
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka-0.10.2.1
KAFKA_ADVERTISED_PORT: 9092
ports:
- "9092"

kafka-0.11.0.1:
image: wurstmeister/kafka:0.11.0.1
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka-0.11.0.1
ports:
- "9092"

kafka-1.0.0:
image: wurstmeister/kafka:1.0.0
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka-1.0.0
ports:
- "9092"

test-runner:
build: .
volumes:
- .:/opt/kafka-php
40 changes: 40 additions & 0 deletions tests/Functional/AsyncProducerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php
declare(strict_types=1);

namespace KafkaTest\Functional;

use Kafka\Protocol;

final class AsyncProducerTest extends ProducerTest
{
/**
* @test
*
* @runInSeparateProcess
*/
public function sendAsyncMessages(): void
{
$this->configureProducer();

$messagesSent = false;
$error = null;

$producer = new \Kafka\Producer([$this, 'createMessages']);
$producer->success(
function () use (&$messagesSent): void {
$messagesSent = true;
}
);
$producer->error(
function (int $errorCode) use (&$error): void {
$error = $errorCode;
}
);
$producer->send();

self::assertTrue(
$messagesSent,
'It was not possible to send the messages, reason: ' . Protocol::getError($error)
);
}
}
99 changes: 99 additions & 0 deletions tests/Functional/ProducerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php
declare(strict_types=1);

namespace KafkaTest\Functional;

use Kafka\Consumer\StopStrategy\Callback;

abstract class ProducerTest extends \PHPUnit\Framework\TestCase
{
private const MESSAGES_TO_SEND = 30;

/**
* @var string
*/
private $version;

/**
* @var string
*/
private $brokers;

/**
* @var string
*/
private $topic;

/**
* @before
*/
public function prepareEnvironment(): void
{
$this->version = getenv('KAFKA_VERSION');
$this->brokers = getenv('KAFKA_BROKERS');
$this->topic = getenv('KAFKA_TOPIC');

if (! $this->version || ! $this->brokers || ! $this->topic) {
self::markTestSkipped(
'Environment variables "KAFKA_VERSION", "KAFKA_TOPIC", and "KAFKA_BROKERS" must be provided'
);
}
}

protected function configureProducer(): void
{
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataBrokerList($this->brokers);
$config->setBrokerVersion($this->version);
}

/**
* @test
*
* @runInSeparateProcess
*/
public function consumeProducedMessages(): void
{
$this->configureConsumer();

$consumedMessages = 0;
$executionEnd = new \DateTimeImmutable('+1 minute');

$consumer = new \Kafka\Consumer(
new Callback(
function () use (&$consumedMessages, $executionEnd): bool {
return $consumedMessages >= self::MESSAGES_TO_SEND || new \DateTimeImmutable() > $executionEnd;
}
)
);

$consumer->start(
function () use (&$consumedMessages) {
++$consumedMessages;
}
);

self::assertSame(self::MESSAGES_TO_SEND, $consumedMessages);
}

private function configureConsumer(): void
{
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataBrokerList($this->brokers);
$config->setBrokerVersion($this->version);
$config->setGroupId('kafka-php-tests');
$config->setOffsetReset('earliest');
$config->setTopics([$this->topic]);
}

public function createMessages(int $amount = self::MESSAGES_TO_SEND): array
{
$messages = [];

for ($i = 0; $i < $amount; ++$i) {
$messages[] = ['topic' => $this->topic, 'value' => 'msg-' . $i];
}

return $messages;
}
}
26 changes: 26 additions & 0 deletions tests/Functional/SyncProducerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php
declare(strict_types=1);

namespace KafkaTest\Functional;

final class SyncProducerTest extends ProducerTest
{
/**
* @test
*
* @runInSeparateProcess
*/
public function sendSyncMessages(): void
{
$this->configureProducer();

$producer = new \Kafka\Producer();
$messages = $this->createMessages();

foreach ($messages as $message) {
$result = $producer->send([$message]);

self::assertNotEmpty($result);
}
}
}

0 comments on commit 5f81328

Please sign in to comment.