From e11308c66a76b56070cf950f625ce6b37d378eb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 09:57:51 +0200 Subject: [PATCH 01/10] Added config for auto create topic, allowing produce to topics that does not exist --- src/Config.php | 9 +++++++++ src/Producer/RecordValidator.php | 5 ++++- src/Producer/SyncProcess.php | 29 +++++++++++++++++++++++++++-- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/Config.php b/src/Config.php index bb3827e2..7fbd5cf3 100644 --- a/src/Config.php +++ b/src/Config.php @@ -84,6 +84,7 @@ abstract class Config 'metadataRequestTimeoutMs' => 60000, 'metadataRefreshIntervalMs' => 300000, 'metadataMaxAgeMs' => -1, + 'autoCreateTopicsEnable' => true, 'securityProtocol' => self::SECURITY_PROTOCOL_PLAINTEXT, 'sslEnable' => false, // this config item will override, don't config it. 'sslLocalCert' => '', @@ -241,6 +242,14 @@ public function setMetadataMaxAgeMs(int $metadataMaxAgeMs): void static::$options['metadataMaxAgeMs'] = $metadataMaxAgeMs; } + /** + * @throws Exception\Config + */ + public function setAutoCreateTopicsEnable(bool $flag = true): void + { + static::$options['autoCreateTopicsEnable'] = $flag; + } + /** * @throws Exception\Config */ diff --git a/src/Producer/RecordValidator.php b/src/Producer/RecordValidator.php index 22e07cee..3ef66f87 100644 --- a/src/Producer/RecordValidator.php +++ b/src/Producer/RecordValidator.php @@ -4,6 +4,7 @@ namespace Kafka\Producer; use Kafka\Exception; +use Kafka\ProducerConfig; use function is_string; use function trim; @@ -30,7 +31,9 @@ public function validate(array $record, array $topicList): void throw Exception\InvalidRecordInSet::missingTopic(); } - if (! isset($topicList[$record['topic']])) { + /** @var ProducerConfig $config */ + $config = ProducerConfig::getInstance(); + if (! isset($topicList[$record['topic']]) && ! $config->getAutoCreateTopicsEnable() && false) { throw Exception\InvalidRecordInSet::nonExististingTopic($record['topic']); } diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 71b1cb06..d6e93edd 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -115,7 +115,7 @@ public function syncMeta(): void } shuffle($brokerHost); - $broker = $this->getBroker(); + $broker = $this->getBroker(); foreach ($brokerHost as $host) { $socket = $broker->getMetaConnect($host, true); @@ -160,9 +160,10 @@ protected function convertRecordSet(array $recordSet): array foreach ($recordSet as $record) { $this->recordValidator->validate($record, $topics); - $topicMeta = $topics[$record['topic']]; + $topicMeta = $this->getTopicMeta($record['topic']); $partNums = array_keys($topicMeta); shuffle($partNums); + $partNums = [0]; $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; @@ -193,6 +194,30 @@ protected function convertRecordSet(array $recordSet): array return $sendData; } + /** + * Get the topic meta. If auto create is on, get a random broker instead of + * a random broker that the topic is on. + * + * @param string $topic + * + * @return array + */ + protected function getTopicMeta($topic) + { + $topics = $this->getBroker()->getTopics(); + + if (isset($topics[$topic])) { + return $topics[$topic]; + } + + // Here we can safely assume that auto create topics are set to true. If + // not, and the topic does not exist, the validate of the record would + // have failed. + + // Default for auto creation. + return [0 => 0]; + } + private function getBroker(): Broker { return Broker::getInstance(); From 0e3682a250dc346ce3f10e2e0afe0d86c6c5753f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 10:35:49 +0200 Subject: [PATCH 02/10] Changes to make SSL and timeout configurable correctly --- src/CommonSocket.php | 76 ++++++++++---------------------------------- src/Config.php | 45 ++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 60 deletions(-) diff --git a/src/CommonSocket.php b/src/CommonSocket.php index 503add43..0772466f 100644 --- a/src/CommonSocket.php +++ b/src/CommonSocket.php @@ -29,34 +29,6 @@ abstract class CommonSocket */ public const MAX_WRITE_BUFFER = 2048; - /** - * Send timeout in seconds. - * - * @var int - */ - protected $sendTimeoutSec = 0; - - /** - * Send timeout in microseconds. - * - * @var int - */ - protected $sendTimeoutUsec = 100000; - - /** - * Recv timeout in seconds - * - * @var int - */ - protected $recvTimeoutSec = 0; - - /** - * Recv timeout in microseconds - * - * @var int - */ - protected $recvTimeoutUsec = 750000; - /** * @var resource */ @@ -95,26 +67,6 @@ public function __construct(string $host, int $port, ?Config $config = null, ?Sa $this->saslProvider = $saslProvider; } - public function setSendTimeoutSec(int $sendTimeoutSec): void - { - $this->sendTimeoutSec = $sendTimeoutSec; - } - - public function setSendTimeoutUsec(int $sendTimeoutUsec): void - { - $this->sendTimeoutUsec = $sendTimeoutUsec; - } - - public function setRecvTimeoutSec(int $recvTimeoutSec): void - { - $this->recvTimeoutSec = $recvTimeoutSec; - } - - public function setRecvTimeoutUsec(int $recvTimeoutUsec): void - { - $this->recvTimeoutUsec = $recvTimeoutUsec; - } - public function setMaxWriteAttempts(int $number): void { $this->maxWriteAttempts = $number; @@ -139,16 +91,20 @@ protected function createStream(): void if ($this->config !== null && $this->config->getSslEnable()) { // ssl connection $remoteSocket = sprintf('ssl://%s:%s', $this->host, $this->port); + $filterFunction = function ($elem) { + return $elem !== ''; + }; $context = stream_context_create( [ - 'ssl' => [ - 'local_cert' => $this->config->getSslLocalCert(), - 'local_pk' => $this->config->getSslLocalPk(), - 'verify_peer' => $this->config->getSslVerifyPeer(), - 'passphrase' => $this->config->getSslPassphrase(), - 'cafile' => $this->config->getSslCafile(), - 'peer_name' => $this->config->getSslPeerName(), - ], + 'ssl' => array_filter([ + 'local_cert' => $this->config->getSslLocalCert(), + 'local_pk' => $this->config->getSslLocalPk(), + 'verify_peer' => true, + 'verify_peer_name' => false, + 'passphrase' => $this->config->getSslPassphrase(), + 'cafile' => $this->config->getSslCafile(), + 'peer_name' => $this->config->getSslPeerName(), + ], $filterFunction), ] ); } @@ -184,7 +140,7 @@ protected function createSocket(string $remoteSocket, $context, ?int &$errno, ?s $remoteSocket, $errno, $errstr, - $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000), + $this->config->getSendTimeoutSec() + ($this->config->getSendTimeoutUsec() / 1000000), STREAM_CLIENT_CONNECT, $context ); @@ -248,7 +204,7 @@ public function readBlocking(int $length): string throw Exception\Socket::invalidLength($length, self::READ_MAX_LENGTH); } - $readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec); + $readable = $this->select([$this->stream], $this->config->getRecvTimeoutSec(), $this->config->getRecvTimeoutUsec()); if ($readable === false) { $this->close(); @@ -279,7 +235,7 @@ public function readBlocking(int $length): string throw Exception\Socket::unexpectedEOF($length); } // Otherwise wait for bytes - $readable = $this->select([$this->stream], $this->recvTimeoutSec, $this->recvTimeoutUsec); + $readable = $this->select([$this->stream], $this->config->getRecvTimeoutSec(), $this->config->getRecvTimeoutUsec()); if ($readable !== 1) { throw Exception\Socket::timedOutWithRemainingBytes($length, $remainingBytes); } @@ -310,7 +266,7 @@ public function writeBlocking(string $buffer): int while ($bytesWritten < $bytesToWrite) { // wait for stream to become available for writing - $writable = $this->select([$this->stream], $this->sendTimeoutSec, $this->sendTimeoutUsec, false); + $writable = $this->select([$this->stream], $this->config->getSendTimeoutSec(), $this->config->getSendTimeoutUsec(), false); if ($writable === false) { throw new Exception\Socket('Could not write ' . $bytesToWrite . ' bytes to stream'); diff --git a/src/Config.php b/src/Config.php index 7fbd5cf3..fa31902b 100644 --- a/src/Config.php +++ b/src/Config.php @@ -90,6 +90,7 @@ abstract class Config 'sslLocalCert' => '', 'sslLocalPk' => '', 'sslVerifyPeer' => false, + 'sslVerifyPeerName' => true, 'sslPassphrase' => '', 'sslCafile' => '', 'sslPeerName' => '', @@ -98,6 +99,10 @@ abstract class Config 'saslPassword' => '', 'saslKeytab' => '', 'saslPrincipal' => '', + 'sendTimeoutSec' => 0, + 'sendTimeoutUsec' => 100000, + 'recvTimeoutSec' => 0, + 'recvTimeoutUsec' => 750000, ]; /** @@ -321,4 +326,44 @@ public function setSaslMechanism(string $mechanism): void static::$options['saslMechanism'] = $mechanism; } + + /** + * @throws Exception\Config + */ + public function setSendTimeoutSec(int $timeout): void + { + static::$options['sendTimeoutSec'] = $timeout; + } + + /** + * @throws Exception\Config + */ + public function setSendTimeoutUsec(int $timeout): void + { + static::$options['recvTimeoutUsec'] = $timeout; + } + + /** + * @throws Exception\Config + */ + public function setRecvTimeoutSec(int $timeout): void + { + static::$options['recvTimeoutSec'] = $timeout; + } + + /** + * @throws Exception\Config + */ + public function setRecvTimeoutUsec(int $timeout): void + { + static::$options['recvTimeoutUsec'] = $timeout; + } + + /** + * @throws Exception\Config + */ + public function setSslVerifyPeerName(bool $verify): void + { + static::$options['sslVerifyPeerName'] = $verify; + } } From c743dfdbeda1de84822f6ae282fc3006c9f42a4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 13:05:03 +0200 Subject: [PATCH 03/10] Remove debug --- src/Producer/RecordValidator.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Producer/RecordValidator.php b/src/Producer/RecordValidator.php index 3ef66f87..f2063843 100644 --- a/src/Producer/RecordValidator.php +++ b/src/Producer/RecordValidator.php @@ -33,7 +33,7 @@ public function validate(array $record, array $topicList): void /** @var ProducerConfig $config */ $config = ProducerConfig::getInstance(); - if (! isset($topicList[$record['topic']]) && ! $config->getAutoCreateTopicsEnable() && false) { + if (! isset($topicList[$record['topic']]) && ! $config->getAutoCreateTopicsEnable()) { throw Exception\InvalidRecordInSet::nonExististingTopic($record['topic']); } From c86c6e1b51a6b5d5a1fc6f2bcf7a791ba4fb4c2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 13:09:15 +0200 Subject: [PATCH 04/10] Remove more debug --- src/Producer/SyncProcess.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index d6e93edd..647a6a19 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -163,7 +163,6 @@ protected function convertRecordSet(array $recordSet): array $topicMeta = $this->getTopicMeta($record['topic']); $partNums = array_keys($topicMeta); shuffle($partNums); - $partNums = [0]; $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; From 341f8d13dd477771594eff8f77ccc9d19af38686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 13:24:58 +0200 Subject: [PATCH 05/10] Change how timeouts is set in tests --- tests/Base/SocketTest.php | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/Base/SocketTest.php b/tests/Base/SocketTest.php index 831df3b4..6ef0a312 100644 --- a/tests/Base/SocketTest.php +++ b/tests/Base/SocketTest.php @@ -260,9 +260,11 @@ public function testRecvTimeout(): void $streamMock->method('eof')->willReturn(false); $streamMock->method('read')->willReturn('xxxx'); - $socket = $this->mockStreamSocketClient($host, $port, null, null, ['select']); - $socket->setRecvTimeoutSec(3000); - $socket->setRecvTimeoutUsec(30001); + $config = $this->getMockForAbstractClass(Config::class); + $config->setRecvTimeoutSec(30000); + $config->setRecvTimeoutUsec(30001); + + $socket = $this->mockStreamSocketClient($host, $port, $config, null, ['select']); $socket->method('select') ->with($this->isType('array'), 3000, 30001, true) @@ -367,9 +369,11 @@ public function testSendTimeout(): void $streamMock->method('eof')->willReturn(false); $streamMock->method('write')->willReturn(4); - $socket = $this->mockStreamSocketClient($host, $port, null, null, ['select']); - $socket->setSendTimeoutSec(3000); - $socket->setSendTimeoutUsec(30001); + $config = $this->getMockForAbstractClass(Config::class); + $config->setSendTimeoutSec(30000); + $config->setSendTimeoutUsec(30001); + + $socket = $this->mockStreamSocketClient($host, $port, $config, null, ['select']); $socket->method('select') ->with($this->isType('array'), 3000, 30001, false) From 830d56367a21351715d2a30b4f7b3b6339ad5e9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 13:29:03 +0200 Subject: [PATCH 06/10] Make sure config is always set --- tests/Base/SocketTest.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/Base/SocketTest.php b/tests/Base/SocketTest.php index 6ef0a312..05a87a5b 100644 --- a/tests/Base/SocketTest.php +++ b/tests/Base/SocketTest.php @@ -396,6 +396,10 @@ private function mockStreamSocketClient( ?SaslMechanism $sasl = null, array $mockMethod = [] ): Socket { + if ($config === null) { + $config = $this->getMockForAbstractClass(Config::class); + } + $socket = $this->getMockBuilder(Socket::class) ->setMethods(array_merge(['createSocket'], $mockMethod)) ->setConstructorArgs([$host, $port, $config, $sasl]) From a8ea164d93ddfcfdcdfb7b5fcf811be8f2e4b43a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 13:33:45 +0200 Subject: [PATCH 07/10] Set auto create topics false in test that tests just that --- tests/Base/Producer/RecordValidatorTest.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/Base/Producer/RecordValidatorTest.php b/tests/Base/Producer/RecordValidatorTest.php index 7572213a..0e5e9be1 100644 --- a/tests/Base/Producer/RecordValidatorTest.php +++ b/tests/Base/Producer/RecordValidatorTest.php @@ -6,6 +6,7 @@ use Kafka\Exception\InvalidRecordInSet; use Kafka\Producer\RecordValidator; use PHPUnit\Framework\TestCase; +use Kafka\ProducerConfig; final class RecordValidatorTest extends TestCase { @@ -34,6 +35,9 @@ public function testValidRecordDoesNotThrowException(): void */ public function testInvalidRecordThrowsException(string $expectedExceptionMessage, array $record): void { + $config = ProducerConfig::getInstance(); + $config->setAutoCreateTopicsEnable(false); + $this->expectException(InvalidRecordInSet::class); $this->expectExceptionMessage($expectedExceptionMessage); From 0d4d775f26323625e903f39cb017b39ee62ee482 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 13:39:34 +0200 Subject: [PATCH 08/10] Added verifypeername --- tests/Base/SocketTest.php | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/tests/Base/SocketTest.php b/tests/Base/SocketTest.php index 05a87a5b..2f22ef2b 100644 --- a/tests/Base/SocketTest.php +++ b/tests/Base/SocketTest.php @@ -94,24 +94,26 @@ public function testCreateStreamFailure(): void public function testCreateStreamSsl(): void { - $host = '127.0.0.1'; - $port = 9192; - $localCert = $this->root->url() . '/localCert'; - $localKey = $this->root->url() . '/localKey'; - $verifyPeer = false; - $passphrase = '123456'; - $cafile = $this->root->url() . '/cafile'; - $peerName = 'kafka'; + $host = '127.0.0.1'; + $port = 9192; + $localCert = $this->root->url() . '/localCert'; + $localKey = $this->root->url() . '/localKey'; + $verifyPeer = false; + $passphrase = '123456'; + $cafile = $this->root->url() . '/cafile'; + $peerName = 'kafka'; + $verifyPeerName = true; $context = stream_context_create( [ 'ssl' => [ - 'local_cert' => $localCert, - 'local_pk' => $localKey, - 'verify_peer' => $verifyPeer, - 'passphrase' => $passphrase, - 'cafile' => $cafile, - 'peer_name' => $peerName, + 'local_cert' => $localCert, + 'local_pk' => $localKey, + 'verify_peer' => $verifyPeer, + 'passphrase' => $passphrase, + 'cafile' => $cafile, + 'peer_name' => $peerName, + 'verify_peer_name' => $verifyPeerName, ], ] ); @@ -130,6 +132,7 @@ public function testCreateStreamSsl(): void $config->setSslPassphrase($passphrase); $config->setSslVerifyPeer($verifyPeer); $config->setSslPeerName($peerName); + $config->setSslVerifyPeerName($verifyPeerName); $sasl = $this->createMock(SaslMechanism::class); $sasl->expects($this->once()) From ea5f6c2303b08736387e0e82289cf29b41aa78f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Wed, 12 Jun 2019 13:44:19 +0200 Subject: [PATCH 09/10] Remove methods that had no validation. Seems to be the way to do it --- src/Config.php | 48 ------------------------------------------------ 1 file changed, 48 deletions(-) diff --git a/src/Config.php b/src/Config.php index fa31902b..4c38bfc5 100644 --- a/src/Config.php +++ b/src/Config.php @@ -247,14 +247,6 @@ public function setMetadataMaxAgeMs(int $metadataMaxAgeMs): void static::$options['metadataMaxAgeMs'] = $metadataMaxAgeMs; } - /** - * @throws Exception\Config - */ - public function setAutoCreateTopicsEnable(bool $flag = true): void - { - static::$options['autoCreateTopicsEnable'] = $flag; - } - /** * @throws Exception\Config */ @@ -326,44 +318,4 @@ public function setSaslMechanism(string $mechanism): void static::$options['saslMechanism'] = $mechanism; } - - /** - * @throws Exception\Config - */ - public function setSendTimeoutSec(int $timeout): void - { - static::$options['sendTimeoutSec'] = $timeout; - } - - /** - * @throws Exception\Config - */ - public function setSendTimeoutUsec(int $timeout): void - { - static::$options['recvTimeoutUsec'] = $timeout; - } - - /** - * @throws Exception\Config - */ - public function setRecvTimeoutSec(int $timeout): void - { - static::$options['recvTimeoutSec'] = $timeout; - } - - /** - * @throws Exception\Config - */ - public function setRecvTimeoutUsec(int $timeout): void - { - static::$options['recvTimeoutUsec'] = $timeout; - } - - /** - * @throws Exception\Config - */ - public function setSslVerifyPeerName(bool $verify): void - { - static::$options['sslVerifyPeerName'] = $verify; - } } From cde8e34fc1a5d71b97b85a87876962347becdd1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20S=C3=B8by=20Andersen?= Date: Tue, 25 Jun 2019 09:25:21 +0200 Subject: [PATCH 10/10] Use init function, otherwise logger was never set --- src/Producer/SyncProcess.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 647a6a19..9a007328 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -28,6 +28,10 @@ class SyncProcess public function __construct(?RecordValidator $recordValidator = null) { $this->recordValidator = $recordValidator ?? new RecordValidator(); + } + + public function init(): void + { $config = $this->getConfig(); \Kafka\Protocol::init($config->getBrokerVersion(), $this->logger); @@ -47,6 +51,7 @@ public function __construct(?RecordValidator $recordValidator = null) */ public function send(array $recordSet): array { + $this->init(); $broker = $this->getBroker(); $config = $this->getConfig();