Skip to content

Commit

Permalink
CONFLUENT: Make min.insync.replicas visible but read-only in topic de… (
Browse files Browse the repository at this point in the history
apache#99)

CONFLUENT: Make min.insync.replicas visible but read-only in topic describe
  • Loading branch information
bob-barrett committed Dec 29, 2018
1 parent 0a39633 commit 5690a7b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
Expand Up @@ -21,6 +21,7 @@ public class MultiTenantConfigRestrictions {
"num.partitions"
);

// Topic configs that are modifiable by cloud users
public static final Set<String> UPDATABLE_TOPIC_CONFIGS = Utils.mkSet(
"cleanup.policy",
"max.message.bytes",
Expand All @@ -32,4 +33,9 @@ public class MultiTenantConfigRestrictions {
"delete.retention.ms",
"segment.bytes"
);

// Topic configs that are returned in topic describe but marked as read only
public static final Set<String> READ_ONLY_TOPIC_CONFIGS = Utils.mkSet(
"min.insync.replicas"
);
}
Expand Up @@ -374,10 +374,17 @@ private DescribeConfigsResponse filteredDescribeConfigsResponse(
return MultiTenantConfigRestrictions.VISIBLE_BROKER_CONFIGS.contains(ce.name());
}
if (resource.type() == ConfigResource.Type.TOPIC) {
return MultiTenantConfigRestrictions.UPDATABLE_TOPIC_CONFIGS.contains(ce.name());
// Allow both updatable and visible topic configs in the response
return MultiTenantConfigRestrictions.UPDATABLE_TOPIC_CONFIGS.contains(ce.name()) ||
MultiTenantConfigRestrictions.READ_ONLY_TOPIC_CONFIGS.contains(ce.name());
}
return false;
})
// For configs that are visible but not updatable, set readOnly to true
.map(ce -> MultiTenantConfigRestrictions.READ_ONLY_TOPIC_CONFIGS.contains(ce.name()) ?
new DescribeConfigsResponse.ConfigEntry(ce.name(), ce.value(), ce.source(), ce.isSensitive(), true, ce.synonyms()) :
ce
)
.collect(Collectors.toSet());
filteredConfigs.put(
resource,
Expand Down
Expand Up @@ -9,6 +9,7 @@
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -135,6 +136,8 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -1512,18 +1515,24 @@ public void testDescribeConfigsResponseWithAllBrokerConfigs() throws IOException
}

public void testDescribeConfigsResponse(boolean allowDescribeBrokerConfigs) throws IOException {
DescribeConfigsResponse.ConfigSource source = DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG;
DescribeConfigsResponse.ConfigSource brokerSource = DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG;
DescribeConfigsResponse.ConfigSource topicSource = DescribeConfigsResponse.ConfigSource.TOPIC_CONFIG;
Set<DescribeConfigsResponse.ConfigSynonym> emptySynonyms = Collections.emptySet();
Collection<DescribeConfigsResponse.ConfigEntry> brokerConfigEntries = Arrays.asList(
new DescribeConfigsResponse.ConfigEntry("message.max.bytes", "10000", source, false, false, emptySynonyms),
new DescribeConfigsResponse.ConfigEntry("num.network.threads", "5", source, false, false, emptySynonyms)
new DescribeConfigsResponse.ConfigEntry("message.max.bytes", "10000", brokerSource, false, false, emptySynonyms),
new DescribeConfigsResponse.ConfigEntry("num.network.threads", "5", brokerSource, false, false, emptySynonyms)
);
Collection<DescribeConfigsResponse.ConfigEntry> topicConfigEntries = Arrays.asList(
new DescribeConfigsResponse.ConfigEntry("retention.bytes", "10000000", topicSource, false, false, emptySynonyms),
new DescribeConfigsResponse.ConfigEntry("min.insync.replicas", "2", topicSource, false, false, emptySynonyms),
new DescribeConfigsResponse.ConfigEntry("min.cleanable.dirty.ratio", "0.5", topicSource, false, false, emptySynonyms)
);

for (short ver = ApiKeys.DESCRIBE_CONFIGS.oldestVersion(); ver <= ApiKeys.DESCRIBE_CONFIGS.latestVersion(); ver++) {
MultiTenantRequestContext context = newRequestContext(ApiKeys.DESCRIBE_CONFIGS, ver);
Map<ConfigResource, DescribeConfigsResponse.Config> resourceErrors = new HashMap<>();
resourceErrors.put(new ConfigResource(ConfigResource.Type.TOPIC, "tenant_foo"), new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, ""),
Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
topicConfigEntries));
resourceErrors.put(new ConfigResource(ConfigResource.Type.BROKER, "blah"), new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, ""),
brokerConfigEntries));
resourceErrors.put(new ConfigResource(ConfigResource.Type.TOPIC, "tenant_bar"), new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, ""),
Expand All @@ -1535,6 +1544,28 @@ public void testDescribeConfigsResponse(boolean allowDescribeBrokerConfigs) thro
assertEquals(mkSet(new ConfigResource(ConfigResource.Type.TOPIC, "foo"),
new ConfigResource(ConfigResource.Type.BROKER, "blah"),
new ConfigResource(ConfigResource.Type.TOPIC, "bar")), intercepted.configs().keySet());

Collection<DescribeConfigsResponse.ConfigEntry> interceptedTopicConfigs =
intercepted.configs().get(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).entries();
Map<String, Boolean> topicReadOnlyMap = new HashMap<>();
for (DescribeConfigsResponse.ConfigEntry configEntry : interceptedTopicConfigs) {
topicReadOnlyMap.put(configEntry.name(), configEntry.isReadOnly());
}
if (allowDescribeBrokerConfigs) {
assertEquals(
mkMap(
mkEntry("retention.bytes", Boolean.FALSE),
mkEntry("min.insync.replicas", Boolean.FALSE),
mkEntry("min.cleanable.dirty.ratio", Boolean.FALSE)),
topicReadOnlyMap);
} else {
assertEquals(
mkMap(
mkEntry("retention.bytes", Boolean.FALSE),
mkEntry("min.insync.replicas", Boolean.TRUE)),
topicReadOnlyMap);
}

Collection<DescribeConfigsResponse.ConfigEntry> interceptedBrokerConfigs =
intercepted.configs().get(new ConfigResource(ConfigResource.Type.BROKER, "blah")).entries();
Set<String> interceptedEntries = new HashSet<>();
Expand Down

0 comments on commit 5690a7b

Please sign in to comment.