diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java index 6bb82bf7e..48a8fedbf 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java @@ -3,11 +3,13 @@ import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.model.ClusterInfo; import io.kafbat.ui.connect.model.Connector; +import io.kafbat.ui.connect.model.ConnectorStatus; import io.kafbat.ui.connect.model.ConnectorStatusConnector; import io.kafbat.ui.connect.model.ConnectorTask; import io.kafbat.ui.connect.model.ConnectorTopics; import io.kafbat.ui.connect.model.ExpandedConnector; import io.kafbat.ui.connect.model.NewConnector; +import io.kafbat.ui.connect.model.TaskStatus; import io.kafbat.ui.model.ConnectDTO; import io.kafbat.ui.model.ConnectorDTO; import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO; @@ -42,14 +44,36 @@ default ClusterInfo toClient(KafkaConnectState state) { @Mapping(target = "status", ignore = true) @Mapping(target = "connect", ignore = true) - ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector); - - default ConnectorDTO fromClient(Connector connector, ConnectorTopics topics) { - ConnectorDTO connectorDto = this.fromClient(connector); + ConnectorDTO fromClient(Connector connector); + + default ConnectorDTO fromClient(Connector connector, + String connect, + ConnectorTopics topics, + Map sanitizedConfigs, + ConnectorStatus status) { + ConnectorDTO result = this.fromClient(connector); + result.connect(connect); if (topics != null) { - return connectorDto.topics(topics.getTopics()); + result = result.topics(topics.getTopics()); + } + if (sanitizedConfigs != null) { + result = result.config(sanitizedConfigs); + } + if (status != null && status.getConnector() != null) { + result = result.status(fromClient(status.getConnector())); + + if (status.getTasks() != null) { + boolean isAnyTaskFailed = status.getTasks().stream() + .map(TaskStatus::getState) + .anyMatch(TaskStatus.StateEnum.FAILED::equals); + + if (isAnyTaskFailed) { + result.getStatus().state(ConnectorStateDTO.TASK_FAILED); + } + } } - return connectorDto; + + return result; } ConnectorStatusDTO fromClient(ConnectorStatusConnector connectorStatus); diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 8b6bcabe4..813307f6d 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -227,35 +227,20 @@ public Mono getConnector(KafkaCluster cluster, String connectName, String connectorName) { return api(cluster, connectName) .mono(client -> - Mono.zip(client.getConnector(connectorName), getConnectorTopics(cluster, connectName, connectorName)) - .map(t -> kafkaConnectMapper.fromClient(t.getT1(), t.getT2())) - .flatMap(connector -> - client.getConnectorStatus(connector.getName()) - // status request can return 404 if tasks not assigned yet - .onErrorResume(WebClientResponseException.NotFound.class, + Mono.zip( + client.getConnector(connectorName), + getConnectorTopics(cluster, connectName, connectorName), + client.getConnectorStatus(connectorName).onErrorResume(WebClientResponseException.NotFound.class, e -> emptyStatus(connectorName)) - .map(connectorStatus -> { - var status = connectorStatus.getConnector(); - var sanitizedConfig = kafkaConfigSanitizer.sanitizeConnectorConfig(connector.getConfig()); - ConnectorDTO result = new ConnectorDTO() - .connect(connectName) - .status(kafkaConnectMapper.fromClient(status)) - .type(connector.getType()) - .tasks(connector.getTasks()) - .name(connector.getName()) - .config(sanitizedConfig); - - if (connectorStatus.getTasks() != null) { - boolean isAnyTaskFailed = connectorStatus.getTasks().stream() - .map(TaskStatus::getState) - .anyMatch(TaskStatus.StateEnum.FAILED::equals); - - if (isAnyTaskFailed) { - result.getStatus().state(ConnectorStateDTO.TASK_FAILED); - } - } - return result; - }) + ) + .map(t -> + kafkaConnectMapper.fromClient( + t.getT1(), + connectName, + t.getT2(), + kafkaConfigSanitizer.sanitizeConnectorConfig(t.getT1().getConfig()), + t.getT3() + ) ) ); } @@ -281,7 +266,7 @@ public Mono setConnectorConfig(KafkaCluster cluster, String connec .mono(c -> requestBody .flatMap(body -> c.setConnectorConfig(connectorName, body)) - .map(kafkaConnectMapper::fromClient)); + .map(connector -> kafkaConnectMapper.fromClient(connector))); } public Mono deleteConnector(