Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.
Closed
23 changes: 23 additions & 0 deletions src/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,27 @@ private function getSaslMechanismProvider() : SaslMechanism
}
return $provider;
}

/**
* @param array $data
* @param array $partNums
* @return int
*/
public function getPartitionId($data)
{
$topicInfos = $this->getTopics();
$topicMeta = $topicInfos[$data['topic']];
$partNums = array_keys($topicMeta);
if (isset($data['key']) && trim($data['key'])) {
$partId = $partNums[crc32($data['key']) % count($partNums)];
} else {
if (! isset($data['partId']) || ! isset($topicMeta[$data['partId']])) {
shuffle($partNums);
$partId = $partNums[0];
} else {
$partId = $data['partId'];
}
}
return $partId;
}
}
9 changes: 1 addition & 8 deletions src/Producer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,7 @@ protected function convertMessage($data)
}

$topicMeta = $topicInfos[$value['topic']];
$partNums = array_keys($topicMeta);
shuffle($partNums);
$partId = 0;
if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) {
$partId = $partNums[0];
} else {
$partId = $value['partId'];
}
$partId = $broker->getPartitionId($value);

$brokerId = $topicMeta[$partId];
$topicData = [];
Expand Down
9 changes: 1 addition & 8 deletions src/Producer/SyncProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,7 @@ protected function convertMessage($data)
}

$topicMeta = $topicInfos[$value['topic']];
$partNums = array_keys($topicMeta);
shuffle($partNums);
$partId = 0;
if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) {
$partId = $partNums[0];
} else {
$partId = $value['partId'];
}
$partId = $broker->getPartitionId($value);

$brokerId = $topicMeta[$partId];
$topicData = [];
Expand Down
56 changes: 56 additions & 0 deletions tests/Base/BrokerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,60 @@ public function testGetSocketNotSetConfig()

$this->assertInstanceOf(\Kafka\SocketSync::class, $socket);
}

/**
* testGetPartitionId
*
* @access public
* @return void
*/
public function testGetPartitionId()
{
$broker = \Kafka\Broker::getInstance();
$data = [
'brokers' => [
[
'host' => '127.0.0.1',
'port' => '9092',
'nodeId' => '0',
],
[
'host' => '127.0.0.1',
'port' => '9192',
'nodeId' => '1',
],
[
'host' => '127.0.0.1',
'port' => '9292',
'nodeId' => '2',
],
],
'topics' => [
[
'topicName' => 'test',
'errorCode' => 0,
'partitions' => [
[
'partitionId' => 0,
'errorCode' => 0,
'leader' => 0,
],
[
'partitionId' => 1,
'errorCode' => 0,
'leader' => 2,
],
],
],
],
];
$broker->setData($data['topics'], $data['brokers']);
$data = [
'partId' => '1',
'topic' => 'test',
'value' => 'test message'
];
$partId = $broker->getPartitionId($data);
$this->assertEquals('1', $partId);
}
}