Skip to content

Commit

Permalink
GH-1678: Allow CF Property Overrides
Browse files Browse the repository at this point in the history
Resolves #1678

Although the consumer factory can override properties using an overloaded
`createConsumer` method, add update/remove methods for properties to be
consistent with the `ProducerFactory`.

Change maps to concurrent to avoid possible `ConcurrentModificationException`.
  • Loading branch information
garyrussell authored and artembilan committed Jan 27, 2021
1 parent 9101166 commit 8879427
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -204,6 +204,23 @@ default List<ConsumerPostProcessor<K, V>> getPostProcessors() {
return Collections.emptyList();
}

/**
* Update the consumer configuration map; useful for situations such as
* credential rotation.
* @param updates the configuration properties to update.
* @since 2.7
*/
default void updateConfigs(Map<String, Object> updates) {
}

/**
* Remove the specified key from the configuration map.
* @param configKey the key to remove.
* @since 2.7
*/
default void removeConfig(String configKey) {
}

/**
* Called whenever a consumer is added or removed.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.aopalliance.aop.Advice;
Expand Down Expand Up @@ -121,7 +122,7 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier,
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {

this.configs = new HashMap<>(configs);
this.configs = new ConcurrentHashMap<>(configs);
this.keyDeserializerSupplier = keyDeserializerSupplier == null ? () -> null : keyDeserializerSupplier;
this.valueDeserializerSupplier = valueDeserializerSupplier == null ? () -> null : valueDeserializerSupplier;
}
Expand Down Expand Up @@ -229,6 +230,16 @@ public boolean removeListener(Listener<K, V> listener) {
return this.listeners.remove(listener);
}

@Override
public void updateConfigs(Map<String, Object> updates) {
this.configs.putAll(updates);
}

@Override
public void removeConfig(String configKey) {
this.configs.remove(configKey);
}

@Override
public Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
@Nullable String clientIdSuffix) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -192,7 +192,7 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
@Nullable Supplier<Serializer<K>> keySerializerSupplier,
@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {

this.configs = new HashMap<>(configs);
this.configs = new ConcurrentHashMap<>(configs);
this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;
this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;
if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,6 +145,10 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
target.createConsumer(null, null, null, overrides);
assertThat(configPassedToKafkaConsumer.get("config1")).isEqualTo("overridden");
assertThat(configPassedToKafkaConsumer.get("config2")).isSameAs(originalConfig.get("config2"));
target.updateConfigs(Map.of("config1", "newValue"));
assertThat(target.getConfigurationProperties().get("config1")).isEqualTo("newValue");
target.removeConfig("config1");
assertThat(target.getConfigurationProperties().get("config1")).isNull();
}

@Test
Expand Down

0 comments on commit 8879427

Please sign in to comment.