Skip to content

Commit

Permalink
GH-1641: Reconfigurable Producer Factory
Browse files Browse the repository at this point in the history
Resolves #1641

**I will back-port; the test will need some tweaks**
  • Loading branch information
garyrussell authored and artembilan committed Nov 24, 2020
1 parent 1ff7823 commit 1140d5e
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;

private String transactionIdPrefix;

private ApplicationContext applicationContext;

private String beanName = "not.managed.by.Spring";
Expand All @@ -150,10 +148,12 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private boolean producerPerThread;

private String clientIdPrefix;

private long maxAge;

private volatile String transactionIdPrefix;

private volatile String clientIdPrefix;

private volatile CloseSafeProducer<K, V> producer;

/**
Expand Down Expand Up @@ -412,6 +412,34 @@ public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
return this.postProcessors.remove(postProcessor);
}

@Override
public void updateConfigs(Map<String, Object> updates) {
updates.entrySet().forEach(entry -> {
if (entry.getKey().equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG
+ "' must be a String, not a " + entry.getClass().getName());
Assert.isTrue(this.transactionIdPrefix != null
? entry.getValue() != null
: entry.getValue() == null,
"Cannot change transactional capability");
this.transactionIdPrefix = (String) entry.getValue();
}
else if (entry.getKey().equals(ProducerConfig.CLIENT_ID_CONFIG)) {
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG
+ "' must be a String, not a " + entry.getClass().getName());
this.clientIdPrefix = (String) entry.getValue();
}
else {
this.configs.put(entry.getKey(), entry.getValue());
}
});
}

@Override
public void removeConfig(String configKey) {
this.configs.remove(configKey);
}

/**
* When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ default boolean transactionCapable() {
* @since 1.3.8
*/
default void closeProducerFor(String transactionIdSuffix) {
// NOSONAR
}

/**
Expand All @@ -101,15 +100,13 @@ default boolean isProducerPerConsumerPartition() {
* @since 2.3
*/
default void closeThreadBoundProducer() {
// NOSONAR
}

/**
* Reset any state in the factory, if supported.
* @since 2.4
*/
default void reset() {
// NOSONAR
}

/**
Expand Down Expand Up @@ -236,6 +233,23 @@ default List<ProducerPostProcessor<K, V>> getPostProcessors() {
return Collections.emptyList();
}

/**
* Update the producer configuration map; useful for situations such as
* credential rotation.
* @param updates the configuration properties to update.
* @since 2.5.10
*/
default void updateConfigs(Map<String, Object> updates) {
}

/**
* Remove the specified key from the configuration map.
* @param configKey the key to remove.
* @since 2.5.10
*/
default void removeConfig(String configKey) {
}

/**
* Called whenever a producer is added or removed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -413,4 +414,33 @@ protected Producer createRawProducer(Map configs) {
pf.destroy();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void configUpdates() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs);
assertThat(pf.getConfigurationProperties()).hasSize(2);
configs.remove(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
configs.put(ProducerConfig.ACKS_CONFIG, "all");
pf.updateConfigs(configs);
assertThat(pf.getConfigurationProperties()).hasSize(3);
pf.removeConfig(ProducerConfig.ACKS_CONFIG);
assertThat(pf.getConfigurationProperties()).hasSize(2);
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");
assertThatIllegalArgumentException().isThrownBy(() -> pf.updateConfigs(configs));
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
DefaultKafkaProducerFactory pf1 = new DefaultKafkaProducerFactory<>(configs);
assertThat(pf1.getConfigurationProperties()).hasSize(5);
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId2");
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx2-");
pf1.updateConfigs(configs);
assertThat(pf1.getConfigurationProperties()).hasSize(5);
assertThat(KafkaTestUtils.getPropertyValue(pf1, "clientIdPrefix")).isEqualTo("clientId2");
assertThat(KafkaTestUtils.getPropertyValue(pf1, "transactionIdPrefix")).isEqualTo("tx2-");
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
assertThatIllegalArgumentException().isThrownBy(() -> pf1.updateConfigs(configs));
}

}
16 changes: 16 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,22 @@ public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
----
====

Starting with version 2.5.10, you can now update the producer properties after the factory is created.
This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change.
The changes will not affect existing producer instances; call `reset()` to close any existing producers so that new producers will be created using the new properties.
NOTE: You cannot change a transactional producer factory to non-transactional, and vice-versa.

Two new methods are now provided:

====
[source, java]
----
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
----
====

[[replying-template]]
===== Using `ReplyingKafkaTemplate`

Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ See <<seek-to-current>>, <<after-rollback>>, <<recovering-batch-eh>> for more in

You can now set a maximum age for producers after which they will be closed and recreated.
See <<transactions>> for more information.

You can now update the configuration map after the `DefaultKafkaProducerFactory` has been created.
This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change.
See <<producer-factory>> for more information.

0 comments on commit 1140d5e

Please sign in to comment.