/
TestUtil.php
107 lines (92 loc) · 3.61 KB
/
TestUtil.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
<?php
declare(strict_types=1);
namespace longlang\phpkafka\Test;
use longlang\phpkafka\Client\ClientInterface;
use longlang\phpkafka\Client\SyncClient;
use longlang\phpkafka\Config\CommonConfig;
use longlang\phpkafka\Protocol\Metadata\MetadataRequest;
use longlang\phpkafka\Protocol\Metadata\MetadataRequestTopic;
use longlang\phpkafka\Protocol\Metadata\MetadataResponse;
class TestUtil
{
private function __construct()
{
}
public static function getHost(): string
{
return getenv('KAFKA_HOST') ?: '127.0.0.1';
}
public static function getPort(): int
{
return (int) (getenv('KAFKA_PORT') ?: 9092);
}
public static function getSasl(): array
{
$result = getenv('KAFKA_SASL') ?: '{}';
return json_decode($result, true);
}
public static function createKafkaClient(string $class = null): ClientInterface
{
$config = new CommonConfig();
$config->setSendTimeout(10);
$config->setRecvTimeout(10);
$config->setSasl(self::getSasl());
if (null === $class) {
$class = getenv('KAFKA_CLIENT_CLASS') ?: SyncClient::class;
}
return new $class(self::getHost(), self::getPort(), $config);
}
public static function getControllerClient(): ClientInterface
{
$client = self::createKafkaClient();
$client->connect();
$request = new MetadataRequest();
/** @var MetadataResponse $response */
$response = $client->sendRecv($request);
$client->close();
$config = new CommonConfig();
$config->setSendTimeout(10);
$config->setRecvTimeout(10);
$config->setSasl(self::getSasl());
$class = getenv('KAFKA_CLIENT_CLASS') ?: SyncClient::class;
$nodeId = $response->getControllerId();
foreach ($response->getBrokers() as $broker) {
if ($broker->getNodeId() === $nodeId) {
return new $class($broker->getHost(), $broker->getPort(), $config);
}
}
throw new \RuntimeException('getControllerClient failed');
}
public static function getLeaderBrokerClient(string $topic, int $partition): ClientInterface
{
$client = self::createKafkaClient();
$client->connect();
$request = new MetadataRequest();
$topicsArray = [];
$topicsArray[] = (new MetadataRequestTopic())->setName($topic);
$request->setTopics($topicsArray);
/** @var MetadataResponse $response */
$response = $client->sendRecv($request);
$client->close();
foreach ($response->getTopics() as $topicItem) {
if ($topicItem->getName() === $topic) {
foreach ($topicItem->getPartitions() as $partitionItem) {
if ($partitionItem->getPartitionIndex() === $partition) {
$config = new CommonConfig();
$config->setSendTimeout(10);
$config->setRecvTimeout(10);
$config->setSasl(self::getSasl());
$class = getenv('KAFKA_CLIENT_CLASS') ?: SyncClient::class;
$nodeId = $partitionItem->getLeaderId();
foreach ($response->getBrokers() as $broker) {
if ($broker->getNodeId() === $nodeId) {
return new $class($broker->getHost(), $broker->getPort(), $config);
}
}
}
}
}
}
throw new \RuntimeException(sprintf('getLeaderBrokerClient %s-%s failed', $topic, $partition));
}
}