From 5f6b467c2b8c3aee31147db4f7009b4487e3c753 Mon Sep 17 00:00:00 2001 From: Sofia Shnaidman Date: Wed, 11 Mar 2020 11:06:17 +0300 Subject: [PATCH 1/7] Added concumer groups list with search. --- kafka-ui-react-app/mock/index.js | 2 + .../mock/payload/consumerGroups.json | 39 +++++++++++ kafka-ui-react-app/src/components/App.tsx | 2 + .../ConsumerGroups/ConsumerGroups.tsx | 34 ++++++++++ .../ConsumersGroupsContainer.ts | 24 +++++++ .../components/ConsumerGroups/List/List.tsx | 64 +++++++++++++++++++ .../ConsumerGroups/List/ListContainer.ts | 20 ++++++ .../ConsumerGroups/List/ListItem.tsx | 24 +++++++ .../src/components/Nav/ClusterMenu.tsx | 5 +- kafka-ui-react-app/src/lib/paths.ts | 2 + kafka-ui-react-app/src/redux/actionType.ts | 4 ++ .../src/redux/actions/actions.ts | 7 ++ .../src/redux/actions/thunks.ts | 10 +++ .../src/redux/api/consumerGroups.ts | 8 +++ kafka-ui-react-app/src/redux/api/index.ts | 1 + .../src/redux/interfaces/consumerGroup.ts | 5 ++ .../src/redux/interfaces/index.ts | 3 + .../redux/reducers/consumerGroups/reducer.ts | 15 +++++ .../reducers/consumerGroups/selectors.ts | 15 +++++ .../src/redux/reducers/index.ts | 2 + 20 files changed, 285 insertions(+), 1 deletion(-) create mode 100644 kafka-ui-react-app/mock/payload/consumerGroups.json create mode 100644 kafka-ui-react-app/src/components/ConsumerGroups/ConsumerGroups.tsx create mode 100644 kafka-ui-react-app/src/components/ConsumerGroups/ConsumersGroupsContainer.ts create mode 100644 kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx create mode 100644 kafka-ui-react-app/src/components/ConsumerGroups/List/ListContainer.ts create mode 100644 kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx create mode 100644 kafka-ui-react-app/src/redux/api/consumerGroups.ts create mode 100644 kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts create mode 100644 kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts create mode 100644 kafka-ui-react-app/src/redux/reducers/consumerGroups/selectors.ts diff --git a/kafka-ui-react-app/mock/index.js b/kafka-ui-react-app/mock/index.js index 8e589dfcdb0..6aa2454b33d 100644 --- a/kafka-ui-react-app/mock/index.js +++ b/kafka-ui-react-app/mock/index.js @@ -5,6 +5,7 @@ const brokerMetrics = require('./payload/brokerMetrics.json'); const topics = require('./payload/topics.json'); const topicDetails = require('./payload/topicDetails.json'); const topicConfigs = require('./payload/topicConfigs.json'); +const consumerGroups = require('./payload/consumerGroups.json'); const db = { clusters, @@ -13,6 +14,7 @@ const db = { topics: topics.map((topic) => ({...topic, id: topic.name})), topicDetails, topicConfigs, + consumerGroups: consumerGroups.map((group) => ({...group, id: group.consumerGroupId})) }; const server = jsonServer.create(); const router = jsonServer.router(db); diff --git a/kafka-ui-react-app/mock/payload/consumerGroups.json b/kafka-ui-react-app/mock/payload/consumerGroups.json new file mode 100644 index 00000000000..f0cace45d27 --- /dev/null +++ b/kafka-ui-react-app/mock/payload/consumerGroups.json @@ -0,0 +1,39 @@ +[ + { + "clusterId": "fake.cluster", + "consumerGroupId": "_fake.cluster.consumer_1", + "numConsumers": 1, + "numTopics": 11 + }, + { + "clusterId": "fake.cluster", + "consumerGroupId": "_fake.cluster.consumer_2", + "numConsumers": 2, + "numTopics": 22 + }, + { + "clusterId": "fake.cluster", + "consumerGroupId": "_fake.cluster.consumer_3", + "numConsumers": 3, + "numTopics": 33 + }, + + { + "clusterId": "kafka-ui.cluster", + "consumerGroupId": "_kafka-ui.cluster.consumer_1", + "numConsumers": 4, + "numTopics": 44 + }, + { + "clusterId": "kafka-ui.cluster", + "consumerGroupId": "_kafka-ui.cluster.consumer_2", + "numConsumers": 5, + "numTopics": 55 + }, + { + "clusterId": "kafka-ui.cluster", + "consumerGroupId": "_kafka-ui.cluster.consumer_3", + "numConsumers": 6, + "numTopics": 66 + } +] diff --git a/kafka-ui-react-app/src/components/App.tsx b/kafka-ui-react-app/src/components/App.tsx index f32df104b67..46a5ff84191 100644 --- a/kafka-ui-react-app/src/components/App.tsx +++ b/kafka-ui-react-app/src/components/App.tsx @@ -10,6 +10,7 @@ import TopicsContainer from './Topics/TopicsContainer'; import NavConatiner from './Nav/NavConatiner'; import PageLoader from './common/PageLoader/PageLoader'; import Dashboard from './Dashboard/Dashboard'; +import ConsumersGroupsContainer from './ConsumerGroups/ConsumersGroupsContainer'; interface AppProps { isClusterListFetched: boolean; @@ -39,6 +40,7 @@ const App: React.FC = ({ + ) : ( diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/ConsumerGroups.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/ConsumerGroups.tsx new file mode 100644 index 00000000000..6286345e018 --- /dev/null +++ b/kafka-ui-react-app/src/components/ConsumerGroups/ConsumerGroups.tsx @@ -0,0 +1,34 @@ +import React from 'react'; +import { ClusterName } from 'redux/interfaces'; +import { + Switch, + Route, +} from 'react-router-dom'; +import ListContainer from './List/ListContainer'; +import PageLoader from 'components/common/PageLoader/PageLoader'; + +interface Props { + clusterName: ClusterName; + isFetched: boolean; + fetchConsumerGroupsList: (clusterName: ClusterName) => void; +} + +const ConsumerGroups: React.FC = ({ + clusterName, + isFetched, + fetchConsumerGroupsList, +}) => { + React.useEffect(() => { fetchConsumerGroupsList(clusterName); }, [fetchConsumerGroupsList, clusterName]); + + if (isFetched) { + return ( + + + + ); + } + + return (); +}; + +export default ConsumerGroups; diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/ConsumersGroupsContainer.ts b/kafka-ui-react-app/src/components/ConsumerGroups/ConsumersGroupsContainer.ts new file mode 100644 index 00000000000..b86dacea9ff --- /dev/null +++ b/kafka-ui-react-app/src/components/ConsumerGroups/ConsumersGroupsContainer.ts @@ -0,0 +1,24 @@ +import { connect } from 'react-redux'; +import { fetchConsumerGroupsList } from 'redux/actions'; +import { RootState, ClusterName } from 'redux/interfaces'; +import { RouteComponentProps } from 'react-router-dom'; +import ConsumerGroups from './ConsumerGroups'; +import { getIsConsumerGroupsListFetched } from '../../redux/reducers/consumerGroups/selectors'; + + +interface RouteProps { + clusterName: ClusterName; +} + +interface OwnProps extends RouteComponentProps { } + +const mapStateToProps = (state: RootState, { match: { params: { clusterName } }}: OwnProps) => ({ + isFetched: getIsConsumerGroupsListFetched(state), + clusterName, +}); + +const mapDispatchToProps = { + fetchConsumerGroupsList: (clusterName: ClusterName) => fetchConsumerGroupsList(clusterName), +}; + +export default connect(mapStateToProps, mapDispatchToProps)(ConsumerGroups); diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx new file mode 100644 index 00000000000..cdeaf0a855d --- /dev/null +++ b/kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx @@ -0,0 +1,64 @@ +import React, { ChangeEvent } from 'react'; +import { ConsumerGroup, ClusterName } from 'redux/interfaces'; +import ListItem from './ListItem'; +import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb'; + +interface Props { + clusterName: ClusterName; + consumerGroups: (ConsumerGroup)[]; +} + +const List: React.FC = ({ + consumerGroups, +}) => { + + const [searchText, setSearchText] = React.useState(''); + + const handleInputChange = (event: React.ChangeEvent) => { + setSearchText(event.target.value); + }; + + const items = consumerGroups; + + return ( +
+ All Consumer Groups + +
+
+
+ +
+
+ + + + + + + + + + {items + .filter( (consumerGroup) => !searchText || consumerGroup?.consumerGroupId?.indexOf(searchText) >= 0) + .map((consumerGroup, index) => ( + + ))} + +
Consumer group IDNum of consumersNum of topics
+
+
+ ); +}; + +export default List; diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/List/ListContainer.ts b/kafka-ui-react-app/src/components/ConsumerGroups/List/ListContainer.ts new file mode 100644 index 00000000000..e44a5528a16 --- /dev/null +++ b/kafka-ui-react-app/src/components/ConsumerGroups/List/ListContainer.ts @@ -0,0 +1,20 @@ +import { connect } from 'react-redux'; +import {ClusterName, RootState} from 'redux/interfaces'; +import { getConsumerGroupsList } from 'redux/reducers/consumerGroups/selectors'; +import List from './List'; +import { withRouter, RouteComponentProps } from 'react-router-dom'; + +interface RouteProps { + clusterName: ClusterName; +} + +interface OwnProps extends RouteComponentProps { } + +const mapStateToProps = (state: RootState, { match: { params: { clusterName } } }: OwnProps) => ({ + clusterName, + consumerGroups: getConsumerGroupsList(state) +}); + +export default withRouter( + connect(mapStateToProps)(List) +); diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx new file mode 100644 index 00000000000..a6dfa45b332 --- /dev/null +++ b/kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx @@ -0,0 +1,24 @@ +import React from 'react'; +import { NavLink } from 'react-router-dom'; +import { ConsumerGroup } from 'redux/interfaces'; + +const ListItem: React.FC = ({ + consumerGroupId, + numConsumers, + numTopics, +}) => { + return ( + + {/* + + {consumerGroupId} + + */} + {consumerGroupId} + {numConsumers} + {numTopics} + + ); +} + +export default ListItem; diff --git a/kafka-ui-react-app/src/components/Nav/ClusterMenu.tsx b/kafka-ui-react-app/src/components/Nav/ClusterMenu.tsx index 5c417b83c48..28fb9b68da3 100644 --- a/kafka-ui-react-app/src/components/Nav/ClusterMenu.tsx +++ b/kafka-ui-react-app/src/components/Nav/ClusterMenu.tsx @@ -1,7 +1,7 @@ import React, { CSSProperties } from 'react'; import { Cluster } from 'redux/interfaces'; import { NavLink } from 'react-router-dom'; -import { clusterBrokersPath, clusterTopicsPath } from 'lib/paths'; +import { clusterBrokersPath, clusterTopicsPath, clusterConsumerGroupsPath } from 'lib/paths'; interface Props extends Cluster {} @@ -37,6 +37,9 @@ const ClusterMenu: React.FC = ({ Topics + + Consumers + diff --git a/kafka-ui-react-app/src/lib/paths.ts b/kafka-ui-react-app/src/lib/paths.ts index 78099dbfe29..c01592db0ed 100644 --- a/kafka-ui-react-app/src/lib/paths.ts +++ b/kafka-ui-react-app/src/lib/paths.ts @@ -13,3 +13,5 @@ export const clusterTopicNewPath = (clusterName: ClusterName) => `${clusterPath( export const clusterTopicPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}`; export const clusterTopicSettingsPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/settings`; export const clusterTopicMessagesPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/messages`; + +export const clusterConsumerGroupsPath = (clusterName: ClusterName) => `${clusterPath(clusterName)}/consumer-groups`; \ No newline at end of file diff --git a/kafka-ui-react-app/src/redux/actionType.ts b/kafka-ui-react-app/src/redux/actionType.ts index a07b08b3ee7..1b91c1915d7 100644 --- a/kafka-ui-react-app/src/redux/actionType.ts +++ b/kafka-ui-react-app/src/redux/actionType.ts @@ -26,4 +26,8 @@ export enum ActionType { POST_TOPIC__REQUEST = 'POST_TOPIC__REQUEST', POST_TOPIC__SUCCESS = 'POST_TOPIC__SUCCESS', POST_TOPIC__FAILURE = 'POST_TOPIC__FAILURE', + + GET_CONSUMER_GROUPS__REQUEST = 'GET_CONSUMER_GROUPS__REQUEST', + GET_CONSUMER_GROUPS__SUCCESS = 'GET_CONSUMER_GROUPS__SUCCESS', + GET_CONSUMER_GROUPS__FAILURE = 'GET_CONSUMER_GROUPS__FAILURE', }; diff --git a/kafka-ui-react-app/src/redux/actions/actions.ts b/kafka-ui-react-app/src/redux/actions/actions.ts index 83eba3f28bd..449328e9e40 100644 --- a/kafka-ui-react-app/src/redux/actions/actions.ts +++ b/kafka-ui-react-app/src/redux/actions/actions.ts @@ -1,5 +1,6 @@ import { createAsyncAction} from 'typesafe-actions'; import { ActionType } from 'redux/actionType'; +import { ConsumerGroup } from '../interfaces/consumerGroup'; import { Broker, BrokerMetrics, @@ -51,3 +52,9 @@ export const createTopicAction = createAsyncAction( ActionType.POST_TOPIC__SUCCESS, ActionType.POST_TOPIC__FAILURE, )(); + +export const fetchConsumerGroupsAction = createAsyncAction( + ActionType.GET_CONSUMER_GROUPS__REQUEST, + ActionType.GET_CONSUMER_GROUPS__SUCCESS, + ActionType.GET_CONSUMER_GROUPS__FAILURE, +)(); diff --git a/kafka-ui-react-app/src/redux/actions/thunks.ts b/kafka-ui-react-app/src/redux/actions/thunks.ts index c23f43a56d9..da1ff10b956 100644 --- a/kafka-ui-react-app/src/redux/actions/thunks.ts +++ b/kafka-ui-react-app/src/redux/actions/thunks.ts @@ -77,3 +77,13 @@ export const createTopic = (clusterName: ClusterName, form: TopicFormData): Prom dispatch(actions.createTopicAction.failure()); } }; + +export const fetchConsumerGroupsList = (clusterName: ClusterName): PromiseThunk => async (dispatch) => { + dispatch(actions.fetchConsumerGroupsAction.request()); + try { + const consumerGroups = await api.getConsumerGroups(clusterName); + dispatch(actions.fetchConsumerGroupsAction.success(consumerGroups)); + } catch (e) { + dispatch(actions.fetchConsumerGroupsAction.failure()); + } +}; diff --git a/kafka-ui-react-app/src/redux/api/consumerGroups.ts b/kafka-ui-react-app/src/redux/api/consumerGroups.ts new file mode 100644 index 00000000000..225b60479e5 --- /dev/null +++ b/kafka-ui-react-app/src/redux/api/consumerGroups.ts @@ -0,0 +1,8 @@ +import { ClusterName } from '../interfaces/cluster'; +import { ConsumerGroup } from '../interfaces/consumerGroup'; +import { BASE_PARAMS, BASE_URL } from '../../lib/constants'; + + +export const getConsumerGroups = (clusterName: ClusterName): Promise => + fetch(`${BASE_URL}/clusters/${clusterName}/consumerGroups`, { ...BASE_PARAMS }) + .then(res => res.json()); diff --git a/kafka-ui-react-app/src/redux/api/index.ts b/kafka-ui-react-app/src/redux/api/index.ts index 491b0279a95..5031b4fbd9f 100644 --- a/kafka-ui-react-app/src/redux/api/index.ts +++ b/kafka-ui-react-app/src/redux/api/index.ts @@ -1,3 +1,4 @@ export * from './topics'; export * from './clusters'; export * from './brokers'; +export * from './consumerGroups'; diff --git a/kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts b/kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts new file mode 100644 index 00000000000..3008e5cf0f6 --- /dev/null +++ b/kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts @@ -0,0 +1,5 @@ +export interface ConsumerGroup { + consumerGroupId: string; + numConsumers: number; + numTopics: number; +} \ No newline at end of file diff --git a/kafka-ui-react-app/src/redux/interfaces/index.ts b/kafka-ui-react-app/src/redux/interfaces/index.ts index 9878a7d90c5..304b0ea65a8 100644 --- a/kafka-ui-react-app/src/redux/interfaces/index.ts +++ b/kafka-ui-react-app/src/redux/interfaces/index.ts @@ -8,10 +8,12 @@ import { TopicsState } from './topic'; import { Cluster } from './cluster'; import { BrokersState } from './broker'; import { LoaderState } from './loader'; +import { ConsumerGroup } from './consumerGroup'; export * from './topic'; export * from './cluster'; export * from './broker'; +export * from './consumerGroup'; export * from './loader'; export enum FetchStatus { @@ -25,6 +27,7 @@ export interface RootState { topics: TopicsState; clusters: Cluster[]; brokers: BrokersState; + consumerGroups: ConsumerGroup[]; loader: LoaderState; } diff --git a/kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts b/kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts new file mode 100644 index 00000000000..bd0e8ec71da --- /dev/null +++ b/kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts @@ -0,0 +1,15 @@ +import { Action, ConsumerGroup } from 'redux/interfaces'; +import { ActionType } from 'redux/actionType'; + +export const initialState: ConsumerGroup[] = []; + +const reducer = (state = initialState, action: Action): ConsumerGroup[] => { + switch (action.type) { + case ActionType.GET_CONSUMER_GROUPS__SUCCESS: + return action.payload; + default: + return state; + } +}; + +export default reducer; diff --git a/kafka-ui-react-app/src/redux/reducers/consumerGroups/selectors.ts b/kafka-ui-react-app/src/redux/reducers/consumerGroups/selectors.ts new file mode 100644 index 00000000000..e471bc0afec --- /dev/null +++ b/kafka-ui-react-app/src/redux/reducers/consumerGroups/selectors.ts @@ -0,0 +1,15 @@ +import { createSelector } from 'reselect'; +import { ConsumerGroup, RootState, FetchStatus } from 'redux/interfaces'; +import { createFetchingSelector } from 'redux/reducers/loader/selectors'; + + +const consumerGroupsState = ({ consumerGroups }: RootState): ConsumerGroup[] => consumerGroups; + +const getConsumerGroupsListFetchingStatus = createFetchingSelector('GET_CONSUMER_GROUPS'); + +export const getIsConsumerGroupsListFetched = createSelector( + getConsumerGroupsListFetchingStatus, + (status) => status === FetchStatus.fetched, +); + +export const getConsumerGroupsList = createSelector(consumerGroupsState, (consumerGroups) => consumerGroups); \ No newline at end of file diff --git a/kafka-ui-react-app/src/redux/reducers/index.ts b/kafka-ui-react-app/src/redux/reducers/index.ts index babf641ae01..a93f407680b 100644 --- a/kafka-ui-react-app/src/redux/reducers/index.ts +++ b/kafka-ui-react-app/src/redux/reducers/index.ts @@ -2,6 +2,7 @@ import { combineReducers } from 'redux'; import topics from './topics/reducer'; import clusters from './clusters/reducer'; import brokers from './brokers/reducer'; +import consumerGroups from './consumerGroups/reducer'; import loader from './loader/reducer'; import { RootState } from 'redux/interfaces'; @@ -9,5 +10,6 @@ export default combineReducers({ topics, clusters, brokers, + consumerGroups, loader, }); From 0915be8c2ebb6f37aa436c80c2e72d3c8075cc59 Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Wed, 1 Apr 2020 23:20:49 +0300 Subject: [PATCH 2/7] added endpoint for group consumers --- .../ui/cluster/service/ClusterService.java | 34 +++++++++++++++++- .../kafka/ui/rest/MetricsRestController.java | 6 ++++ .../main/resources/swagger/kafka-ui-api.yaml | 36 ++++++++++++++++++- 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index f58762c29db..32c26a84efd 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -5,13 +5,25 @@ import com.provectus.kafka.ui.kafka.KafkaService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListConsumerGroupsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.List; +import javax.annotation.PostConstruct; +import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.Stream; @Service @RequiredArgsConstructor @@ -58,4 +70,24 @@ public Mono> createTopic(String name, Mono if (cluster == null) return null; return kafkaService.createTopic(cluster, topicFormData); } + + @SneakyThrows + public Mono>> getConsumerGroup (String clusterName) { + KafkaCluster cluster = clustersStorage.getClusterByName(clusterName); + List consumerGroups = new ArrayList<>(); + List stringIds = cluster.getAdminClient().listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()); + System.out.println(cluster.getAdminClient().listConsumerGroupOffsets(stringIds.get(0))); + Map consumerGroupDescription = cluster.getAdminClient().describeConsumerGroups(stringIds).all().get(); + consumerGroupDescription.entrySet().forEach(s -> { + ConsumerGroup consumerGroup = new ConsumerGroup(); + consumerGroup.setClusterId(cluster.getCluster().getId()); + consumerGroup.setConsumerGroupId(s.getValue().groupId()); + consumerGroup.setNumConsumers(s.getValue().members().size()); + Set topics = new HashSet<>(); + s.getValue().members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic()))); + consumerGroup.setNumTopics(topics.size()); + consumerGroups.add(consumerGroup); + }); + return Mono.just(ResponseEntity.ok(Flux.fromIterable(consumerGroups))); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 60755c183d9..0842610bf38 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -4,6 +4,7 @@ import com.provectus.kafka.ui.cluster.service.ClusterService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.admin.ListConsumerGroupsResult; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; @@ -53,4 +54,9 @@ public Mono> createTopic(String clusterId, @Valid Mono>> getBrokers(String clusterId, ServerWebExchange exchange) { return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>()))); } + + @Override + public Mono>> getConsumerGroup(String clusterName, ServerWebExchange exchange) { + return clusterService.getConsumerGroup(clusterName); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 6fc71e20357..a85a88040c4 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -169,6 +169,28 @@ paths: items: $ref: '#/components/schemas/TopicConfig' + /api/clusters/{clusterName}/consumerGroups: + get: + tags: + - /api/clusters + summary: getConsumerGroup + operationId: getConsumerGroup + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ConsumerGroup' + components: schemas: Cluster: @@ -307,4 +329,16 @@ components: type: object properties: id: - type: string \ No newline at end of file + type: string + + ConsumerGroup: + type: object + properties: + clusterId: + type: string + consumerGroupId: + type: string + numConsumers: + type: integer + numTopics: + type: integer \ No newline at end of file From 1310c21ebab88c2c7c3b24fa4414b8000fe58d9d Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Thu, 2 Apr 2020 10:55:39 +0300 Subject: [PATCH 3/7] removed redundand code and imports --- .../kafka/ui/cluster/service/ClusterService.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 32c26a84efd..f5c9bf0387e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -8,22 +8,13 @@ import lombok.SneakyThrows; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; -import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.ListConsumerGroupsResult; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.annotation.PostConstruct; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import java.util.stream.Stream; @Service @RequiredArgsConstructor @@ -76,7 +67,6 @@ public Mono>> getConsumerGroup (String cluste KafkaCluster cluster = clustersStorage.getClusterByName(clusterName); List consumerGroups = new ArrayList<>(); List stringIds = cluster.getAdminClient().listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()); - System.out.println(cluster.getAdminClient().listConsumerGroupOffsets(stringIds.get(0))); Map consumerGroupDescription = cluster.getAdminClient().describeConsumerGroups(stringIds).all().get(); consumerGroupDescription.entrySet().forEach(s -> { ConsumerGroup consumerGroup = new ConsumerGroup(); From 556c416432e6578f78baa0f78c021f1fee2828a5 Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Thu, 2 Apr 2020 20:27:57 +0300 Subject: [PATCH 4/7] changed method to async mono --- .../ui/cluster/service/ClusterService.java | 53 +++++++++++++------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index f5c9bf0387e..92458dc696f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -6,14 +6,17 @@ import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.common.KafkaFuture; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.stream.Collectors; @Service @@ -62,22 +65,38 @@ public Mono> createTopic(String name, Mono return kafkaService.createTopic(cluster, topicFormData); } + private Mono toMono(KafkaFuture future){ + return Mono.create(sink-> future.whenComplete((res, ex)->{ + if(ex!=null) { + sink.error(ex); + } else { + sink.success(res); + } + })); + } + @SneakyThrows public Mono>> getConsumerGroup (String clusterName) { - KafkaCluster cluster = clustersStorage.getClusterByName(clusterName); - List consumerGroups = new ArrayList<>(); - List stringIds = cluster.getAdminClient().listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()); - Map consumerGroupDescription = cluster.getAdminClient().describeConsumerGroups(stringIds).all().get(); - consumerGroupDescription.entrySet().forEach(s -> { - ConsumerGroup consumerGroup = new ConsumerGroup(); - consumerGroup.setClusterId(cluster.getCluster().getId()); - consumerGroup.setConsumerGroupId(s.getValue().groupId()); - consumerGroup.setNumConsumers(s.getValue().members().size()); - Set topics = new HashSet<>(); - s.getValue().members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic()))); - consumerGroup.setNumTopics(topics.size()); - consumerGroups.add(consumerGroup); - }); - return Mono.just(ResponseEntity.ok(Flux.fromIterable(consumerGroups))); + var cluster = clustersStorage.getClusterByName(clusterName); + return toMono(cluster.getAdminClient().listConsumerGroups().all()) + .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()) + .flatMap(s -> { + return toMono(s).map(c -> { + ArrayList result = new ArrayList<>(); + c.values().forEach(c1 -> { + ConsumerGroup consumerGroup = new ConsumerGroup(); + consumerGroup.setClusterId(cluster.getCluster().getId()); + consumerGroup.setConsumerGroupId(c1.groupId()); + consumerGroup.setNumConsumers(c1.members().size()); + Set topics = new HashSet<>(); + c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic()))); + consumerGroup.setNumTopics(topics.size()); + result.add(consumerGroup); + }); + return result; + }); + }).map(s -> { + return ResponseEntity.ok(Flux.fromIterable(s)); + }); } } From 4c6021552f36bad284f58b887400408278618894 Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Thu, 2 Apr 2020 20:46:00 +0300 Subject: [PATCH 5/7] method located better --- .../ui/cluster/service/ClusterService.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 92458dc696f..e044248f40f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -77,26 +77,26 @@ private Mono toMono(KafkaFuture future){ @SneakyThrows public Mono>> getConsumerGroup (String clusterName) { - var cluster = clustersStorage.getClusterByName(clusterName); - return toMono(cluster.getAdminClient().listConsumerGroups().all()) - .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()) - .flatMap(s -> { - return toMono(s).map(c -> { - ArrayList result = new ArrayList<>(); - c.values().forEach(c1 -> { - ConsumerGroup consumerGroup = new ConsumerGroup(); - consumerGroup.setClusterId(cluster.getCluster().getId()); - consumerGroup.setConsumerGroupId(c1.groupId()); - consumerGroup.setNumConsumers(c1.members().size()); - Set topics = new HashSet<>(); - c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic()))); - consumerGroup.setNumTopics(topics.size()); - result.add(consumerGroup); - }); - return result; - }); - }).map(s -> { - return ResponseEntity.ok(Flux.fromIterable(s)); - }); + var cluster = clustersStorage.getClusterByName(clusterName); + return toMono(cluster.getAdminClient().listConsumerGroups().all()) + .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()) + .flatMap(s -> { + return toMono(s).map(c -> { + ArrayList result = new ArrayList<>(); + c.values().forEach(c1 -> { + ConsumerGroup consumerGroup = new ConsumerGroup(); + consumerGroup.setClusterId(cluster.getCluster().getId()); + consumerGroup.setConsumerGroupId(c1.groupId()); + consumerGroup.setNumConsumers(c1.members().size()); + Set topics = new HashSet<>(); + c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic()))); + consumerGroup.setNumTopics(topics.size()); + result.add(consumerGroup); + }); + return result; + }); + }).map(s -> { + return ResponseEntity.ok(Flux.fromIterable(s)); + }); } } From 6da3ea77a9b09d5ba49af780d26f559ed51f4df4 Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Mon, 6 Apr 2020 13:00:49 +0300 Subject: [PATCH 6/7] changes after review --- .../ui/cluster/service/ClusterService.java | 39 +++++-------------- .../kafka/ui/cluster/util/ClusterUtil.java | 34 ++++++++++++++++ 2 files changed, 43 insertions(+), 30 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index e044248f40f..10c87b9fca4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -2,21 +2,19 @@ import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.kafka.KafkaService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.apache.kafka.clients.admin.ConsumerGroupListing; -import org.apache.kafka.common.KafkaFuture; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; @Service @@ -65,38 +63,19 @@ public Mono> createTopic(String name, Mono return kafkaService.createTopic(cluster, topicFormData); } - private Mono toMono(KafkaFuture future){ - return Mono.create(sink-> future.whenComplete((res, ex)->{ - if(ex!=null) { - sink.error(ex); - } else { - sink.success(res); - } - })); - } - @SneakyThrows public Mono>> getConsumerGroup (String clusterName) { var cluster = clustersStorage.getClusterByName(clusterName); - return toMono(cluster.getAdminClient().listConsumerGroups().all()) - .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()) - .flatMap(s -> { - return toMono(s).map(c -> { + return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all()) + .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient() + .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())) + .map(s -> { ArrayList result = new ArrayList<>(); - c.values().forEach(c1 -> { - ConsumerGroup consumerGroup = new ConsumerGroup(); - consumerGroup.setClusterId(cluster.getCluster().getId()); - consumerGroup.setConsumerGroupId(c1.groupId()); - consumerGroup.setNumConsumers(c1.members().size()); - Set topics = new HashSet<>(); - c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic()))); - consumerGroup.setNumTopics(topics.size()); + s.values().forEach(c -> { + ConsumerGroup consumerGroup = ClusterUtil.convertToConsumerGroup(c, cluster); result.add(consumerGroup); - }); + }); return result; - }); - }).map(s -> { - return ResponseEntity.ok(Flux.fromIterable(s)); - }); + }).map(s -> ResponseEntity.ok(Flux.fromIterable(s))); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java new file mode 100644 index 00000000000..159e9305173 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java @@ -0,0 +1,34 @@ +package com.provectus.kafka.ui.cluster.util; + +import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.model.ConsumerGroup; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.common.KafkaFuture; +import reactor.core.publisher.Mono; + +import java.util.HashSet; +import java.util.Set; + +public class ClusterUtil { + + public static Mono toMono(KafkaFuture future){ + return Mono.create(sink -> future.whenComplete((res, ex)->{ + if (ex!=null) { + sink.error(ex); + } else { + sink.success(res); + } + })); + } + + public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) { + ConsumerGroup consumerGroup = new ConsumerGroup(); + consumerGroup.setClusterId(cluster.getCluster().getId()); + consumerGroup.setConsumerGroupId(c.groupId()); + consumerGroup.setNumConsumers(c.members().size()); + Set topics = new HashSet<>(); + c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic()))); + consumerGroup.setNumTopics(topics.size()); + return consumerGroup; + } +} From b8afb8b33a930a1d7642dbfc5ce55dc8f745be7d Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Mon, 6 Apr 2020 14:23:36 +0300 Subject: [PATCH 7/7] changed foreach to map --- .../kafka/ui/cluster/service/ClusterService.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 10c87b9fca4..870b7b58f75 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -13,7 +13,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -69,13 +68,8 @@ public Mono>> getConsumerGroup (String cluste return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all()) .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient() .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())) - .map(s -> { - ArrayList result = new ArrayList<>(); - s.values().forEach(c -> { - ConsumerGroup consumerGroup = ClusterUtil.convertToConsumerGroup(c, cluster); - result.add(consumerGroup); - }); - return result; - }).map(s -> ResponseEntity.ok(Flux.fromIterable(s))); + .map(s -> s.values().stream() + .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList())) + .map(s -> ResponseEntity.ok(Flux.fromIterable(s))); } }