diff --git a/src/Broker.php b/src/Broker.php index c4f909d9..881deee0 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -225,4 +225,27 @@ private function getSaslMechanismProvider() : SaslMechanism } return $provider; } + + /** + * @param array $data + * @param array $partNums + * @return int + */ + public function getPartitionId($data) + { + $topicInfos = $this->getTopics(); + $topicMeta = $topicInfos[$data['topic']]; + $partNums = array_keys($topicMeta); + if (isset($data['key']) && trim($data['key'])) { + $partId = $partNums[crc32($data['key']) % count($partNums)]; + } else { + if (! isset($data['partId']) || ! isset($topicMeta[$data['partId']])) { + shuffle($partNums); + $partId = $partNums[0]; + } else { + $partId = $data['partId']; + } + } + return $partId; + } } diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 1a8247ff..e121a2ce 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -293,14 +293,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partNums = array_keys($topicMeta); - shuffle($partNums); - $partId = 0; - if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) { - $partId = $partNums[0]; - } else { - $partId = $value['partId']; - } + $partId = $broker->getPartitionId($value); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index d09096f9..c2a51cf3 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -133,14 +133,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partNums = array_keys($topicMeta); - shuffle($partNums); - $partId = 0; - if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) { - $partId = $partNums[0]; - } else { - $partId = $value['partId']; - } + $partId = $broker->getPartitionId($value); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 46e21a4b..4f6d0b7d 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -174,4 +174,60 @@ public function testGetSocketNotSetConfig() $this->assertInstanceOf(\Kafka\SocketSync::class, $socket); } + + /** + * testGetPartitionId + * + * @access public + * @return void + */ + public function testGetPartitionId() + { + $broker = \Kafka\Broker::getInstance(); + $data = [ + 'brokers' => [ + [ + 'host' => '127.0.0.1', + 'port' => '9092', + 'nodeId' => '0', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9192', + 'nodeId' => '1', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9292', + 'nodeId' => '2', + ], + ], + 'topics' => [ + [ + 'topicName' => 'test', + 'errorCode' => 0, + 'partitions' => [ + [ + 'partitionId' => 0, + 'errorCode' => 0, + 'leader' => 0, + ], + [ + 'partitionId' => 1, + 'errorCode' => 0, + 'leader' => 2, + ], + ], + ], + ], + ]; + $broker->setData($data['topics'], $data['brokers']); + $data = [ + 'partId' => '1', + 'topic' => 'test', + 'value' => 'test message' + ]; + $partId = $broker->getPartitionId($data); + $this->assertEquals('1', $partId); + } }