Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer groups list with search. #17

Merged
merged 7 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Service
Expand Down Expand Up @@ -58,4 +64,39 @@ public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData>
if (cluster == null) return null;
return kafkaService.createTopic(cluster, topicFormData);
}

private <T> Mono<T> toMono(KafkaFuture<T> future){
germanosin marked this conversation as resolved.
Show resolved Hide resolved
return Mono.create(sink-> future.whenComplete((res, ex)->{
if(ex!=null) {
sink.error(ex);
} else {
sink.success(res);
}
}));
}

@SneakyThrows
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
var cluster = clustersStorage.getClusterByName(clusterName);
return toMono(cluster.getAdminClient().listConsumerGroups().all())
.map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())
germanosin marked this conversation as resolved.
Show resolved Hide resolved
.flatMap(s -> {
return toMono(s).map(c -> {
ArrayList<ConsumerGroup> result = new ArrayList<>();
germanosin marked this conversation as resolved.
Show resolved Hide resolved
c.values().forEach(c1 -> {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setClusterId(cluster.getCluster().getId());
consumerGroup.setConsumerGroupId(c1.groupId());
consumerGroup.setNumConsumers(c1.members().size());
Set<String> topics = new HashSet<>();
c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
consumerGroup.setNumTopics(topics.size());
result.add(consumerGroup);
});
return result;
});
}).map(s -> {
return ResponseEntity.ok(Flux.fromIterable(s));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -53,4 +54,9 @@ public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<Top
public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
}

@Override
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
return clusterService.getConsumerGroup(clusterName);
}
}
36 changes: 35 additions & 1 deletion kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,28 @@ paths:
items:
$ref: '#/components/schemas/TopicConfig'

/api/clusters/{clusterName}/consumerGroups:
get:
tags:
- /api/clusters
summary: getConsumerGroup
operationId: getConsumerGroup
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ConsumerGroup'

components:
schemas:
Cluster:
Expand Down Expand Up @@ -307,4 +329,16 @@ components:
type: object
properties:
id:
type: string
type: string

ConsumerGroup:
type: object
properties:
clusterId:
type: string
consumerGroupId:
type: string
numConsumers:
type: integer
numTopics:
type: integer
2 changes: 2 additions & 0 deletions kafka-ui-react-app/mock/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const brokerMetrics = require('./payload/brokerMetrics.json');
const topics = require('./payload/topics.json');
const topicDetails = require('./payload/topicDetails.json');
const topicConfigs = require('./payload/topicConfigs.json');
const consumerGroups = require('./payload/consumerGroups.json');

const db = {
clusters,
Expand All @@ -13,6 +14,7 @@ const db = {
topics: topics.map((topic) => ({...topic, id: topic.name})),
topicDetails,
topicConfigs,
consumerGroups: consumerGroups.map((group) => ({...group, id: group.consumerGroupId}))
};
const server = jsonServer.create();
const router = jsonServer.router(db);
Expand Down
39 changes: 39 additions & 0 deletions kafka-ui-react-app/mock/payload/consumerGroups.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_1",
"numConsumers": 1,
"numTopics": 11
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_2",
"numConsumers": 2,
"numTopics": 22
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_3",
"numConsumers": 3,
"numTopics": 33
},

{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_1",
"numConsumers": 4,
"numTopics": 44
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_2",
"numConsumers": 5,
"numTopics": 55
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_3",
"numConsumers": 6,
"numTopics": 66
}
]
2 changes: 2 additions & 0 deletions kafka-ui-react-app/src/components/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import TopicsContainer from './Topics/TopicsContainer';
import NavConatiner from './Nav/NavConatiner';
import PageLoader from './common/PageLoader/PageLoader';
import Dashboard from './Dashboard/Dashboard';
import ConsumersGroupsContainer from './ConsumerGroups/ConsumersGroupsContainer';

interface AppProps {
isClusterListFetched: boolean;
Expand Down Expand Up @@ -39,6 +40,7 @@ const App: React.FC<AppProps> = ({
<Route exact path="/clusters" component={Dashboard} />
<Route path="/clusters/:clusterName/topics" component={TopicsContainer} />
<Route path="/clusters/:clusterName/brokers" component={BrokersContainer} />
<Route path="/clusters/:clusterName/consumer-groups" component={ConsumersGroupsContainer} />
<Redirect from="/clusters/:clusterName" to="/clusters/:clusterName/brokers" />
</Switch>
) : (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import React from 'react';
import { ClusterName } from 'redux/interfaces';
import {
Switch,
Route,
} from 'react-router-dom';
import ListContainer from './List/ListContainer';
import PageLoader from 'components/common/PageLoader/PageLoader';

interface Props {
clusterName: ClusterName;
isFetched: boolean;
fetchConsumerGroupsList: (clusterName: ClusterName) => void;
}

const ConsumerGroups: React.FC<Props> = ({
clusterName,
isFetched,
fetchConsumerGroupsList,
}) => {
React.useEffect(() => { fetchConsumerGroupsList(clusterName); }, [fetchConsumerGroupsList, clusterName]);

if (isFetched) {
return (
<Switch>
<Route exact path="/clusters/:clusterName/consumer-groups" component={ListContainer} />
</Switch>
);
}

return (<PageLoader />);
};

export default ConsumerGroups;
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { connect } from 'react-redux';
import { fetchConsumerGroupsList } from 'redux/actions';
import { RootState, ClusterName } from 'redux/interfaces';
import { RouteComponentProps } from 'react-router-dom';
import ConsumerGroups from './ConsumerGroups';
import { getIsConsumerGroupsListFetched } from '../../redux/reducers/consumerGroups/selectors';


interface RouteProps {
clusterName: ClusterName;
}

interface OwnProps extends RouteComponentProps<RouteProps> { }

const mapStateToProps = (state: RootState, { match: { params: { clusterName } }}: OwnProps) => ({
isFetched: getIsConsumerGroupsListFetched(state),
clusterName,
});

const mapDispatchToProps = {
fetchConsumerGroupsList: (clusterName: ClusterName) => fetchConsumerGroupsList(clusterName),
};

export default connect(mapStateToProps, mapDispatchToProps)(ConsumerGroups);
64 changes: 64 additions & 0 deletions kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import React, { ChangeEvent } from 'react';
import { ConsumerGroup, ClusterName } from 'redux/interfaces';
import ListItem from './ListItem';
import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';

interface Props {
clusterName: ClusterName;
consumerGroups: (ConsumerGroup)[];
}

const List: React.FC<Props> = ({
consumerGroups,
}) => {

const [searchText, setSearchText] = React.useState<string>('');

const handleInputChange = (event: React.ChangeEvent<HTMLInputElement>) => {
setSearchText(event.target.value);
};

const items = consumerGroups;

return (
<div className="section">
<Breadcrumb>All Consumer Groups</Breadcrumb>

<div className="box">
<div className="columns">
<div className="column is-half is-offset-half">
<input id="searchText"
type="text"
name="searchText"
className="input"
placeholder="Search"
value={searchText}
onChange={handleInputChange}
/>
</div>
</div>
<table className="table is-striped is-fullwidth">
<thead>
<tr>
<th>Consumer group ID</th>
<th>Num of consumers</th>
<th>Num of topics</th>
</tr>
</thead>
<tbody>
{items
.filter( (consumerGroup) => !searchText || consumerGroup?.consumerGroupId?.indexOf(searchText) >= 0)
.map((consumerGroup, index) => (
<ListItem
key={`consumer-group-list-item-key-${index}`}
{...consumerGroup}
/>
))}
</tbody>
</table>
</div>
</div>
);
};

export default List;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { connect } from 'react-redux';
import {ClusterName, RootState} from 'redux/interfaces';
import { getConsumerGroupsList } from 'redux/reducers/consumerGroups/selectors';
import List from './List';
import { withRouter, RouteComponentProps } from 'react-router-dom';

interface RouteProps {
clusterName: ClusterName;
}

interface OwnProps extends RouteComponentProps<RouteProps> { }

const mapStateToProps = (state: RootState, { match: { params: { clusterName } } }: OwnProps) => ({
clusterName,
consumerGroups: getConsumerGroupsList(state)
});

export default withRouter(
connect(mapStateToProps)(List)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import React from 'react';
import { NavLink } from 'react-router-dom';
import { ConsumerGroup } from 'redux/interfaces';

const ListItem: React.FC<ConsumerGroup> = ({
consumerGroupId,
numConsumers,
numTopics,
}) => {
return (
<tr>
{/* <td>
<NavLink exact to={`consumer-groups/${consumerGroupId}`} activeClassName="is-active" className="title is-6">
{consumerGroupId}
</NavLink>
</td> */}
<td>{consumerGroupId}</td>
<td>{numConsumers}</td>
<td>{numTopics}</td>
</tr>
);
}

export default ListItem;
5 changes: 4 additions & 1 deletion kafka-ui-react-app/src/components/Nav/ClusterMenu.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import React, { CSSProperties } from 'react';
import { Cluster } from 'redux/interfaces';
import { NavLink } from 'react-router-dom';
import { clusterBrokersPath, clusterTopicsPath } from 'lib/paths';
import { clusterBrokersPath, clusterTopicsPath, clusterConsumerGroupsPath } from 'lib/paths';

interface Props extends Cluster {}

Expand Down Expand Up @@ -37,6 +37,9 @@ const ClusterMenu: React.FC<Props> = ({
<NavLink to={clusterTopicsPath(name)} activeClassName="is-active" title="Topics">
Topics
</NavLink>
<NavLink to={clusterConsumerGroupsPath(name)} activeClassName="is-active" title="Consumers">
Consumers
</NavLink>
</ul>
</li>
</ul>
Expand Down
2 changes: 2 additions & 0 deletions kafka-ui-react-app/src/lib/paths.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ export const clusterTopicNewPath = (clusterName: ClusterName) => `${clusterPath(
export const clusterTopicPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}`;
export const clusterTopicSettingsPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/settings`;
export const clusterTopicMessagesPath = (clusterName: ClusterName, topicName: TopicName) => `${clusterTopicsPath(clusterName)}/${topicName}/messages`;

export const clusterConsumerGroupsPath = (clusterName: ClusterName) => `${clusterPath(clusterName)}/consumer-groups`;
Loading