From 6647d0fde5cd7ad9fcc05921d4e7e73793dbc909 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Sun, 30 May 2021 19:17:46 +0300 Subject: [PATCH 1/3] [Kafka Connect] Fix getting list of connector's topics --- .../kafka/ui/mapper/KafkaConnectMapper.java | 47 +++++++------------ .../ui/model/connect/InternalConnectInfo.java | 17 +++++++ .../kafka/ui/service/KafkaConnectService.java | 37 +++++++++++++-- .../resources/swagger/kafka-connect-api.yaml | 30 ++++++++++++ 4 files changed, 96 insertions(+), 35 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java index b1bcd44d8c3..968b27286f7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java @@ -11,11 +11,8 @@ import com.provectus.kafka.ui.model.FullConnectorInfo; import com.provectus.kafka.ui.model.Task; import com.provectus.kafka.ui.model.TaskStatus; -import java.util.Arrays; +import com.provectus.kafka.ui.model.connect.InternalConnectInfo; import java.util.List; -import java.util.Map; -import java.util.function.Function; -import org.apache.commons.lang3.tuple.Triple; import org.mapstruct.Mapper; @Mapper(componentModel = "spring") @@ -36,32 +33,22 @@ ConnectorPluginConfigValidationResponse fromClient( com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse); - default FullConnectorInfo fullConnectorInfoFromTuple(Triple, - List> triple) { - Function, List> getTopicsFromConfig = config -> { - var topic = config.get("topic"); - if (topic != null) { - return List.of((String) topic); - } - return Arrays.asList(((String) config.get("topics")).split(",")); - }; - + default FullConnectorInfo fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) { + Connector connector = connectInfo.getConnector(); + List tasks = connectInfo.getTasks(); + int failedTasksCount = (int) tasks.stream() + .map(Task::getStatus) + .map(TaskStatus::getState) + .filter(ConnectorTaskStatus.FAILED::equals) + .count(); return new FullConnectorInfo() - .connect(triple.getLeft().getConnect()) - .name(triple.getLeft().getName()) - .connectorClass((String) triple.getMiddle().get("connector.class")) - .type(triple.getLeft().getType()) - .topics(getTopicsFromConfig.apply(triple.getMiddle())) - .status( - triple.getLeft().getStatus() - ) - .tasksCount(triple.getRight().size()) - .failedTasksCount((int) triple.getRight().stream() - .map(Task::getStatus) - .map(TaskStatus::getState) - .filter(ConnectorTaskStatus.FAILED::equals) - .count()); + .connect(connector.getConnect()) + .name(connector.getName()) + .connectorClass((String) connectInfo.getConfig().get("connector.class")) + .type(connector.getType()) + .topics(connectInfo.getTopics()) + .status(connector.getStatus()) + .tasksCount(tasks.size()) + .failedTasksCount(failedTasksCount); } - - ; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java new file mode 100644 index 00000000000..4c177c628d6 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java @@ -0,0 +1,17 @@ +package com.provectus.kafka.ui.model.connect; + +import com.provectus.kafka.ui.model.Connector; +import com.provectus.kafka.ui.model.Task; +import java.util.List; +import java.util.Map; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder(toBuilder = true) +public class InternalConnectInfo { + private final Connector connector; + private final Map config; + private final List tasks; + private final List topics; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index cb381edb468..78730f813ed 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.client.KafkaConnectClients; +import com.provectus.kafka.ui.connect.model.ConnectorTopics; import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.ConnectNotFoundException; import com.provectus.kafka.ui.mapper.ClusterMapper; @@ -17,6 +18,7 @@ import com.provectus.kafka.ui.model.KafkaConnectCluster; import com.provectus.kafka.ui.model.NewConnector; import com.provectus.kafka.ui.model.Task; +import com.provectus.kafka.ui.model.connect.InternalConnectInfo; import java.util.Collection; import java.util.List; import java.util.Map; @@ -26,7 +28,6 @@ import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -58,16 +59,42 @@ public Flux getAllConnectors(String clusterName) { .flatMap(pair -> getConnector(clusterName, pair.getLeft(), pair.getRight())) .flatMap(connector -> getConnectorConfig(clusterName, connector.getConnect(), connector.getName()) - .map(config -> Pair.of(connector, config)) + .map(config -> InternalConnectInfo.builder() + .connector(connector) + .config(config) + .build() + ) ) - .flatMap(pair -> - getConnectorTasks(clusterName, pair.getLeft().getConnect(), pair.getLeft().getName()) + .flatMap(connectInfo -> + getConnectorTasks(clusterName, connectInfo.getConnector().getConnect(), connectInfo.getConnector().getName()) .collectList() - .map(tasks -> Triple.of(pair.getLeft(), pair.getRight(), tasks)) + .map(tasks -> InternalConnectInfo.builder() + .connector(connectInfo.getConnector()) + .config(connectInfo.getConfig()) + .tasks(tasks) + .build() + ) ) + .flatMap(connectInfo -> getConnectorTopics(clusterName, connectInfo.getConnector().getConnect(), connectInfo.getConnector().getName()) + .map(ct -> InternalConnectInfo.builder() + .connector(connectInfo.getConnector()) + .config(connectInfo.getConfig()) + .tasks(connectInfo.getTasks()) + .topics(ct.getTopics()) + .build() + )) .map(kafkaConnectMapper::fullConnectorInfoFromTuple); } + private Mono getConnectorTopics(String clusterName, String kafkaConnectClusterName, String connectorName) { + return getConnectAddress(clusterName, kafkaConnectClusterName) + .flatMap(connectUrl -> KafkaConnectClients + .withBaseUrl(connectUrl) + .getConnectorTopics(connectorName) + .map(result -> result.get(connectorName)) + ); + } + private Flux> getConnectorNames(String clusterName, Connect connect) { return getConnectors(clusterName, connect.getName()) .collectList().map(e -> e.get(0)) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml index 237da9abb20..e6e1b6ced6f 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml @@ -231,6 +231,28 @@ paths: items: $ref: '#/components/schemas/ConnectorTask' + /connectors/{connectorName}/topics: + get: + tags: + - KafkaConnectClient + summary: The set of topic names the connector has been using since its creation or since the last time its set of active topics was reset + operationId: getConnectorTopics + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: object + additionalProperties: + $ref: '#/components/schemas/ConnectorTopics' + /connectors/{connectorName}/tasks/{taskId}/status: get: tags: @@ -499,3 +521,11 @@ components: items: $ref: '#/components/schemas/ConnectorPluginConfig' + ConnectorTopics: + type: object + properties: + topics: + type: array + items: + type: string + From 81c6928513593263844dda835aaac194d04a5789 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Mon, 31 May 2021 08:39:01 +0300 Subject: [PATCH 2/3] Fix code style in KafkaConnectService.java --- .../kafka/ui/service/KafkaConnectService.java | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 78730f813ed..86718189b82 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -66,28 +66,35 @@ public Flux getAllConnectors(String clusterName) { ) ) .flatMap(connectInfo -> - getConnectorTasks(clusterName, connectInfo.getConnector().getConnect(), connectInfo.getConnector().getName()) - .collectList() - .map(tasks -> InternalConnectInfo.builder() - .connector(connectInfo.getConnector()) - .config(connectInfo.getConfig()) - .tasks(tasks) - .build() - ) + { + Connector connector = connectInfo.getConnector(); + return getConnectorTasks(clusterName, connector.getConnect(), connector.getName()) + .collectList() + .map(tasks -> InternalConnectInfo.builder() + .connector(connector) + .config(connectInfo.getConfig()) + .tasks(tasks) + .build() + ); + } ) - .flatMap(connectInfo -> getConnectorTopics(clusterName, connectInfo.getConnector().getConnect(), connectInfo.getConnector().getName()) - .map(ct -> InternalConnectInfo.builder() - .connector(connectInfo.getConnector()) - .config(connectInfo.getConfig()) - .tasks(connectInfo.getTasks()) - .topics(ct.getTopics()) - .build() - )) + .flatMap(connectInfo -> { + Connector connector = connectInfo.getConnector(); + return getConnectorTopics(clusterName, connector.getConnect(), connector.getName()) + .map(ct -> InternalConnectInfo.builder() + .connector(connector) + .config(connectInfo.getConfig()) + .tasks(connectInfo.getTasks()) + .topics(ct.getTopics()) + .build() + ); + }) .map(kafkaConnectMapper::fullConnectorInfoFromTuple); } - private Mono getConnectorTopics(String clusterName, String kafkaConnectClusterName, String connectorName) { - return getConnectAddress(clusterName, kafkaConnectClusterName) + private Mono getConnectorTopics(String clusterName, String connectClusterName, + String connectorName) { + return getConnectAddress(clusterName, connectClusterName) .flatMap(connectUrl -> KafkaConnectClients .withBaseUrl(connectUrl) .getConnectorTopics(connectorName) From 419812f612633b0bb399a3c61a1b7860c787bc61 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Mon, 31 May 2021 09:25:13 +0300 Subject: [PATCH 3/3] Fix code style --- .../kafka/ui/service/KafkaConnectService.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 86718189b82..bea3dbb98b0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -65,19 +65,17 @@ public Flux getAllConnectors(String clusterName) { .build() ) ) - .flatMap(connectInfo -> - { - Connector connector = connectInfo.getConnector(); - return getConnectorTasks(clusterName, connector.getConnect(), connector.getName()) - .collectList() - .map(tasks -> InternalConnectInfo.builder() - .connector(connector) - .config(connectInfo.getConfig()) - .tasks(tasks) - .build() - ); - } - ) + .flatMap(connectInfo -> { + Connector connector = connectInfo.getConnector(); + return getConnectorTasks(clusterName, connector.getConnect(), connector.getName()) + .collectList() + .map(tasks -> InternalConnectInfo.builder() + .connector(connector) + .config(connectInfo.getConfig()) + .tasks(tasks) + .build() + ); + }) .flatMap(connectInfo -> { Connector connector = connectInfo.getConnector(); return getConnectorTopics(clusterName, connector.getConnect(), connector.getName())