From 1e4aa283f37724aaca15681bae46f7a2491e0822 Mon Sep 17 00:00:00 2001 From: German Osin Date: Thu, 11 Sep 2025 19:51:25 +0200 Subject: [PATCH 1/7] BE: Fixes #498 Added sort by number of messages --- .../java/io/kafbat/ui/controller/TopicsController.java | 1 + .../main/java/io/kafbat/ui/model/InternalTopic.java | 10 ++++++++++ contract-typespec/api/topics.tsp | 1 + contract/src/main/resources/swagger/kafbat-ui-api.yaml | 1 + 4 files changed, 13 insertions(+) diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java index 4ad3f11e9..f320be7a9 100644 --- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java @@ -362,6 +362,7 @@ private Comparator getComparatorForTopic( case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas()); case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor); case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize); + case MESSAGES_NUMBER -> Comparator.comparing(InternalTopic::getMessagesNumber); default -> defaultComparator; }; } diff --git a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java index 5dfab7c42..f4b67bb80 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java @@ -11,6 +11,7 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; +import org.jetbrains.annotations.NotNull; @Data @Builder(toBuilder = true) @@ -143,4 +144,13 @@ public static InternalTopic from(TopicDescription topicDescription, return topic.build(); } + public long getMessagesNumber() { + long result = 0; + if (partitions != null && !partitions.isEmpty()) { + for (InternalPartition partition : partitions.values()) { + result += (partition.getOffsetMax() - partition.getOffsetMin()); + } + } + return result; + } } diff --git a/contract-typespec/api/topics.tsp b/contract-typespec/api/topics.tsp index 7c03856bc..37a9c3026 100644 --- a/contract-typespec/api/topics.tsp +++ b/contract-typespec/api/topics.tsp @@ -156,6 +156,7 @@ enum TopicColumnsToSort { TOTAL_PARTITIONS, REPLICATION_FACTOR, SIZE, + MESSAGES_NUMBER } model Topic { diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 09f1c45f8..18a2b825b 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -2692,6 +2692,7 @@ components: - TOTAL_PARTITIONS - REPLICATION_FACTOR - SIZE + - MESSAGES_NUMBER SchemaColumnsToSort: type: string From 628e95a39b8c33c29ee375f30bb5bd7e0a5a87d0 Mon Sep 17 00:00:00 2001 From: German Osin Date: Thu, 11 Sep 2025 21:11:39 +0200 Subject: [PATCH 2/7] Expose messages number --- .../main/java/io/kafbat/ui/mapper/ClusterMapper.java | 9 +++++++++ .../main/java/io/kafbat/ui/model/InternalTopic.java | 12 +++++++----- contract-typespec/api/topics.tsp | 1 + .../src/main/resources/swagger/kafbat-ui-api.yaml | 4 ++++ 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java index df3051af3..24ef65991 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.resource.ResourceType; import org.mapstruct.Mapper; import org.mapstruct.Mapping; +import org.openapitools.jackson.nullable.JsonNullable; @Mapper(componentModel = "spring") public interface ClusterMapper { @@ -104,6 +105,14 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) { TopicDTO toTopic(InternalTopic topic); + default JsonNullable toJsonNullable(T value) { + if (value == null) { + return JsonNullable.undefined(); + } else { + return JsonNullable.of(value); + } + } + PartitionDTO toPartition(InternalPartition topic); BrokerDTO toBrokerDto(InternalBroker broker); diff --git a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java index f4b67bb80..7300db5dc 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java @@ -144,11 +144,13 @@ public static InternalTopic from(TopicDescription topicDescription, return topic.build(); } - public long getMessagesNumber() { - long result = 0; - if (partitions != null && !partitions.isEmpty()) { - for (InternalPartition partition : partitions.values()) { - result += (partition.getOffsetMax() - partition.getOffsetMin()); + public Long getMessagesNumber() { + Long result = null; + if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) { + if (partitions != null && !partitions.isEmpty()) { + for (InternalPartition partition : partitions.values()) { + result += (partition.getOffsetMax() - partition.getOffsetMin()); + } } } return result; diff --git a/contract-typespec/api/topics.tsp b/contract-typespec/api/topics.tsp index 37a9c3026..d76975f98 100644 --- a/contract-typespec/api/topics.tsp +++ b/contract-typespec/api/topics.tsp @@ -171,6 +171,7 @@ model Topic { bytesInPerSec?: float64; bytesOutPerSec?: float64; underReplicatedPartitions?: int32; + messagesNumber?: int64 | null; cleanUpPolicy?: CleanUpPolicy; partitions?: Partition[]; } diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 18a2b825b..13be7be81 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -2748,6 +2748,10 @@ components: type: array items: $ref: "#/components/schemas/Partition" + messagesNumber: + type: integer + format: int64 + nullable: true required: - name From 1e61f993e4f4f6c8f52cda1fc11eb1e2583b827b Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 12 Sep 2025 08:16:45 +0200 Subject: [PATCH 3/7] Expose messages number --- api/src/main/java/io/kafbat/ui/model/InternalTopic.java | 1 + 1 file changed, 1 insertion(+) diff --git a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java index 7300db5dc..b4021cc4c 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java @@ -147,6 +147,7 @@ public static InternalTopic from(TopicDescription topicDescription, public Long getMessagesNumber() { Long result = null; if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) { + result = 0L; if (partitions != null && !partitions.isEmpty()) { for (InternalPartition partition : partitions.values()) { result += (partition.getOffsetMax() - partition.getOffsetMin()); From 9e7ddce642aeaaf35c57193435ade6af9248d242 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 12 Sep 2025 16:09:22 +0200 Subject: [PATCH 4/7] Handle null values for messages number --- .../main/java/io/kafbat/ui/controller/TopicsController.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java index f320be7a9..67c82e5a1 100644 --- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java @@ -362,7 +362,10 @@ private Comparator getComparatorForTopic( case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas()); case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor); case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize); - case MESSAGES_NUMBER -> Comparator.comparing(InternalTopic::getMessagesNumber); + case MESSAGES_NUMBER -> Comparator.comparing( + InternalTopic::getMessagesNumber, + Comparator.nullsLast(Long::compareTo) + ); default -> defaultComparator; }; } From d3f59c5b4122bd5d153473d2452fb3abddc11812 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 12 Sep 2025 16:10:51 +0200 Subject: [PATCH 5/7] added nullable --- api/src/main/java/io/kafbat/ui/model/InternalTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java index b4021cc4c..d81afd937 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java @@ -144,7 +144,7 @@ public static InternalTopic from(TopicDescription topicDescription, return topic.build(); } - public Long getMessagesNumber() { + public @Nullable Long getMessagesNumber() { Long result = null; if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) { result = 0L; From 859a2dd54b59b8cf4ce83acddd2d63452c305d6e Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 12 Sep 2025 16:14:45 +0200 Subject: [PATCH 6/7] Fixed naming --- .../main/java/io/kafbat/ui/controller/TopicsController.java | 3 +-- api/src/main/java/io/kafbat/ui/model/InternalTopic.java | 4 +--- contract-typespec/api/topics.tsp | 2 +- contract/src/main/resources/swagger/kafbat-ui-api.yaml | 2 +- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java index 67c82e5a1..96d0b62e9 100644 --- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java @@ -7,7 +7,6 @@ import static io.kafbat.ui.model.rbac.permission.TopicAction.EDIT; import static io.kafbat.ui.model.rbac.permission.TopicAction.VIEW; import static java.util.stream.Collectors.toList; -import static org.apache.commons.lang3.Strings.CI; import io.kafbat.ui.api.TopicsApi; import io.kafbat.ui.config.ClustersProperties; @@ -363,7 +362,7 @@ private Comparator getComparatorForTopic( case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor); case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize); case MESSAGES_NUMBER -> Comparator.comparing( - InternalTopic::getMessagesNumber, + InternalTopic::getMessagesCount, Comparator.nullsLast(Long::compareTo) ); default -> defaultComparator; diff --git a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java index d81afd937..221c854f3 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalTopic.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalTopic.java @@ -10,8 +10,6 @@ import lombok.Data; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.TopicPartition; -import org.jetbrains.annotations.NotNull; @Data @Builder(toBuilder = true) @@ -144,7 +142,7 @@ public static InternalTopic from(TopicDescription topicDescription, return topic.build(); } - public @Nullable Long getMessagesNumber() { + public @Nullable Long getMessagesCount() { Long result = null; if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) { result = 0L; diff --git a/contract-typespec/api/topics.tsp b/contract-typespec/api/topics.tsp index d76975f98..58dd02b0c 100644 --- a/contract-typespec/api/topics.tsp +++ b/contract-typespec/api/topics.tsp @@ -171,7 +171,7 @@ model Topic { bytesInPerSec?: float64; bytesOutPerSec?: float64; underReplicatedPartitions?: int32; - messagesNumber?: int64 | null; + messagesCount?: int64 | null; cleanUpPolicy?: CleanUpPolicy; partitions?: Partition[]; } diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 13be7be81..d6b8e965d 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -2748,7 +2748,7 @@ components: type: array items: $ref: "#/components/schemas/Partition" - messagesNumber: + messagesCount: type: integer format: int64 nullable: true From 9feae201567eab3923c693dad5337cb688634fee Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 12 Sep 2025 16:17:22 +0200 Subject: [PATCH 7/7] Fixed naming --- api/src/main/java/io/kafbat/ui/controller/TopicsController.java | 2 +- contract-typespec/api/topics.tsp | 2 +- contract/src/main/resources/swagger/kafbat-ui-api.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java index 96d0b62e9..39885d593 100644 --- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java @@ -361,7 +361,7 @@ private Comparator getComparatorForTopic( case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas()); case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor); case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize); - case MESSAGES_NUMBER -> Comparator.comparing( + case MESSAGES_COUNT -> Comparator.comparing( InternalTopic::getMessagesCount, Comparator.nullsLast(Long::compareTo) ); diff --git a/contract-typespec/api/topics.tsp b/contract-typespec/api/topics.tsp index 58dd02b0c..5fad04813 100644 --- a/contract-typespec/api/topics.tsp +++ b/contract-typespec/api/topics.tsp @@ -156,7 +156,7 @@ enum TopicColumnsToSort { TOTAL_PARTITIONS, REPLICATION_FACTOR, SIZE, - MESSAGES_NUMBER + MESSAGES_COUNT } model Topic { diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index d6b8e965d..a05c9a5a4 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -2692,7 +2692,7 @@ components: - TOTAL_PARTITIONS - REPLICATION_FACTOR - SIZE - - MESSAGES_NUMBER + - MESSAGES_COUNT SchemaColumnsToSort: type: string