Skip to content

Commit

Permalink
Merge branch 'master' into feature/14-add-custom-params-for-topics-cr…
Browse files Browse the repository at this point in the history
…eation
  • Loading branch information
Gataniel committed Apr 10, 2020
2 parents 3c4c9c1 + bdea709 commit e4dbebb
Show file tree
Hide file tree
Showing 29 changed files with 447 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -58,4 +61,15 @@ public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData>
if (cluster == null) return null;
return kafkaService.createTopic(cluster, topicFormData);
}

@SneakyThrows
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
var cluster = clustersStorage.getClusterByName(clusterName);
return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
.flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
.map(s -> s.values().stream()
.map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
.map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.provectus.kafka.ui.cluster.util;

import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.ConsumerGroup;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;

import java.util.HashSet;
import java.util.Set;

public class ClusterUtil {

public static <T> Mono<T> toMono(KafkaFuture<T> future){
return Mono.create(sink -> future.whenComplete((res, ex)->{
if (ex!=null) {
sink.error(ex);
} else {
sink.success(res);
}
}));
}

public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setClusterId(cluster.getCluster().getId());
consumerGroup.setConsumerGroupId(c.groupId());
consumerGroup.setNumConsumers(c.members().size());
Set<String> topics = new HashSet<>();
c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
consumerGroup.setNumTopics(topics.size());
return consumerGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -53,4 +54,9 @@ public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<Top
public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
}

@Override
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
return clusterService.getConsumerGroup(clusterName);
}
}
36 changes: 35 additions & 1 deletion kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,28 @@ paths:
items:
$ref: '#/components/schemas/TopicConfig'

/api/clusters/{clusterName}/consumerGroups:
get:
tags:
- /api/clusters
summary: getConsumerGroup
operationId: getConsumerGroup
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ConsumerGroup'

components:
schemas:
Cluster:
Expand Down Expand Up @@ -307,4 +329,16 @@ components:
type: object
properties:
id:
type: string
type: string

ConsumerGroup:
type: object
properties:
clusterId:
type: string
consumerGroupId:
type: string
numConsumers:
type: integer
numTopics:
type: integer
2 changes: 2 additions & 0 deletions kafka-ui-react-app/mock/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const brokerMetrics = require('./payload/brokerMetrics.json');
const topics = require('./payload/topics.json');
const topicDetails = require('./payload/topicDetails.json');
const topicConfigs = require('./payload/topicConfigs.json');
const consumerGroups = require('./payload/consumerGroups.json');

const db = {
clusters,
Expand All @@ -13,6 +14,7 @@ const db = {
topics: topics.map((topic) => ({...topic, id: topic.name})),
topicDetails,
topicConfigs,
consumerGroups: consumerGroups.map((group) => ({...group, id: group.consumerGroupId}))
};
const server = jsonServer.create();
const router = jsonServer.router(db);
Expand Down
39 changes: 39 additions & 0 deletions kafka-ui-react-app/mock/payload/consumerGroups.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_1",
"numConsumers": 1,
"numTopics": 11
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_2",
"numConsumers": 2,
"numTopics": 22
},
{
"clusterId": "fake.cluster",
"consumerGroupId": "_fake.cluster.consumer_3",
"numConsumers": 3,
"numTopics": 33
},

{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_1",
"numConsumers": 4,
"numTopics": 44
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_2",
"numConsumers": 5,
"numTopics": 55
},
{
"clusterId": "kafka-ui.cluster",
"consumerGroupId": "_kafka-ui.cluster.consumer_3",
"numConsumers": 6,
"numTopics": 66
}
]
13 changes: 13 additions & 0 deletions kafka-ui-react-app/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions kafka-ui-react-app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"json-server": "^0.15.1",
"lodash": "^4.17.15",
"node-sass": "^4.13.1",
"pretty-ms": "^6.0.1",
"react": "^16.12.0",
"react-dom": "^16.12.0",
"react-hook-form": "^4.5.5",
Expand Down
2 changes: 2 additions & 0 deletions kafka-ui-react-app/src/components/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import TopicsContainer from './Topics/TopicsContainer';
import NavConatiner from './Nav/NavConatiner';
import PageLoader from './common/PageLoader/PageLoader';
import Dashboard from './Dashboard/Dashboard';
import ConsumersGroupsContainer from './ConsumerGroups/ConsumersGroupsContainer';

interface AppProps {
isClusterListFetched: boolean;
Expand Down Expand Up @@ -39,6 +40,7 @@ const App: React.FC<AppProps> = ({
<Route exact path="/clusters" component={Dashboard} />
<Route path="/clusters/:clusterName/topics" component={TopicsContainer} />
<Route path="/clusters/:clusterName/brokers" component={BrokersContainer} />
<Route path="/clusters/:clusterName/consumer-groups" component={ConsumersGroupsContainer} />
<Redirect from="/clusters/:clusterName" to="/clusters/:clusterName/brokers" />
</Switch>
) : (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import React from 'react';
import { ClusterName } from 'redux/interfaces';
import {
Switch,
Route,
} from 'react-router-dom';
import ListContainer from './List/ListContainer';
import PageLoader from 'components/common/PageLoader/PageLoader';

interface Props {
clusterName: ClusterName;
isFetched: boolean;
fetchConsumerGroupsList: (clusterName: ClusterName) => void;
}

const ConsumerGroups: React.FC<Props> = ({
clusterName,
isFetched,
fetchConsumerGroupsList,
}) => {
React.useEffect(() => { fetchConsumerGroupsList(clusterName); }, [fetchConsumerGroupsList, clusterName]);

if (isFetched) {
return (
<Switch>
<Route exact path="/clusters/:clusterName/consumer-groups" component={ListContainer} />
</Switch>
);
}

return (<PageLoader />);
};

export default ConsumerGroups;
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { connect } from 'react-redux';
import { fetchConsumerGroupsList } from 'redux/actions';
import { RootState, ClusterName } from 'redux/interfaces';
import { RouteComponentProps } from 'react-router-dom';
import ConsumerGroups from './ConsumerGroups';
import { getIsConsumerGroupsListFetched } from '../../redux/reducers/consumerGroups/selectors';


interface RouteProps {
clusterName: ClusterName;
}

interface OwnProps extends RouteComponentProps<RouteProps> { }

const mapStateToProps = (state: RootState, { match: { params: { clusterName } }}: OwnProps) => ({
isFetched: getIsConsumerGroupsListFetched(state),
clusterName,
});

const mapDispatchToProps = {
fetchConsumerGroupsList: (clusterName: ClusterName) => fetchConsumerGroupsList(clusterName),
};

export default connect(mapStateToProps, mapDispatchToProps)(ConsumerGroups);
64 changes: 64 additions & 0 deletions kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import React, { ChangeEvent } from 'react';
import { ConsumerGroup, ClusterName } from 'redux/interfaces';
import ListItem from './ListItem';
import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';

interface Props {
clusterName: ClusterName;
consumerGroups: (ConsumerGroup)[];
}

const List: React.FC<Props> = ({
consumerGroups,
}) => {

const [searchText, setSearchText] = React.useState<string>('');

const handleInputChange = (event: React.ChangeEvent<HTMLInputElement>) => {
setSearchText(event.target.value);
};

const items = consumerGroups;

return (
<div className="section">
<Breadcrumb>All Consumer Groups</Breadcrumb>

<div className="box">
<div className="columns">
<div className="column is-half is-offset-half">
<input id="searchText"
type="text"
name="searchText"
className="input"
placeholder="Search"
value={searchText}
onChange={handleInputChange}
/>
</div>
</div>
<table className="table is-striped is-fullwidth">
<thead>
<tr>
<th>Consumer group ID</th>
<th>Num of consumers</th>
<th>Num of topics</th>
</tr>
</thead>
<tbody>
{items
.filter( (consumerGroup) => !searchText || consumerGroup?.consumerGroupId?.indexOf(searchText) >= 0)
.map((consumerGroup, index) => (
<ListItem
key={`consumer-group-list-item-key-${index}`}
{...consumerGroup}
/>
))}
</tbody>
</table>
</div>
</div>
);
};

export default List;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { connect } from 'react-redux';
import {ClusterName, RootState} from 'redux/interfaces';
import { getConsumerGroupsList } from 'redux/reducers/consumerGroups/selectors';
import List from './List';
import { withRouter, RouteComponentProps } from 'react-router-dom';

interface RouteProps {
clusterName: ClusterName;
}

interface OwnProps extends RouteComponentProps<RouteProps> { }

const mapStateToProps = (state: RootState, { match: { params: { clusterName } } }: OwnProps) => ({
clusterName,
consumerGroups: getConsumerGroupsList(state)
});

export default withRouter(
connect(mapStateToProps)(List)
);
Loading

0 comments on commit e4dbebb

Please sign in to comment.