From 4f3ae69a7773b7f71764d6231b5bdcbc4f7b062f Mon Sep 17 00:00:00 2001 From: iliax Date: Thu, 20 Oct 2022 19:37:42 +0400 Subject: [PATCH 01/14] ISSUE-2787: 1. Getting topic config defaults from server response 2. rm KafkaConstants 3. doc field added to TopicConfigDTO --- .../kafka/ui/model/InternalTopicConfig.java | 21 ++++-- .../kafka/ui/service/ReactiveAdminClient.java | 10 +-- .../kafka/ui/service/TopicsService.java | 4 +- .../provectus/kafka/ui/util/ClusterUtil.java | 0 .../kafka/ui/util/KafkaConstants.java | 65 ------------------- .../main/resources/swagger/kafka-ui-api.yaml | 2 + 6 files changed, 24 insertions(+), 78 deletions(-) delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java index 294894ebc2f..d061dd49813 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java @@ -1,8 +1,5 @@ package com.provectus.kafka.ui.model; -import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; - import java.util.List; import lombok.Builder; import lombok.Data; @@ -19,6 +16,7 @@ public class InternalTopicConfig { private final boolean isSensitive; private final boolean isReadOnly; private final List synonyms; + private final String doc; public static InternalTopicConfig from(ConfigEntry configEntry) { InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder() @@ -27,11 +25,22 @@ public static InternalTopicConfig from(ConfigEntry configEntry) { .source(configEntry.source()) .isReadOnly(configEntry.isReadOnly()) .isSensitive(configEntry.isSensitive()) - .synonyms(configEntry.synonyms()); - if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) { + .synonyms(configEntry.synonyms()) + .doc(configEntry.documentation()); + + if (configEntry.source() == ConfigEntry.ConfigSource.DEFAULT_CONFIG) { + // this is important case, because for some configs like "confluent.*" no synonyms returned, but + // they are set by default and "source" == DEFAULT_CONFIG builder.defaultValue(configEntry.value()); } else { - builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name())); + // normally by default first entity of synonyms values will be used. 
+ configEntry.synonyms().stream() + // skipping DYNAMIC_TOPIC_CONFIG value - which is explicitly set value when + // topic was created (not default), see ConfigEntry.synonyms() doc + .filter(s -> s.source() != ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) + .map(ConfigEntry.ConfigSynonym::value) + .findFirst() + .ifPresent(builder::defaultValue); } return builder.build(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index c08b839410e..3ef654025d1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -145,20 +145,20 @@ public String getVersion() { } public Mono>> getTopicsConfig() { - return listTopics(true).flatMap(this::getTopicsConfig); + return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false)); } - public Mono>> getTopicsConfig(Collection topicNames) { + public Mono>> getTopicsConfig(Collection topicNames, boolean includeDoc) { // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count return partitionCalls( topicNames, 200, - this::getTopicsConfigImpl, + part -> getTopicsConfigImpl(part, includeDoc), (m1, m2) -> ImmutableMap.>builder().putAll(m1).putAll(m2).build() ); } - private Mono>> getTopicsConfigImpl(Collection topicNames) { + private Mono>> getTopicsConfigImpl(Collection topicNames, boolean includeDoc) { List resources = topicNames.stream() .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)) .collect(toList()); @@ -166,7 +166,7 @@ private Mono>> getTopicsConfigImpl(Collection config.entrySet().stream() .collect(toMap( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index 2dffe0de75e..8badcebc360 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -68,7 +68,7 @@ public Mono> loadTopics(KafkaCluster c, List topics) } return adminClientService.get(c) .flatMap(ac -> - ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics), + ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false), (descriptions, configs) -> { statisticsCache.update(c, descriptions, configs); return getPartitionOffsets(descriptions, ac).map(offsets -> { @@ -160,7 +160,7 @@ public Mono getTopicDetails(KafkaCluster cluster, String topicNam public Mono> getTopicConfigs(KafkaCluster cluster, String topicName) { return adminClientService.get(cluster) - .flatMap(ac -> ac.getTopicsConfig(List.of(topicName))) + .flatMap(ac -> ac.getTopicsConfig(List.of(topicName), true)) .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new)); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java deleted file mode 100644 index aa482c57b59..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java +++ /dev/null @@ -1,65 +0,0 @@ -package 
com.provectus.kafka.ui.util; - -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE; -import static org.apache.kafka.common.config.TopicConfig.COMPRESSION_TYPE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.FILE_DELETE_DELAY_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.FLUSH_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.INDEX_INTERVAL_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MAX_MESSAGE_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.PREALLOCATE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.RETENTION_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_INDEX_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_JITTER_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG; - -import java.util.AbstractMap; -import java.util.Map; - -public final class KafkaConstants { - - private static final String LONG_MAX_STRING = Long.toString(Long.MAX_VALUE); - - public static final Map TOPIC_DEFAULT_CONFIGS = Map.ofEntries( - new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE), - new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"), - new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"), - new AbstractMap.SimpleEntry<>(FILE_DELETE_DELAY_MS_CONFIG, "60000"), - new AbstractMap.SimpleEntry<>(FLUSH_MESSAGES_INTERVAL_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>(FLUSH_MS_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>("follower.replication.throttled.replicas", ""), - new AbstractMap.SimpleEntry<>(INDEX_INTERVAL_BYTES_CONFIG, "4096"), - new AbstractMap.SimpleEntry<>("leader.replication.throttled.replicas", ""), - new AbstractMap.SimpleEntry<>(MAX_COMPACTION_LAG_MS_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>(MAX_MESSAGE_BYTES_CONFIG, "1000012"), - new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"), - new AbstractMap.SimpleEntry<>(MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5"), - new AbstractMap.SimpleEntry<>(MIN_COMPACTION_LAG_MS_CONFIG, "0"), - new AbstractMap.SimpleEntry<>(MIN_IN_SYNC_REPLICAS_CONFIG, "1"), - new 
AbstractMap.SimpleEntry<>(PREALLOCATE_CONFIG, "false"), - new AbstractMap.SimpleEntry<>(RETENTION_BYTES_CONFIG, "-1"), - new AbstractMap.SimpleEntry<>(RETENTION_MS_CONFIG, "604800000"), - new AbstractMap.SimpleEntry<>(SEGMENT_BYTES_CONFIG, "1073741824"), - new AbstractMap.SimpleEntry<>(SEGMENT_INDEX_BYTES_CONFIG, "10485760"), - new AbstractMap.SimpleEntry<>(SEGMENT_JITTER_MS_CONFIG, "0"), - new AbstractMap.SimpleEntry<>(SEGMENT_MS_CONFIG, "604800000"), - new AbstractMap.SimpleEntry<>(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"), - new AbstractMap.SimpleEntry<>(MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "true") - ); - - private KafkaConstants() { - } -} \ No newline at end of file diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index d07c24a61b1..302cf84c338 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2260,6 +2260,8 @@ components: type: array items: $ref: "#/components/schemas/ConfigSynonym" + doc: + type: string required: - name From 86d931af75458d4592df0eaad31e6385a2a59f50 Mon Sep 17 00:00:00 2001 From: iliax Date: Thu, 20 Oct 2022 20:48:29 +0400 Subject: [PATCH 02/14] ReactiveAdminClient.SupportedFeature refactor --- .../kafka/ui/service/ReactiveAdminClient.java | 35 +++++++++++++------ .../kafka/ui/KafkaConsumerTests.java | 6 ++-- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 3ef654025d1..463e2d9b6c6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -12,6 +12,7 @@ import com.provectus.kafka.ui.util.NumberUtil; import java.io.Closeable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -69,8 +70,24 @@ public class ReactiveAdminClient implements Closeable { private enum SupportedFeature { - INCREMENTAL_ALTER_CONFIGS, - ALTER_CONFIGS + INCREMENTAL_ALTER_CONFIGS(2.3f), + CONFIG_DOCUMENTATION_RETRIEVAL(2.6f); + + private final float sinceVersion; + + SupportedFeature(float sinceVersion) { + this.sinceVersion = sinceVersion; + } + + static Set forVersion(float kafkaVersion) { + return Arrays.stream(SupportedFeature.values()) + .filter(f -> kafkaVersion >= f.sinceVersion) + .collect(Collectors.toSet()); + } + + static Set defaultFeatures() { + return Set.of(); + } } @Value @@ -88,18 +105,15 @@ public static Mono create(AdminClient adminClient) { new ReactiveAdminClient( adminClient, ver, - Set.of(getSupportedUpdateFeatureForVersion(ver)))); + getSupportedUpdateFeaturesForVersion(ver))); } - private static SupportedFeature getSupportedUpdateFeatureForVersion(String versionStr) { + private static Set getSupportedUpdateFeaturesForVersion(String versionStr) { try { float version = NumberUtil.parserClusterVersion(versionStr); - return version <= 2.3f - ? 
SupportedFeature.ALTER_CONFIGS - : SupportedFeature.INCREMENTAL_ALTER_CONFIGS; + return SupportedFeature.forVersion(version); } catch (NumberFormatException e) { - log.info("Assuming non-incremental alter configs due to version parsing error"); - return SupportedFeature.ALTER_CONFIGS; + return SupportedFeature.defaultFeatures(); } } @@ -149,11 +163,12 @@ public Mono>> getTopicsConfig() { } public Mono>> getTopicsConfig(Collection topicNames, boolean includeDoc) { + var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc; // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count return partitionCalls( topicNames, 200, - part -> getTopicsConfigImpl(part, includeDoc), + part -> getTopicsConfigImpl(part, includeDocFixed), (m1, m2) -> ImmutableMap.>builder().putAll(m1).putAll(m2).build() ); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java index d248edf5e8c..ff11aa6656a 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java @@ -3,10 +3,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.springframework.http.MediaType.TEXT_EVENT_STREAM; -import com.provectus.kafka.ui.api.model.TopicConfig; import com.provectus.kafka.ui.model.BrokerConfigDTO; import com.provectus.kafka.ui.model.PartitionsIncreaseDTO; import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO; +import com.provectus.kafka.ui.model.TopicConfigDTO; import com.provectus.kafka.ui.model.TopicCreationDTO; import com.provectus.kafka.ui.model.TopicDetailsDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; @@ -206,12 +206,12 @@ public void shouldRetrieveTopicConfig() { .expectStatus() .isOk(); - List configs = webTestClient.get() + List configs = webTestClient.get() .uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName) .exchange() .expectStatus() .isOk() - .expectBodyList(TopicConfig.class) + .expectBodyList(TopicConfigDTO.class) .returnResult() .getResponseBody(); From 83bccc1fb8f931f8ee79e51d1b81e97d6ec79a4c Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 25 Oct 2022 00:59:56 +0400 Subject: [PATCH 03/14] ISSUE-754: Backend for kafka ACLs --- .../compose/jaas/zookeeper_jaas.conf | 4 + documentation/compose/kafka-ui-sasl.yaml | 38 +++-- .../kafka/ui/controller/AclsController.java | 55 +++++++ .../kafka/ui/mapper/ClusterMapper.java | 38 ++++- .../com/provectus/kafka/ui/model/Feature.java | 3 +- .../kafka/ui/service/FeatureService.java | 21 ++- .../kafka/ui/service/ReactiveAdminClient.java | 77 ++++++--- .../kafka/ui/service/acl/AclCsv.java | 81 ++++++++++ .../kafka/ui/service/acl/AclsService.java | 93 +++++++++++ .../kafka/ui/service/acl/AclCsvTest.java | 68 ++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 149 ++++++++++++++++++ 11 files changed, 578 insertions(+), 49 deletions(-) create mode 100644 documentation/compose/jaas/zookeeper_jaas.conf create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java diff --git 
a/documentation/compose/jaas/zookeeper_jaas.conf b/documentation/compose/jaas/zookeeper_jaas.conf new file mode 100644 index 00000000000..2b1754fba0e --- /dev/null +++ b/documentation/compose/jaas/zookeeper_jaas.conf @@ -0,0 +1,4 @@ +Server { + org.apache.zookeeper.server.auth.DigestLoginModule required + user_admin="admin-secret"; +}; \ No newline at end of file diff --git a/documentation/compose/kafka-ui-sasl.yaml b/documentation/compose/kafka-ui-sasl.yaml index 1c0312f11a2..6dfe4f7532e 100644 --- a/documentation/compose/kafka-ui-sasl.yaml +++ b/documentation/compose/kafka-ui-sasl.yaml @@ -12,41 +12,39 @@ services: - kafka environment: KAFKA_CLUSTERS_0_NAME: local -# SERVER_SERVLET_CONTEXT_PATH: "/kafkaui" - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093 KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";' + zookeeper: - image: confluentinc/cp-zookeeper:5.2.4 + image: wurstmeister/zookeeper:3.4.6 environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 + JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf" + volumes: + - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf ports: - 2181:2181 kafka: - image: wurstmeister/kafka:latest - hostname: kafka - container_name: kafka + image: wurstmeister/kafka:2.13-2.8.1 depends_on: - zookeeper ports: - - '9092:9092' + - 9092:9092 environment: - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENERS: SASL_PLAINTEXT://kafka:9092 - KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_SUPER_USERS: "User:admin" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: INTERNAL://:9093,EXTERNAL://:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9093,EXTERNAL://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT ALLOW_PLAINTEXT_LISTENER: 'yes' - KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf" - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_SASL_ENABLED_MECHANISMS: PLAIN KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN - KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT - KAFKA_SUPER_USERS: User:admin,User:enzo - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf" volumes: - - ./jaas:/etc/kafka/jaas \ No newline at end of file + - ./jaas/kafka_server.conf:/etc/kafka/kafka_jaas.conf \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java new file mode 100644 index 00000000000..7d8fa9812da --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java @@ -0,0 +1,55 @@ +package com.provectus.kafka.ui.controller; + +import com.provectus.kafka.ui.api.AclsApi; +import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.KafkaAclDTO; +import 
com.provectus.kafka.ui.service.acl.AclsService; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class AclsController extends AbstractController implements AclsApi { + + private final AclsService aclsService; + + @Override + public Mono> createAcl(String clusterName, Mono kafkaAclDto, + ServerWebExchange exchange) { + return kafkaAclDto.map(ClusterMapper::toAclBinding) + .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono> deleteAcl(String clusterName, Mono kafkaAclDto, + ServerWebExchange exchange) { + return kafkaAclDto.map(ClusterMapper::toAclBinding) + .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono>> listAcls(String clusterName, ServerWebExchange exchange) { + return Mono.just( + ResponseEntity.ok( + aclsService.listAcls(getCluster(clusterName)).map(ClusterMapper::toKafkaAclDto))); + } + + @Override + public Mono> getAclAsCsv(String clusterName, ServerWebExchange exchange) { + return aclsService.getAclAsCsvString(getCluster(clusterName)) + .map(ResponseEntity::ok) + .flatMap(Mono::just); + } + + @Override + public Mono> syncAclsCsv(String clusterName, Mono csvMono, ServerWebExchange exchange) { + return csvMono.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv)) + .thenReturn(ResponseEntity.ok().build()); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index bf7cb336360..24f52b0cb11 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -23,6 +23,7 @@ import com.provectus.kafka.ui.model.InternalSchemaRegistry; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; +import com.provectus.kafka.ui.model.KafkaAclDTO; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaConnectCluster; import com.provectus.kafka.ui.model.MetricDTO; @@ -35,7 +36,6 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel; import com.provectus.kafka.ui.service.metrics.RawMetric; -import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -43,6 +43,13 @@ import java.util.Properties; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Named; @@ -181,4 +188,33 @@ default Properties setProperties(Properties properties) { return copy; } + static 
AclBinding toAclBinding(KafkaAclDTO dto) { + return new AclBinding( + new ResourcePattern( + ResourceType.valueOf(dto.getResourceType().name()), + dto.getResourceName(), + PatternType.valueOf(dto.getNamePatternType().name()) + ), + new AccessControlEntry( + dto.getPrincipal(), + dto.getHost(), + AclOperation.valueOf(dto.getOperation().name()), + AclPermissionType.valueOf(dto.getPermission().name()) + ) + ); + } + + static KafkaAclDTO toKafkaAclDto(AclBinding binding) { + var pattern = binding.pattern(); + var filter = binding.toFilter().entryFilter(); + return new KafkaAclDTO() + .resourceType(KafkaAclDTO.ResourceTypeEnum.fromValue(pattern.resourceType().name())) + .resourceName(pattern.name()) + .namePatternType(KafkaAclDTO.NamePatternTypeEnum.fromValue(pattern.patternType().name())) + .principal(filter.principal()) + .host(filter.host()) + .operation(KafkaAclDTO.OperationEnum.fromValue(filter.operation().name())) + .permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name())); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java index ff0e2fca4bc..f35039190d9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java @@ -4,5 +4,6 @@ public enum Feature { KAFKA_CONNECT, KSQL_DB, SCHEMA_REGISTRY, - TOPIC_DELETION + TOPIC_DELETION, + KAFKA_ACL } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index 9097f2b25e7..0eac8931a6d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -2,6 +2,7 @@ import com.provectus.kafka.ui.model.Feature; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -42,16 +43,15 @@ public Mono> getAvailableFeatures(KafkaCluster cluster, @Nullable } if (controller != null) { - features.add( - isTopicDeletionEnabled(cluster, controller) - .flatMap(r -> Boolean.TRUE.equals(r) ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty()) - ); + features.add(topicDeletion(cluster, controller)); } + features.add(acl(cluster)); + return Flux.fromIterable(features).flatMap(m -> m).collectList(); } - private Mono isTopicDeletionEnabled(KafkaCluster cluster, Node controller) { + private Mono topicDeletion(KafkaCluster cluster, Node controller) { return adminClientService.get(cluster) .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id()))) .map(config -> @@ -60,6 +60,15 @@ private Mono isTopicDeletionEnabled(KafkaCluster cluster, Node controll .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY)) .map(e -> Boolean.parseBoolean(e.value())) .findFirst() - .orElse(true)); + .orElse(true)) + .flatMap(enabled -> enabled ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty()); + } + + private Mono acl(KafkaCluster cluster) { + return adminClientService.get(cluster).flatMap( + ac -> ac.getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED) + ? 
Mono.just(Feature.KAFKA_ACL) + : Mono.empty() + ); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 463e2d9b6c6..755874009b8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -4,6 +4,7 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.provectus.kafka.ui.exception.IllegalEntityStateException; @@ -12,7 +13,6 @@ import com.provectus.kafka.ui.util.NumberUtil; import java.io.Closeable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -52,13 +52,17 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; @@ -69,24 +73,27 @@ @RequiredArgsConstructor public class ReactiveAdminClient implements Closeable { - private enum SupportedFeature { + public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), - CONFIG_DOCUMENTATION_RETRIEVAL(2.6f); + CONFIG_DOCUMENTATION_RETRIEVAL(2.6f), + AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled); - private final float sinceVersion; + private final BiFunction> predicate; - SupportedFeature(float sinceVersion) { - this.sinceVersion = sinceVersion; + SupportedFeature(BiFunction> predicate) { + this.predicate = predicate; } - static Set forVersion(float kafkaVersion) { - return Arrays.stream(SupportedFeature.values()) - .filter(f -> kafkaVersion >= f.sinceVersion) - .collect(Collectors.toSet()); + SupportedFeature(float fromVersion) { + this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion); } - static Set defaultFeatures() { - return Set.of(); + static Mono> forVersion(AdminClient ac, @Nullable Float kafkaVersion) { + return Flux.fromArray(SupportedFeature.values()) + .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled))) + .filter(Tuple2::getT2) + .map(Tuple2::getT1) + .collect(Collectors.toSet()); } } @@ -101,20 +108,28 @@ public static class ClusterDescription { public static Mono create(AdminClient adminClient) { return getClusterVersion(adminClient) - .map(ver -> - new ReactiveAdminClient( - adminClient, - ver, - getSupportedUpdateFeaturesForVersion(ver))); + .flatMap(ver -> + getSupportedUpdateFeaturesForVersion(adminClient, ver) + .map(features -> + new ReactiveAdminClient(adminClient, ver, 
features))); } - private static Set getSupportedUpdateFeaturesForVersion(String versionStr) { + private static Mono> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) { + Float kafkaVersion = null; try { - float version = NumberUtil.parserClusterVersion(versionStr); - return SupportedFeature.forVersion(version); + kafkaVersion = NumberUtil.parserClusterVersion(versionStr); } catch (NumberFormatException e) { - return SupportedFeature.defaultFeatures(); + //Nothing to do here } + return SupportedFeature.forVersion(ac, kafkaVersion); + } + + private static Mono isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) { + return toMono(ac.describeAcls(AclBindingFilter.ANY).values()) + .thenReturn(true) + .doOnError(th -> !(th instanceof SecurityDisabledException) && !(th instanceof InvalidRequestException), + th -> log.warn("Error checking if security enabled", th)) + .onErrorReturn(false); } //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here @@ -146,6 +161,10 @@ private static Mono toMono(KafkaFuture future) { private final String version; private final Set features; + public Set getClusterFeatures() { + return features; + } + public Mono> listTopics(boolean listInternal) { return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names()); } @@ -417,6 +436,22 @@ public Mono> listOffsets(Collection pa .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()))); } + public Mono> listAcls() { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + return toMono(client.describeAcls(AclBindingFilter.ANY).values()); + } + + public Mono createAcls(Collection aclBindings) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + return toMono(client.createAcls(aclBindings).all()); + } + + public Mono deleteAcls(Collection aclBindings) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet()); + return toMono(client.deleteAcls(filters).all()).then(); + } + private Mono> topicPartitions(String topic) { return toMono(client.describeTopics(List.of(topic)).all()) .map(r -> r.values().stream() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java new file mode 100644 index 00000000000..821627274c8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java @@ -0,0 +1,81 @@ +package com.provectus.kafka.ui.service.acl; + +import com.provectus.kafka.ui.exception.ValidationException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; + +public class AclCsv { + + private static final String LINE_SEPARATOR = System.lineSeparator(); + private static final String VALUES_SEPARATOR = ","; + private static final String HEADER = 
"Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host"; + + public static String transformToCsvString(Collection acls) { + return Stream.concat(Stream.of(HEADER), acls.stream().map(AclCsv::createAclString)) + .collect(Collectors.joining(System.lineSeparator())); + } + + public static String createAclString(AclBinding binding) { + var pattern = binding.pattern(); + var filter = binding.toFilter().entryFilter(); + return String.format( + "%s,%s,%s,%s,%s,%s,%s", + filter.principal(), + pattern.resourceType(), + pattern.patternType(), + pattern.name(), + filter.operation(), + filter.permissionType(), + filter.host() + ); + } + + private static AclBinding parseCsvLine(String csv, int line) { + String[] values = csv.split(VALUES_SEPARATOR); + if (values.length != 7) { + throw new ValidationException("Input csv is not valid - there should be 7 columns in line " + line); + } + for (int i = 0; i < values.length; i++) { + if ((values[i] = values[i].trim()).isBlank()) { + throw new ValidationException("Input csv is not valid - blank value in colum " + i + ", line " + line); + } + } + try { + return new AclBinding( + new ResourcePattern( + ResourceType.valueOf(values[1]), values[3], PatternType.valueOf(values[2])), + new AccessControlEntry( + values[0], values[6], AclOperation.valueOf(values[4]), AclPermissionType.valueOf(values[5])) + ); + } catch (IllegalArgumentException enumParseError) { + throw new ValidationException("Error parsing enum value in line " + line); + } + } + + public static Collection parseCsv(String csvString) { + String[] lines = csvString.split(LINE_SEPARATOR); + if (lines.length == 0) { + throw new ValidationException("Error parsing ACL csv file: "); + } + boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", "")); + Set result = new HashSet<>(); + for (int i = firstLineIsHeader ? 
1 : 0; i < lines.length; i++) { + String line = lines[i]; + if (!line.isBlank()) { + AclBinding aclBinding = parseCsvLine(line, i); + result.add(aclBinding); + } + } + return result; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java new file mode 100644 index 00000000000..e1484135754 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java @@ -0,0 +1,93 @@ +package com.provectus.kafka.ui.service.acl; + +import com.google.common.collect.Sets; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.List; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.acl.AclBinding; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AclsService { + + private final AdminClientService adminClientService; + + public Mono createAcl(KafkaCluster cluster, AclBinding aclBinding) { + var aclString = AclCsv.createAclString(aclBinding); + log.info("CREATING ACL: [{}]", aclString); + return adminClientService.get(cluster) + .flatMap(ac -> ac.createAcls(List.of(aclBinding))) + .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString)); + } + + public Mono deleteAcl(KafkaCluster cluster, AclBinding aclBinding) { + var aclString = AclCsv.createAclString(aclBinding); + log.info("DELETING ACL: [{}]", aclString); + return adminClientService.get(cluster) + .flatMap(ac -> ac.deleteAcls(List.of(aclBinding))) + .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString)); + } + + public Flux listAcls(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ReactiveAdminClient::listAcls) + .flatMapIterable(acls -> acls); + } + + public Mono getAclAsCsvString(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ReactiveAdminClient::listAcls) + .map(AclCsv::transformToCsvString); + } + + public Mono syncAclWithAclCsv(KafkaCluster cluster, String csv) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.listAcls().flatMap(existingAclList -> { + var existingSet = Set.copyOf(existingAclList); + var newAcls = Set.copyOf(AclCsv.parseCsv(csv)); + var toDelete = Sets.difference(existingSet, newAcls); + var toAdd = Sets.difference(newAcls, existingSet); + logAclSyncPlan(cluster, toAdd, toDelete); + if (toAdd.isEmpty() && toDelete.isEmpty()) { + return Mono.empty(); + } + log.info("Starting new ACLs creation"); + return ac.createAcls(toAdd) + .doOnSuccess(v -> { + log.info("{} new ACLs created", toAdd.size()); + log.info("Starting ACLs deletion"); + }) + .then(ac.deleteAcls(toDelete) + .doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size()))); + })); + } + + private void logAclSyncPlan(KafkaCluster cluster, Set toBeAdded, Set toBeDeleted) { + log.info("'{}' cluster ACL sync plan: ", cluster.getName()); + if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) { + log.info("Nothing to do, ACL is already in sync"); + return; + } + if (!toBeAdded.isEmpty()) { + log.info("ACLs to be added ({}): ", toBeAdded.size()); + for (AclBinding aclBinding : toBeAdded) { + log.info(" " + AclCsv.createAclString(aclBinding)); + } + } + if (!toBeDeleted.isEmpty()) { + log.info("ACLs to be deleted 
({}): ", toBeDeleted.size()); + for (AclBinding aclBinding : toBeDeleted) { + log.info(" " + AclCsv.createAclString(aclBinding)); + } + } + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java new file mode 100644 index 00000000000..c99c51dc6d4 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java @@ -0,0 +1,68 @@ +package com.provectus.kafka.ui.service.acl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Collection; +import java.util.List; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class AclCsvTest { + + private static final List TEST_BINDINGS = List.of( + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), + new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)), + new AclBinding( + new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED), + new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)) + ); + + @ParameterizedTest + @ValueSource(strings = { + "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n" + + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost", + + //without header + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "\n" + + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost" + + "\n" + }) + void parsesValidInputCsv(String csvString) { + Collection parsed = AclCsv.parseCsv(csvString); + assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS); + } + + @ParameterizedTest + @ValueSource(strings = { + // columns > 7 + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*,1,2,3,4", + // columns < 7 + "User:test1,TOPIC,LITERAL,*", + // enum values are illegal + "User:test1,ILLEGAL,LITERAL,*,READ,ALLOW,*", + "User:test1,TOPIC,LITERAL,*,READ,ILLEGAL,*" + }) + void throwsExceptionForInvalidInputCsv(String csvString) { + assertThatThrownBy(() -> AclCsv.parseCsv(csvString)).isNotNull(); + } + + @Test + void transformAndParseUseSameFormat() { + String csv = AclCsv.transformToCsvString(TEST_BINDINGS); + Collection parsedBindings = AclCsv.parseCsv(csv); + assertThat(parsedBindings).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS); + } + +} \ No newline at end of file diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 302cf84c338..9222394aa45 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1776,6 +1776,111 @@ paths: $ref: '#/components/schemas/PartitionsIncreaseResponse' 404: description: Not found + + /api/clusters/{clusterName}/acls: + get: + tags: + - Acls + summary: listKafkaAcls + operationId: listAcls + parameters: + - name: clusterName + in: 
path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/KafkaAcl' + + /api/clusters/{clusterName}/acl/csv: + get: + tags: + - Acls + summary: getAclAsCsv + operationId: getAclAsCsv + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + text/plain: + schema: + type: string + post: + tags: + - Acls + summary: syncAclsCsv + operationId: syncAclsCsv + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + text/plain: + schema: + type: string + responses: + 200: + description: OK + + /api/clusters/{clusterName}/acl: + post: + tags: + - Acls + summary: createAcl + operationId: createAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KafkaAcl' + responses: + 200: + description: OK + + delete: + tags: + - Acls + summary: deleteAcl + operationId: deleteAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KafkaAcl' + responses: + 200: + description: OK + 404: + description: Acl not found + /api/info/timestampformat: get: tags: @@ -1903,6 +2008,7 @@ components: - KAFKA_CONNECT - KSQL_DB - TOPIC_DELETION + - KAFKA_ACL required: - id - name @@ -3216,3 +3322,46 @@ components: - COMPACT - COMPACT_DELETE - UNKNOWN + + KafkaAcl: + type: object + required: [resourceType, resourceName, namePatternType, principal, host, operation, permission] + properties: + resourceType: + type: string + enum: + - TOPIC + - GROUP + - CLUSTER + - TRANSACTIONAL_ID + - DELEGATION_TOPIC + resourceName: + type: string # "*" if acl can be applied to any resource of given type + namePatternType: + type: string + enum: + - LITERAL + - PREFIXED + principal: + type: string + host: + type: string # "*" if acl can be applied to any resource of given type + operation: + type: string + enum: + - ALL # Cluster, Topic, Group + - READ # Topic, Group + - WRITE # Topic, TransactionalId + - CREATE # Cluster, Topic + - DELETE # Topic, Group + - ALTER # Cluster, Topic, + - DESCRIBE # Cluster, Topic, Group, TransactionalId, DelegationToken + - CLUSTER_ACTION # Cluster + - DESCRIBE_CONFIGS # Cluster, Topic + - ALTER_CONFIGS # Cluster, Topic + - IDEMPOTENT_WRITE # - + permission: + type: string + enum: + - ALLOW + - DENY \ No newline at end of file From c1d9f0183d24f04f38c42fa8ffbae3b69f7a88db Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 26 Oct 2022 14:33:32 +0400 Subject: [PATCH 04/14] ISSUE-754: UnsupportedVersionException handling added when trying check is ACL enabled --- .../com/provectus/kafka/ui/service/ReactiveAdminClient.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 755874009b8..f68d3d6dcbc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.SecurityDisabledException; import 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -127,7 +128,9 @@ private static Mono> getSupportedUpdateFeaturesForVersion( private static Mono isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) { return toMono(ac.describeAcls(AclBindingFilter.ANY).values()) .thenReturn(true) - .doOnError(th -> !(th instanceof SecurityDisabledException) && !(th instanceof InvalidRequestException), + .doOnError(th -> !(th instanceof SecurityDisabledException) + && !(th instanceof InvalidRequestException) + && !(th instanceof UnsupportedVersionException), th -> log.warn("Error checking if security enabled", th)) .onErrorReturn(false); } From 13af67c6912958256e26ec9553bcdf970b7db945 Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 18 Jan 2023 15:51:35 +0400 Subject: [PATCH 05/14] syncAclWithAclCsv test added --- .../kafka/ui/service/acl/AclsServiceTest.java | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java new file mode 100644 index 00000000000..0d4142a4ec9 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java @@ -0,0 +1,80 @@ +package com.provectus.kafka.ui.service.acl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; + +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.Collection; +import java.util.List; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import reactor.core.publisher.Mono; + +class AclsServiceTest { + + private static final KafkaCluster CLUSTER = KafkaCluster.builder().build(); + + private final ReactiveAdminClient adminClientMock = mock(ReactiveAdminClient.class); + private final AdminClientService adminClientService = mock(AdminClientService.class); + + private final AclsService aclsService = new AclsService(adminClientService); + + @BeforeEach + void initMocks() { + when(adminClientService.get(CLUSTER)).thenReturn(Mono.just(adminClientMock)); + } + + @Test + void testSyncAclWithAclCsv() { + var existingBinding1 = new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), + new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)); + + var existingBinding2 = new AclBinding( + new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED), + new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)); + + var newBindingToBeAdded = new AclBinding( + new ResourcePattern(ResourceType.GROUP, "groupNew", 
PatternType.PREFIXED), + new AccessControlEntry("User:test3", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)); + + when(adminClientMock.listAcls()) + .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2))); + + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + ArgumentCaptor deletedCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.deleteAcls((Collection) deletedCaptor.capture())) + .thenReturn(Mono.empty()); + + aclsService.syncAclWithAclCsv( + CLUSTER, + "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n" + + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost" + ).block(); + + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(1) + .contains(newBindingToBeAdded); + + Collection deletedBindings = (Collection) deletedCaptor.getValue(); + assertThat(deletedBindings) + .hasSize(1) + .contains(existingBinding2); + } + +} \ No newline at end of file From 0db0af05677936fe1cf161da639a53d4d00caf5a Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 18 Jan 2023 16:40:58 +0400 Subject: [PATCH 06/14] checkstyle fix --- .../com/provectus/kafka/ui/service/ReactiveAdminClient.java | 5 +---- .../com/provectus/kafka/ui/service/acl/AclsServiceTest.java | 5 +++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index c6fed0f4aa8..6c2ef63091e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -4,12 +4,10 @@ import static java.util.stream.Collectors.toMap; import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableTable; import com.google.common.collect.Iterables; import com.google.common.collect.Table; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; import com.provectus.kafka.ui.exception.IllegalEntityStateException; import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ValidationException; @@ -17,7 +15,6 @@ import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant; import java.io.Closeable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java index 0d4142a4ec9..a5fbfd58c16 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java @@ -1,7 +1,8 @@ package com.provectus.kafka.ui.service.acl; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.provectus.kafka.ui.model.KafkaCluster; import 
com.provectus.kafka.ui.service.AdminClientService; @@ -77,4 +78,4 @@ void testSyncAclWithAclCsv() { .contains(existingBinding2); } -} \ No newline at end of file +} From cb17449407f6958355a3886c162e9dc9a117de4d Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 18 Jan 2023 17:07:51 +0400 Subject: [PATCH 07/14] RBAC integration added --- .../kafka/ui/controller/AclsController.java | 57 ++++++++++++++++--- .../kafka/ui/model/rbac/AccessContext.java | 12 +++- .../kafka/ui/model/rbac/Resource.java | 3 +- .../ui/model/rbac/permission/AclAction.java | 15 +++++ .../main/resources/swagger/kafka-ui-api.yaml | 1 + 5 files changed, 77 insertions(+), 11 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java index 7d8fa9812da..4ccb3373c92 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java @@ -3,7 +3,10 @@ import com.provectus.kafka.ui.api.AclsApi; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.KafkaAclDTO; +import com.provectus.kafka.ui.model.rbac.AccessContext; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.service.acl.AclsService; +import com.provectus.kafka.ui.service.rbac.AccessControlService; import lombok.RequiredArgsConstructor; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; @@ -16,11 +19,19 @@ public class AclsController extends AbstractController implements AclsApi { private final AclsService aclsService; + private final AccessControlService accessControlService; @Override public Mono> createAcl(String clusterName, Mono kafkaAclDto, ServerWebExchange exchange) { - return kafkaAclDto.map(ClusterMapper::toAclBinding) + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(kafkaAclDto) + .map(ClusterMapper::toAclBinding) .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding)) .thenReturn(ResponseEntity.ok().build()); } @@ -28,28 +39,56 @@ public Mono> createAcl(String clusterName, Mono> deleteAcl(String clusterName, Mono kafkaAclDto, ServerWebExchange exchange) { - return kafkaAclDto.map(ClusterMapper::toAclBinding) + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(kafkaAclDto) + .map(ClusterMapper::toAclBinding) .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding)) .thenReturn(ResponseEntity.ok().build()); } @Override public Mono>> listAcls(String clusterName, ServerWebExchange exchange) { - return Mono.just( - ResponseEntity.ok( - aclsService.listAcls(getCluster(clusterName)).map(ClusterMapper::toKafkaAclDto))); + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.VIEW) + .build(); + + return accessControlService.validateAccess(context).then( + Mono.just( + ResponseEntity.ok( + aclsService.listAcls(getCluster(clusterName)).map(ClusterMapper::toKafkaAclDto))) + ); } @Override public Mono> getAclAsCsv(String clusterName, ServerWebExchange exchange) { 
- return aclsService.getAclAsCsvString(getCluster(clusterName)) - .map(ResponseEntity::ok) - .flatMap(Mono::just); + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.VIEW) + .build(); + + return accessControlService.validateAccess(context).then( + aclsService.getAclAsCsvString(getCluster(clusterName)) + .map(ResponseEntity::ok) + .flatMap(Mono::just) + ); } @Override public Mono> syncAclsCsv(String clusterName, Mono csvMono, ServerWebExchange exchange) { - return csvMono.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv)) + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(csvMono) + .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv)) .thenReturn(ResponseEntity.ok().build()); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java index abe18fb9668..bed7747e8d5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.model.rbac; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -34,6 +35,8 @@ public class AccessContext { Collection ksqlActions; + Collection aclActions; + public static AccessContextBuilder builder() { return new AccessContextBuilder(); } @@ -51,6 +54,7 @@ public static final class AccessContextBuilder { private String schema; private Collection schemaActions = Collections.emptySet(); private Collection ksqlActions = Collections.emptySet(); + private Collection aclActions = Collections.emptySet(); private AccessContextBuilder() { } @@ -121,6 +125,12 @@ public AccessContextBuilder ksqlActions(KsqlAction... actions) { return this; } + public AccessContextBuilder aclActions(AclAction... 
actions) { + Assert.isTrue(actions.length > 0, "actions not present"); + this.aclActions = List.of(actions); + return this; + } + public AccessContext build() { return new AccessContext(cluster, clusterConfigActions, topic, topicActions, @@ -128,7 +138,7 @@ public AccessContext build() { connect, connectActions, connector, schema, schemaActions, - ksqlActions); + ksqlActions, aclActions); } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java index 3dafd7e6b24..4f8e30f2080 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java @@ -10,7 +10,8 @@ public enum Resource { CONSUMER, SCHEMA, CONNECT, - KSQL; + KSQL, + ACL; @Nullable public static Resource fromString(String name) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java new file mode 100644 index 00000000000..c86af7e72d2 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.model.rbac.permission; + +import org.apache.commons.lang3.EnumUtils; +import org.jetbrains.annotations.Nullable; + +public enum AclAction implements PermissibleAction { + + VIEW, + EDIT; + + @Nullable + public static AclAction fromString(String name) { + return EnumUtils.getEnum(AclAction.class, name); + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index fce40b6f47e..25fdd25da86 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3316,6 +3316,7 @@ components: - SCHEMA - CONNECT - KSQL + - ACL KafkaAcl: type: object From ffff9640073fc5a09a51da96b76b785f555859f6 Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 14 Mar 2023 00:04:29 +0400 Subject: [PATCH 08/14] PR updated --- .../kafka/ui/mapper/ClusterMapper.java | 35 ++++++++++++++++--- .../kafka/ui/model/rbac/AccessContext.java | 2 +- .../kafka/ui/model/rbac/Permission.java | 2 ++ .../kafka/ui/service/FeatureService.java | 2 +- .../kafka/ui/service/ReactiveAdminClient.java | 12 +------ .../provectus/kafka/ui/util/KafkaVersion.java | 11 +++--- .../main/resources/swagger/kafka-ui-api.yaml | 9 +++-- 7 files changed, 47 insertions(+), 26 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index b8721f63320..5bd817ee40b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -117,8 +117,35 @@ default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBroke return brokerDiskUsage; } - default DataMasking map(List maskingProperties) { - return DataMasking.create(maskingProperties); + static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) { + return switch (operation) { + case ALL -> KafkaAclDTO.OperationEnum.ALL; + case READ -> KafkaAclDTO.OperationEnum.READ; + case WRITE -> KafkaAclDTO.OperationEnum.WRITE; + case CREATE -> KafkaAclDTO.OperationEnum.CREATE; + case DELETE -> 
KafkaAclDTO.OperationEnum.DELETE; + case ALTER -> KafkaAclDTO.OperationEnum.ALTER; + case DESCRIBE -> KafkaAclDTO.OperationEnum.DESCRIBE; + case CLUSTER_ACTION -> KafkaAclDTO.OperationEnum.CLUSTER_ACTION; + case DESCRIBE_CONFIGS -> KafkaAclDTO.OperationEnum.DESCRIBE_CONFIGS; + case ALTER_CONFIGS -> KafkaAclDTO.OperationEnum.ALTER_CONFIGS; + case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE; + case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS; + case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS; + case ANY, UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN; + }; + } + + static KafkaAclDTO.ResourceTypeEnum mapAclResourceType(ResourceType resourceType) { + return switch (resourceType) { + case CLUSTER -> KafkaAclDTO.ResourceTypeEnum.CLUSTER; + case TOPIC -> KafkaAclDTO.ResourceTypeEnum.TOPIC; + case GROUP -> KafkaAclDTO.ResourceTypeEnum.GROUP; + case DELEGATION_TOKEN -> KafkaAclDTO.ResourceTypeEnum.DELEGATION_TOKEN; + case TRANSACTIONAL_ID -> KafkaAclDTO.ResourceTypeEnum.TRANSACTIONAL_ID; + case USER -> KafkaAclDTO.ResourceTypeEnum.USER; + case ANY, UNKNOWN -> KafkaAclDTO.ResourceTypeEnum.UNKNOWN; + }; } static AclBinding toAclBinding(KafkaAclDTO dto) { @@ -141,12 +168,12 @@ static KafkaAclDTO toKafkaAclDto(AclBinding binding) { var pattern = binding.pattern(); var filter = binding.toFilter().entryFilter(); return new KafkaAclDTO() - .resourceType(KafkaAclDTO.ResourceTypeEnum.fromValue(pattern.resourceType().name())) + .resourceType(mapAclResourceType(pattern.resourceType())) .resourceName(pattern.name()) .namePatternType(KafkaAclDTO.NamePatternTypeEnum.fromValue(pattern.patternType().name())) .principal(filter.principal()) .host(filter.host()) - .operation(KafkaAclDTO.OperationEnum.fromValue(filter.operation().name())) + .operation(mapAclOperation(filter.operation())) .permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name())); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java index ff2f3f1e979..45858093a7f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java @@ -1,7 +1,7 @@ package com.provectus.kafka.ui.model.rbac; -import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.AclAction; +import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java index 837f9008f3e..9b0b4c163e6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java @@ -3,6 +3,7 @@ import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.KSQL; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import 
com.provectus.kafka.ui.model.rbac.permission.ConnectAction; @@ -73,6 +74,7 @@ private List getAllActionValues() { case SCHEMA -> Arrays.stream(SchemaAction.values()).map(Enum::toString).toList(); case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList(); case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList(); + case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList(); }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index e08b5a746a7..c3d77eb87ae 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -68,7 +68,7 @@ private Mono isTopicDeletionEnabled(KafkaCluster cluster, Node controll private Mono acl(KafkaCluster cluster) { return adminClientService.get(cluster).flatMap( - ac -> ac.getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED) + ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED) ? Mono.just(ClusterFeature.KAFKA_ACL) : Mono.empty() ); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index e8bca34807c..ce08f9062e6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -107,10 +107,6 @@ static Mono> forVersion(AdminClient ac, @Nullable Float ka .map(Tuple2::getT1) .collect(Collectors.toSet()); } - - static Set defaultFeatures() { - return Set.of(); - } } @Value @@ -131,13 +127,7 @@ public static Mono create(AdminClient adminClient) { } private static Mono> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) { - Float kafkaVersion = null; - try { - float version = KafkaVersion.parse(versionStr); - return SupportedFeature.forVersion(version); - } catch (NumberFormatException e) { - return SupportedFeature.defaultFeatures(); - } + @Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null); return SupportedFeature.forVersion(ac, kafkaVersion); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java index 48ff7ff1214..fdbe96e88f9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java @@ -1,23 +1,20 @@ package com.provectus.kafka.ui.util; -import lombok.experimental.UtilityClass; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; -@UtilityClass @Slf4j public class KafkaVersion { - public static float parse(String version) throws NumberFormatException { - log.trace("Parsing cluster version [{}]", version); + public static Optional parse(String version) throws NumberFormatException { try { final String[] parts = version.split("\\."); if (parts.length > 2) { version = parts[0] + "." 
+ parts[1]; } - return Float.parseFloat(version.split("-")[0]); + return Optional.of(Float.parseFloat(version.split("-")[0])); } catch (Exception e) { - log.error("Conversion clusterVersion [{}] to float value failed", version, e); - throw e; + return Optional.empty(); } } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 1f994b0affa..e5bf7a3e2c4 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3420,11 +3420,13 @@ components: resourceType: type: string enum: + - UNKNOWN # Unknown operation, need to update mapping code on BE - TOPIC - GROUP - CLUSTER - TRANSACTIONAL_ID - - DELEGATION_TOPIC + - DELEGATION_TOKEN + - USER resourceName: type: string # "*" if acl can be applied to any resource of given type namePatternType: @@ -3439,6 +3441,7 @@ components: operation: type: string enum: + - UNKNOWN # Unknown operation, need to update mapping code on BE - ALL # Cluster, Topic, Group - READ # Topic, Group - WRITE # Topic, TransactionalId @@ -3449,7 +3452,9 @@ components: - CLUSTER_ACTION # Cluster - DESCRIBE_CONFIGS # Cluster, Topic - ALTER_CONFIGS # Cluster, Topic - - IDEMPOTENT_WRITE # - + - IDEMPOTENT_WRITE # Cluster + - CREATE_TOKENS + - DESCRIBE_TOKENS permission: type: string enum: From 297a1d96e1231c48881c61f68fff18674bc2d80c Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 14 Mar 2023 16:42:27 +0400 Subject: [PATCH 09/14] wip --- documentation/compose/jaas/kafka_server.conf | 6 +++- .../compose/jaas/zookeeper_jaas.conf | 6 ++-- documentation/compose/kafka-ui-sasl.yaml | 19 +++++++--- .../kafka/ui/mapper/ClusterMapper.java | 7 ++-- .../kafka/ui/model/ClusterFeature.java | 3 +- .../kafka/ui/service/FeatureService.java | 35 ++++++++++++------- .../kafka/ui/service/StatisticsService.java | 2 +- .../main/resources/swagger/kafka-ui-api.yaml | 3 +- 8 files changed, 53 insertions(+), 28 deletions(-) diff --git a/documentation/compose/jaas/kafka_server.conf b/documentation/compose/jaas/kafka_server.conf index 25388be5aa8..0c1fb34652a 100644 --- a/documentation/compose/jaas/kafka_server.conf +++ b/documentation/compose/jaas/kafka_server.conf @@ -11,4 +11,8 @@ KafkaClient { user_admin="admin-secret"; }; -Client {}; +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="zkuser" + password="zkuserpassword"; +}; diff --git a/documentation/compose/jaas/zookeeper_jaas.conf b/documentation/compose/jaas/zookeeper_jaas.conf index 2b1754fba0e..2d7fd1b1c29 100644 --- a/documentation/compose/jaas/zookeeper_jaas.conf +++ b/documentation/compose/jaas/zookeeper_jaas.conf @@ -1,4 +1,4 @@ Server { - org.apache.zookeeper.server.auth.DigestLoginModule required - user_admin="admin-secret"; -}; \ No newline at end of file + org.apache.zookeeper.server.auth.DigestLoginModule required + user_zkuser="zkuserpassword"; +}; diff --git a/documentation/compose/kafka-ui-sasl.yaml b/documentation/compose/kafka-ui-sasl.yaml index e4a2b3cc4a7..ebc920cf38f 100644 --- a/documentation/compose/kafka-ui-sasl.yaml +++ b/documentation/compose/kafka-ui-sasl.yaml @@ -8,15 +8,26 @@ services: ports: - 8080:8080 depends_on: + - zookeeper - kafka environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN +# KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 
'org.apache.kafka.common.security.plain.PlainLoginModule required username="enzo" password="cisternino";' KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";' DYNAMIC_CONFIG_ENABLED: true # not necessary for sasl auth, added for tests + zookeeper: + image: wurstmeister/zookeeper:3.4.6 + environment: + JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf" + volumes: + - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf + ports: + - 2181:2181 + kafka: image: confluentinc/cp-kafka:7.2.1 hostname: kafka @@ -26,27 +37,25 @@ services: - "9997:9997" environment: KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf" + KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9997 KAFKA_JMX_HOSTNAME: localhost - KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_NODE_ID: 1 KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT' KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN' KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT' - KAFKA_SUPER_USERS: 'User:admin,User:enzo' + KAFKA_SUPER_USERS: 'User:admin' volumes: - ./scripts/update_run.sh:/tmp/update_run.sh - ./jaas:/etc/kafka/jaas - command: "bash -c 'if [ ! 
-f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index 5bd817ee40b..4d438ae4e9c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -28,7 +28,6 @@ import com.provectus.kafka.ui.model.TopicConfigDTO; import com.provectus.kafka.ui.model.TopicDTO; import com.provectus.kafka.ui.model.TopicDetailsDTO; -import com.provectus.kafka.ui.service.masking.DataMasking; import com.provectus.kafka.ui.service.metrics.RawMetric; import java.util.List; import java.util.Map; @@ -132,7 +131,8 @@ static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) { case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE; case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS; case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS; - case ANY, UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN; + case ANY -> throw new IllegalArgumentException("ANY operation can be only part of filter"); + case UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN; }; } @@ -144,7 +144,8 @@ static KafkaAclDTO.ResourceTypeEnum mapAclResourceType(ResourceType resourceType case DELEGATION_TOKEN -> KafkaAclDTO.ResourceTypeEnum.DELEGATION_TOKEN; case TRANSACTIONAL_ID -> KafkaAclDTO.ResourceTypeEnum.TRANSACTIONAL_ID; case USER -> KafkaAclDTO.ResourceTypeEnum.USER; - case ANY, UNKNOWN -> KafkaAclDTO.ResourceTypeEnum.UNKNOWN; + case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter"); + case UNKNOWN -> KafkaAclDTO.ResourceTypeEnum.UNKNOWN; }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java index 9ed7a38bb29..2973e5500d9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java @@ -5,5 +5,6 @@ public enum ClusterFeature { KSQL_DB, SCHEMA_REGISTRY, TOPIC_DELETION, - KAFKA_ACL + KAFKA_ACL_VIEW, + KAFKA_ACL_EDIT } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index c3d77eb87ae..e6c2c2f06c7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -2,6 +2,7 @@ import com.provectus.kafka.ui.model.ClusterFeature; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -12,6 +13,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.Node; +import org.apache.kafka.common.acl.AclOperation; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -25,7 +27,7 @@ public class FeatureService { private final AdminClientService adminClientService; - public Mono> getAvailableFeatures(KafkaCluster cluster, @Nullable Node controller) { + 
public Mono> getAvailableFeatures(KafkaCluster cluster, ClusterDescription clusterDescription) { List> features = new ArrayList<>(); if (Optional.ofNullable(cluster.getConnectsClients()) @@ -42,19 +44,17 @@ public Mono> getAvailableFeatures(KafkaCluster cluster, @Nu features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY)); } - if (controller != null) { - features.add( - isTopicDeletionEnabled(cluster, controller) - .flatMap(r -> Boolean.TRUE.equals(r) ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty()) - ); - } - - features.add(acl(cluster)); + features.add(topicDeletionEnabled(cluster, clusterDescription.getController())); + features.add(aclView(cluster)); + features.add(aclEdit(clusterDescription)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } - private Mono isTopicDeletionEnabled(KafkaCluster cluster, Node controller) { + private Mono topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) { + if (controller == null) { + return Mono.empty(); + } return adminClientService.get(cluster) .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id()))) .map(config -> @@ -63,13 +63,22 @@ private Mono isTopicDeletionEnabled(KafkaCluster cluster, Node controll .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY)) .map(e -> Boolean.parseBoolean(e.value())) .findFirst() - .orElse(true)); + .orElse(true)) + .flatMap(enabled -> enabled ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty()); + } + + private Mono aclEdit(ClusterDescription clusterDescription) { + var authorizedOps = clusterDescription.getAuthorizedOperations(); + boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER); + return canEdit + ? Mono.just(ClusterFeature.KAFKA_ACL_EDIT) + : Mono.empty(); } - private Mono acl(KafkaCluster cluster) { + private Mono aclView(KafkaCluster cluster) { return adminClientService.get(cluster).flatMap( ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED) - ? Mono.just(ClusterFeature.KAFKA_ACL) + ? 
Mono.just(ClusterFeature.KAFKA_ACL_VIEW) : Mono.empty() ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java index a36a64ff6dc..994c30714ae 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java @@ -41,7 +41,7 @@ private Mono getStatistics(KafkaCluster cluster) { List.of( metricsCollector.getBrokerMetrics(cluster, description.getNodes()), getLogDirInfo(description, ac), - featureService.getAvailableFeatures(cluster, description.getController()), + featureService.getAvailableFeatures(cluster, description), loadTopicConfigs(cluster), describeTopics(cluster)), results -> diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e5bf7a3e2c4..5710dfc7af3 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2056,7 +2056,8 @@ components: - KAFKA_CONNECT - KSQL_DB - TOPIC_DELETION - - KAFKA_ACL + - KAFKA_ACL_VIEW # get ACLs listing + - KAFKA_ACL_EDIT # create & delete ACLs required: - id - name From be26f86d5094c1437ffd36bc68e13cdf07c70ce5 Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 14 Mar 2023 16:59:57 +0400 Subject: [PATCH 10/14] wip --- .../com/provectus/kafka/ui/service/FeatureService.java | 9 ++++++--- .../provectus/kafka/ui/service/ReactiveAdminClient.java | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index e6c2c2f06c7..7749849ab14 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; @@ -53,7 +54,7 @@ public Mono> getAvailableFeatures(KafkaCluster cluster, Clu private Mono topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) { if (controller == null) { - return Mono.empty(); + return Mono.just(ClusterFeature.TOPIC_DELETION); // assuming it is enabled by default } return adminClientService.get(cluster) .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id()))) @@ -64,11 +65,13 @@ private Mono topicDeletionEnabled(KafkaCluster cluster, @Nullabl .map(e -> Boolean.parseBoolean(e.value())) .findFirst() .orElse(true)) - .flatMap(enabled -> enabled ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty()); + .flatMap(enabled -> enabled + ? Mono.just(ClusterFeature.TOPIC_DELETION) + : Mono.empty()); } private Mono aclEdit(ClusterDescription clusterDescription) { - var authorizedOps = clusterDescription.getAuthorizedOperations(); + var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of()); boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER); return canEdit ? 
Mono.just(ClusterFeature.KAFKA_ACL_EDIT) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index ce08f9062e6..599e19981bb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -115,6 +115,7 @@ public static class ClusterDescription { Node controller; String clusterId; Collection nodes; + @Nullable // null, if ACL is disabled Set authorizedOperations; } From 8e5cf2b8165169a94687d1c54d16ec02737b26f6 Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 14 Mar 2023 17:02:33 +0400 Subject: [PATCH 11/14] wip --- .../kafka/ui/service/metrics/JmxSslSocketFactory.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java index 06304365c79..fa84fc361c4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java @@ -61,7 +61,9 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory { } catch (Exception e) { log.error("----------------------------------"); log.error("SSL can't be enabled for JMX retrieval. " - + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg.", e); + + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}", + e.getMessage()); + log.trace("SSL can't be enabled for JMX retrieval", e); log.error("----------------------------------"); } SSL_JMX_SUPPORTED = sslJmxSupported; From d323b89641c62e81910d5b7b3b016c7fc8d72ccc Mon Sep 17 00:00:00 2001 From: iliax Date: Mon, 3 Apr 2023 11:47:42 +0400 Subject: [PATCH 12/14] minor fixes --- .../java/com/provectus/kafka/ui/service/acl/AclCsv.java | 2 +- .../java/com/provectus/kafka/ui/service/acl/AclCsvTest.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java index 821627274c8..673b17ee1f8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java @@ -65,7 +65,7 @@ private static AclBinding parseCsvLine(String csv, int line) { public static Collection parseCsv(String csvString) { String[] lines = csvString.split(LINE_SEPARATOR); if (lines.length == 0) { - throw new ValidationException("Error parsing ACL csv file: "); + throw new ValidationException("Error parsing ACL csv file: no lines in file"); } boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", "")); Set result = new HashSet<>(); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java index c99c51dc6d4..08ca4d15073 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java @@ -3,6 +3,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static 
org.assertj.core.api.Assertions.assertThatThrownBy; +import com.provectus.kafka.ui.exception.ValidationException; import java.util.Collection; import java.util.List; import org.apache.kafka.common.acl.AccessControlEntry; @@ -55,7 +56,8 @@ void parsesValidInputCsv(String csvString) { "User:test1,TOPIC,LITERAL,*,READ,ILLEGAL,*" }) void throwsExceptionForInvalidInputCsv(String csvString) { - assertThatThrownBy(() -> AclCsv.parseCsv(csvString)).isNotNull(); + assertThatThrownBy(() -> AclCsv.parseCsv(csvString)) + .isInstanceOf(ValidationException.class); } @Test @@ -65,4 +67,4 @@ void transformAndParseUseSameFormat() { assertThat(parsedBindings).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS); } -} \ No newline at end of file +} From 807ca5ae9652fd51383c7f47e20f602ed584e51a Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 21 Apr 2023 12:39:09 +0400 Subject: [PATCH 13/14] filter params added to acls endpoint --- .../kafka/ui/controller/AclsController.java | 25 +++++++++- .../kafka/ui/mapper/ClusterMapper.java | 32 ++++++++----- .../kafka/ui/service/ReactiveAdminClient.java | 7 ++- .../kafka/ui/service/acl/AclsService.java | 10 ++-- .../kafka/ui/service/acl/AclsServiceTest.java | 3 +- .../main/resources/swagger/kafka-ui-api.yaml | 48 ++++++++++++++----- 6 files changed, 91 insertions(+), 34 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java index 4ccb3373c92..83d2ef553e8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java @@ -3,11 +3,17 @@ import com.provectus.kafka.ui.api.AclsApi; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.KafkaAclDTO; +import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO; +import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO; import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.service.acl.AclsService; import com.provectus.kafka.ui.service.rbac.AccessControlService; +import java.util.Optional; import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; @@ -52,16 +58,31 @@ public Mono> deleteAcl(String clusterName, Mono>> listAcls(String clusterName, ServerWebExchange exchange) { + public Mono>> listAcls(String clusterName, + KafkaAclResourceTypeDTO resourceTypeDto, + String resourceName, + KafkaAclNamePatternTypeDTO namePatternTypeDto, + ServerWebExchange exchange) { AccessContext context = AccessContext.builder() .cluster(clusterName) .aclActions(AclAction.VIEW) .build(); + var resourceType = Optional.ofNullable(resourceTypeDto) + .map(ClusterMapper::mapAclResourceTypeDto) + .orElse(ResourceType.ANY); + + var namePatternType = Optional.ofNullable(namePatternTypeDto) + .map(ClusterMapper::mapPatternTypeDto) + .orElse(PatternType.ANY); + + var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType); + return accessControlService.validateAccess(context).then( Mono.just( ResponseEntity.ok( - 
aclsService.listAcls(getCluster(clusterName)).map(ClusterMapper::toKafkaAclDto))) + aclsService.listAcls(getCluster(clusterName), filter) + .map(ClusterMapper::toKafkaAclDto))) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index 4d438ae4e9c..a122a269a4e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -21,6 +21,8 @@ import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.KafkaAclDTO; +import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO; +import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO; import com.provectus.kafka.ui.model.MetricDTO; import com.provectus.kafka.ui.model.Metrics; import com.provectus.kafka.ui.model.PartitionDTO; @@ -136,25 +138,33 @@ static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) { }; } - static KafkaAclDTO.ResourceTypeEnum mapAclResourceType(ResourceType resourceType) { + static KafkaAclResourceTypeDTO mapAclResourceType(ResourceType resourceType) { return switch (resourceType) { - case CLUSTER -> KafkaAclDTO.ResourceTypeEnum.CLUSTER; - case TOPIC -> KafkaAclDTO.ResourceTypeEnum.TOPIC; - case GROUP -> KafkaAclDTO.ResourceTypeEnum.GROUP; - case DELEGATION_TOKEN -> KafkaAclDTO.ResourceTypeEnum.DELEGATION_TOKEN; - case TRANSACTIONAL_ID -> KafkaAclDTO.ResourceTypeEnum.TRANSACTIONAL_ID; - case USER -> KafkaAclDTO.ResourceTypeEnum.USER; + case CLUSTER -> KafkaAclResourceTypeDTO.CLUSTER; + case TOPIC -> KafkaAclResourceTypeDTO.TOPIC; + case GROUP -> KafkaAclResourceTypeDTO.GROUP; + case DELEGATION_TOKEN -> KafkaAclResourceTypeDTO.DELEGATION_TOKEN; + case TRANSACTIONAL_ID -> KafkaAclResourceTypeDTO.TRANSACTIONAL_ID; + case USER -> KafkaAclResourceTypeDTO.USER; case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter"); - case UNKNOWN -> KafkaAclDTO.ResourceTypeEnum.UNKNOWN; + case UNKNOWN -> KafkaAclResourceTypeDTO.UNKNOWN; }; } + static ResourceType mapAclResourceTypeDto(KafkaAclResourceTypeDTO dto) { + return ResourceType.valueOf(dto.name()); + } + + static PatternType mapPatternTypeDto(KafkaAclNamePatternTypeDTO dto) { + return PatternType.valueOf(dto.name()); + } + static AclBinding toAclBinding(KafkaAclDTO dto) { return new AclBinding( new ResourcePattern( - ResourceType.valueOf(dto.getResourceType().name()), + mapAclResourceTypeDto(dto.getResourceType()), dto.getResourceName(), - PatternType.valueOf(dto.getNamePatternType().name()) + mapPatternTypeDto(dto.getNamePatternType()) ), new AccessControlEntry( dto.getPrincipal(), @@ -171,7 +181,7 @@ static KafkaAclDTO toKafkaAclDto(AclBinding binding) { return new KafkaAclDTO() .resourceType(mapAclResourceType(pattern.resourceType())) .resourceName(pattern.name()) - .namePatternType(KafkaAclDTO.NamePatternTypeEnum.fromValue(pattern.patternType().name())) + .namePatternType(KafkaAclNamePatternTypeDTO.fromValue(pattern.patternType().name())) .principal(filter.principal()) .host(filter.host()) .operation(mapAclOperation(filter.operation())) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 53bd5c7faf5..8451a89f97d 100644 --- 
a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -61,6 +61,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; @@ -74,6 +75,8 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -592,9 +595,9 @@ Mono> listOffsetsUnsafe(Collection par ); } - public Mono> listAcls() { + public Mono> listAcls(ResourcePatternFilter filter) { Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); - return toMono(client.describeAcls(AclBindingFilter.ANY).values()); + return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values()); } public Mono createAcls(Collection aclBindings) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java index e1484135754..8c5a8dab06f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java @@ -3,12 +3,12 @@ import com.google.common.collect.Sets; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.AdminClientService; -import com.provectus.kafka.ui.service.ReactiveAdminClient; import java.util.List; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -36,21 +36,21 @@ public Mono deleteAcl(KafkaCluster cluster, AclBinding aclBinding) { .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString)); } - public Flux listAcls(KafkaCluster cluster) { + public Flux listAcls(KafkaCluster cluster, ResourcePatternFilter filter) { return adminClientService.get(cluster) - .flatMap(ReactiveAdminClient::listAcls) + .flatMap(c -> c.listAcls(filter)) .flatMapIterable(acls -> acls); } public Mono getAclAsCsvString(KafkaCluster cluster) { return adminClientService.get(cluster) - .flatMap(ReactiveAdminClient::listAcls) + .flatMap(c -> c.listAcls(ResourcePatternFilter.ANY)) .map(AclCsv::transformToCsvString); } public Mono syncAclWithAclCsv(KafkaCluster cluster, String csv) { return adminClientService.get(cluster) - .flatMap(ac -> ac.listAcls().flatMap(existingAclList -> { + .flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> { var existingSet = Set.copyOf(existingAclList); var newAcls = Set.copyOf(AclCsv.parseCsv(csv)); var toDelete = Sets.difference(existingSet, newAcls); diff --git 
a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java index a5fbfd58c16..5791bb20414 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java @@ -15,6 +15,7 @@ import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -49,7 +50,7 @@ void testSyncAclWithAclCsv() { new ResourcePattern(ResourceType.GROUP, "groupNew", PatternType.PREFIXED), new AccessControlEntry("User:test3", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)); - when(adminClientMock.listAcls()) + when(adminClientMock.listAcls(ResourcePatternFilter.ANY)) .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2))); ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 003ddc2deb5..a70500c4dd7 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1742,6 +1742,21 @@ paths: required: true schema: type: string + - name: resourceType + in: query + required: false + schema: + $ref: '#/components/schemas/KafkaAclResourceType' + - name: resourceName + in: query + required: false + schema: + type: string + - name: namePatternType + in: query + required: false + schema: + $ref: '#/components/schemas/KafkaAclNamePatternType' responses: 200: description: OK @@ -3444,22 +3459,11 @@ components: required: [resourceType, resourceName, namePatternType, principal, host, operation, permission] properties: resourceType: - type: string - enum: - - UNKNOWN # Unknown operation, need to update mapping code on BE - - TOPIC - - GROUP - - CLUSTER - - TRANSACTIONAL_ID - - DELEGATION_TOKEN - - USER + $ref: '#/components/schemas/KafkaAclResourceType' resourceName: type: string # "*" if acl can be applied to any resource of given type namePatternType: - type: string - enum: - - LITERAL - - PREFIXED + $ref: '#/components/schemas/KafkaAclNamePatternType' principal: type: string host: @@ -3487,6 +3491,24 @@ components: - ALLOW - DENY + KafkaAclResourceType: + type: string + enum: + - UNKNOWN # Unknown operation, need to update mapping code on BE + - TOPIC + - GROUP + - CLUSTER + - TRANSACTIONAL_ID + - DELEGATION_TOKEN + - USER + + KafkaAclNamePatternType: + type: string + enum: + - MATCH + - LITERAL + - PREFIXED + RestartRequest: type: object properties: From 00dcff6678c0f2bb9300df2944628b2b8125c6ba Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 2 May 2023 11:10:09 +0400 Subject: [PATCH 14/14] kafka-ui-acl-with-zk.yaml added, kafka-ui-sasl.yaml reverted --- .../compose/kafka-ui-acl-with-zk.yaml | 59 +++++++++++++++++++ documentation/compose/kafka-ui-sasl.yaml | 19 ++---- 2 files changed, 64 insertions(+), 14 deletions(-) create mode 100644 documentation/compose/kafka-ui-acl-with-zk.yaml diff --git a/documentation/compose/kafka-ui-acl-with-zk.yaml b/documentation/compose/kafka-ui-acl-with-zk.yaml new file mode 100644 index 00000000000..e1d70b29702 
--- /dev/null +++ b/documentation/compose/kafka-ui-acl-with-zk.yaml @@ -0,0 +1,59 @@ +--- +version: '2' +services: + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + - zookeeper + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT + KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN + KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";' + + zookeeper: + image: wurstmeister/zookeeper:3.4.6 + environment: + JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf" + volumes: + - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf + ports: + - 2181:2181 + + kafka: + image: confluentinc/cp-kafka:7.2.1 + hostname: kafka + container_name: kafka + ports: + - "9092:9092" + - "9997:9997" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf" + KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9997 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT' + KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN' + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN' + KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT' + KAFKA_SUPER_USERS: 'User:admin' + volumes: + - ./scripts/update_run.sh:/tmp/update_run.sh + - ./jaas:/etc/kafka/jaas diff --git a/documentation/compose/kafka-ui-sasl.yaml b/documentation/compose/kafka-ui-sasl.yaml index ebc920cf38f..e4a2b3cc4a7 100644 --- a/documentation/compose/kafka-ui-sasl.yaml +++ b/documentation/compose/kafka-ui-sasl.yaml @@ -8,26 +8,15 @@ services: ports: - 8080:8080 depends_on: - - zookeeper - kafka environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN -# KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="enzo" password="cisternino";' KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";' DYNAMIC_CONFIG_ENABLED: true # not necessary for sasl auth, added for tests - zookeeper: - image: wurstmeister/zookeeper:3.4.6 - environment: - JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf" - volumes: - - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf - ports: - - 2181:2181 - kafka: image: confluentinc/cp-kafka:7.2.1 hostname: kafka @@ -37,25 +26,27 @@ services: - "9997:9997" environment: KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf" - KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9997 KAFKA_JMX_HOSTNAME: localhost + KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_NODE_ID: 1 KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT' KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN' KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT' - KAFKA_SUPER_USERS: 'User:admin' + KAFKA_SUPER_USERS: 'User:admin,User:enzo' volumes: - ./scripts/update_run.sh:/tmp/update_run.sh - ./jaas:/etc/kafka/jaas + command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
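
For reference, a minimal standalone sketch of how the filter-based ACL listing introduced above (ReactiveAdminClient.listAcls taking a ResourcePatternFilter, exposed through the resourceType/resourceName/namePatternType query parameters) maps onto the plain Kafka AdminClient API. This is an illustration only, not part of the patch series: the bootstrap address, the AclFilterExample class name and the TOPIC/"payments"/PREFIXED filter values are placeholders.

import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;

public class AclFilterExample {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Placeholder bootstrap address.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient client = AdminClient.create(props)) {
      // Same shape as listAcls(ResourcePatternFilter): restrict the listing to
      // TOPIC bindings declared with a prefixed pattern named "payments".
      ResourcePatternFilter resourceFilter =
          new ResourcePatternFilter(ResourceType.TOPIC, "payments", PatternType.PREFIXED);

      // Entry side left open: any principal, host, operation and permission type.
      AclBindingFilter bindingFilter =
          new AclBindingFilter(resourceFilter, AccessControlEntryFilter.ANY);

      Collection<AclBinding> bindings = client.describeAcls(bindingFilter).values().get();
      bindings.forEach(System.out::println);
    }
  }
}

Passing ResourcePatternFilter.ANY instead of a specific filter reproduces the unfiltered listing that getAclAsCsvString and syncAclWithAclCsv still use.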
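
Similarly, a minimal sketch of the idea behind the KAFKA_ACL_EDIT feature check (FeatureService.aclEdit): the authorized operations returned with the cluster description, when explicitly requested, indicate whether the current principal may alter cluster ACLs. Again an illustration under assumed values; the bootstrap address and the AclEditFeatureCheck class name are placeholders.

import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.acl.AclOperation;

public class AclEditFeatureCheck {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Placeholder bootstrap address.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient client = AdminClient.create(props)) {
      // Ask brokers to include the operations the authenticated principal may perform.
      DescribeClusterResult description =
          client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true));

      // The future may complete with null when the broker does not return the set,
      // hence the fallback to an empty set (mirroring the service code).
      Set<AclOperation> authorizedOps =
          Optional.ofNullable(description.authorizedOperations().get()).orElse(Set.of());

      boolean aclEdit = authorizedOps.contains(AclOperation.ALL)
          || authorizedOps.contains(AclOperation.ALTER);
      System.out.println("ACL edit allowed for current principal: " + aclEdit);
    }
  }
}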