diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java index b1bcd44d8c3..968b27286f7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java @@ -11,11 +11,8 @@ import com.provectus.kafka.ui.model.FullConnectorInfo; import com.provectus.kafka.ui.model.Task; import com.provectus.kafka.ui.model.TaskStatus; -import java.util.Arrays; +import com.provectus.kafka.ui.model.connect.InternalConnectInfo; import java.util.List; -import java.util.Map; -import java.util.function.Function; -import org.apache.commons.lang3.tuple.Triple; import org.mapstruct.Mapper; @Mapper(componentModel = "spring") @@ -36,32 +33,22 @@ ConnectorPluginConfigValidationResponse fromClient( com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse); - default FullConnectorInfo fullConnectorInfoFromTuple(Triple, - List> triple) { - Function, List> getTopicsFromConfig = config -> { - var topic = config.get("topic"); - if (topic != null) { - return List.of((String) topic); - } - return Arrays.asList(((String) config.get("topics")).split(",")); - }; - + default FullConnectorInfo fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) { + Connector connector = connectInfo.getConnector(); + List tasks = connectInfo.getTasks(); + int failedTasksCount = (int) tasks.stream() + .map(Task::getStatus) + .map(TaskStatus::getState) + .filter(ConnectorTaskStatus.FAILED::equals) + .count(); return new FullConnectorInfo() - .connect(triple.getLeft().getConnect()) - .name(triple.getLeft().getName()) - .connectorClass((String) triple.getMiddle().get("connector.class")) - .type(triple.getLeft().getType()) - .topics(getTopicsFromConfig.apply(triple.getMiddle())) - .status( - triple.getLeft().getStatus() - ) - .tasksCount(triple.getRight().size()) - .failedTasksCount((int) triple.getRight().stream() - .map(Task::getStatus) - .map(TaskStatus::getState) - .filter(ConnectorTaskStatus.FAILED::equals) - .count()); + .connect(connector.getConnect()) + .name(connector.getName()) + .connectorClass((String) connectInfo.getConfig().get("connector.class")) + .type(connector.getType()) + .topics(connectInfo.getTopics()) + .status(connector.getStatus()) + .tasksCount(tasks.size()) + .failedTasksCount(failedTasksCount); } - - ; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java new file mode 100644 index 00000000000..4c177c628d6 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java @@ -0,0 +1,17 @@ +package com.provectus.kafka.ui.model.connect; + +import com.provectus.kafka.ui.model.Connector; +import com.provectus.kafka.ui.model.Task; +import java.util.List; +import java.util.Map; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder(toBuilder = true) +public class InternalConnectInfo { + private final Connector connector; + private final Map config; + private final List tasks; + private final List topics; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index cb381edb468..bea3dbb98b0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.client.KafkaConnectClients; +import com.provectus.kafka.ui.connect.model.ConnectorTopics; import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.ConnectNotFoundException; import com.provectus.kafka.ui.mapper.ClusterMapper; @@ -17,6 +18,7 @@ import com.provectus.kafka.ui.model.KafkaConnectCluster; import com.provectus.kafka.ui.model.NewConnector; import com.provectus.kafka.ui.model.Task; +import com.provectus.kafka.ui.model.connect.InternalConnectInfo; import java.util.Collection; import java.util.List; import java.util.Map; @@ -26,7 +28,6 @@ import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -58,16 +59,47 @@ public Flux getAllConnectors(String clusterName) { .flatMap(pair -> getConnector(clusterName, pair.getLeft(), pair.getRight())) .flatMap(connector -> getConnectorConfig(clusterName, connector.getConnect(), connector.getName()) - .map(config -> Pair.of(connector, config)) - ) - .flatMap(pair -> - getConnectorTasks(clusterName, pair.getLeft().getConnect(), pair.getLeft().getName()) - .collectList() - .map(tasks -> Triple.of(pair.getLeft(), pair.getRight(), tasks)) + .map(config -> InternalConnectInfo.builder() + .connector(connector) + .config(config) + .build() + ) ) + .flatMap(connectInfo -> { + Connector connector = connectInfo.getConnector(); + return getConnectorTasks(clusterName, connector.getConnect(), connector.getName()) + .collectList() + .map(tasks -> InternalConnectInfo.builder() + .connector(connector) + .config(connectInfo.getConfig()) + .tasks(tasks) + .build() + ); + }) + .flatMap(connectInfo -> { + Connector connector = connectInfo.getConnector(); + return getConnectorTopics(clusterName, connector.getConnect(), connector.getName()) + .map(ct -> InternalConnectInfo.builder() + .connector(connector) + .config(connectInfo.getConfig()) + .tasks(connectInfo.getTasks()) + .topics(ct.getTopics()) + .build() + ); + }) .map(kafkaConnectMapper::fullConnectorInfoFromTuple); } + private Mono getConnectorTopics(String clusterName, String connectClusterName, + String connectorName) { + return getConnectAddress(clusterName, connectClusterName) + .flatMap(connectUrl -> KafkaConnectClients + .withBaseUrl(connectUrl) + .getConnectorTopics(connectorName) + .map(result -> result.get(connectorName)) + ); + } + private Flux> getConnectorNames(String clusterName, Connect connect) { return getConnectors(clusterName, connect.getName()) .collectList().map(e -> e.get(0)) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml index 237da9abb20..e6e1b6ced6f 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml @@ -231,6 +231,28 @@ paths: items: $ref: '#/components/schemas/ConnectorTask' + /connectors/{connectorName}/topics: + get: + tags: + - KafkaConnectClient + summary: The set of topic names the connector has been using since its creation or since the last time its set of active topics was reset + operationId: getConnectorTopics + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: object + additionalProperties: + $ref: '#/components/schemas/ConnectorTopics' + /connectors/{connectorName}/tasks/{taskId}/status: get: tags: @@ -499,3 +521,11 @@ components: items: $ref: '#/components/schemas/ConnectorPluginConfig' + ConnectorTopics: + type: object + properties: + topics: + type: array + items: + type: string +