Skip to content

Commit

Permalink
Allow to use custom options for producers
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Nov 5, 2021
1 parent 45f7861 commit 38ca04c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public function getProducerOptions(): array
'metadata.broker.list' => $this->broker,
];

return array_merge($config, $this->getSaslOptions());
return array_merge($config, $this->customOptions, $this->getSaslOptions());
}

private function getSaslOptions(): array
Expand Down
38 changes: 38 additions & 0 deletions tests/Config/ConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,42 @@ public function testItReturnsProducerOptions()
$config->getProducerOptions()
);
}

public function testItAcceptsCustomOptionsForProducersConfig()
{
$customOptions = [
'bootstrap.servers' => '[REMOTE_ADDRESS]',
'metadata.broker.list' => '[REMOTE_ADDRESS]',
'security.protocol' => 'SASL_SSL',
'sasl.mechanisms' => 'PLAIN',
'sasl.username' => '[API_KEY]',
'sasl.password' => '[API_KEY]',
];

$config = new Config(
broker: 'broker',
topics: ['topic'],
securityProtocol: 'SASL_PLAINTEXT',
commit: 1,
groupId: 'group',
consumer: $this->createMock(Consumer::class),
dlq: null,
customOptions: $customOptions
);

$expectedOptions = [
'compression.codec' => 'snappy',
'bootstrap.servers' => '[REMOTE_ADDRESS]',
'metadata.broker.list' => '[REMOTE_ADDRESS]',
'security.protocol' => 'SASL_SSL',
'sasl.mechanisms' => 'PLAIN',
'sasl.username' => '[API_KEY]',
'sasl.password' => '[API_KEY]',
];

$this->assertEquals(
$expectedOptions,
$config->getProducerOptions()
);
}
}
17 changes: 17 additions & 0 deletions tests/KafkaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,23 @@ public function testICanDisableDebugUsingWithDebugDisabledMethod()
$this->assertArrayNotHasKey('debug', $message->getHeaders());
}

public function testICanUseCustomOptionsForProducerConfig()
{
$producer = Kafka::publishOn('localhost:9092', 'test-topic')
->withConfigOptions($expectedOptions = [
'bootstrap.servers' => '[REMOTE_ADDRESS]',
'metadata.broker.list' => '[REMOTE_ADDRESS]',
'security.protocol' => 'SASL_SSL',
'sasl.mechanisms' => 'PLAIN',
'sasl.username' => '[API_KEY]',
'sasl.password' => '[API_KEY]',
]);

$options = $this->getPropertyWithReflection('options', $producer);

$this->assertEquals($expectedOptions, $options);
}

public function testCreateConsumerReturnsAConsumerBuilderInstance()
{
$consumer = Kafka::createConsumer('broker');
Expand Down

0 comments on commit 38ca04c

Please sign in to comment.