Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broker管理查询分区接口改为从nameserver中查询 #324

Merged
merged 9 commits into from
Dec 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package org.joyqueue.repository;

import org.apache.ibatis.annotations.Param;
import org.joyqueue.model.Uniqueable;
import org.joyqueue.model.domain.ApplicationUser;
import org.joyqueue.model.query.QApplicationUser;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
* 应用-用户关联关系 仓库
* Created by chenyanying3 on 2018-10-15
Expand All @@ -28,4 +31,5 @@
public interface ApplicationUserRepository extends PageRepository<ApplicationUser, QApplicationUser>, Uniqueable<ApplicationUser> {
ApplicationUser findByUserApp(ApplicationUser applicationUser);
int deleteByAppId(long appId);
List<String> findAppByUser(@Param("user") String user);
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,15 @@
<include refid="baseColumn"/>
FROM
application_user
WHERE app_code = #{application.code} and user_code=#{user.code}
WHERE app_code = #{application.code} and user_code=#{user.code} and status != -1
</select>

<select id="findAppByUser" resultMap="baseResultMap" parameterType="java.lang.String">
SELECT
app_code
FROM
application_user
WHERE user_code=#{user} and status != -1
</select>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public List<Replica> getByBrokerId(Integer brokerId) {
ReplicaQuery replicaQuery = new ReplicaQuery();
replicaQuery.setBrokerId(brokerId);
String result = post(POSTBY_REPLICA_BROKER, replicaQuery);
List<Replica> replicas = JSON.parseArray(result, Replica.class);
return null;
return JSON.parseArray(result, Replica.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import org.joyqueue.model.domain.ApplicationUser;
import org.joyqueue.model.query.QApplicationUser;

import java.util.List;

/**
* 应用-用户关联关系 服务
* Created by chenyanying on 2018-10-17.
*/
public interface ApplicationUserService extends PageService<ApplicationUser, QApplicationUser>, Uniqueable<ApplicationUser> {
ApplicationUser findByUserApp(String user, String app);
int deleteByAppId(long appId);
List<String> findAppByUser(String user);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ public interface BrokerTopicMonitorService {
BrokerMonitorInfo findBrokerMonitor(Long brokerId);
BrokerStartupInfo getStartupInfo(Long brokerId) throws Exception;
List<String> queryTopicList(Long brokerId) throws Exception;
List<BrokerTopicMonitor> queryTopicsPartitionMonitors(Integer brokerId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ public interface ConsumerService extends NsrService<Consumer, String> {

List<String> findAppsByTopic(String topic) throws Exception;

void updateAllConsumerRegion(String app, String subscribeGroup, String region) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.joyqueue.service.ApplicationUserService;
import org.springframework.stereotype.Service;

import java.util.List;

/**
* 应用-用户关联关系 服务
* Created by chenyanying3 on 2018-10-15
Expand All @@ -41,4 +43,9 @@ public ApplicationUser findByUserApp(String user, String app){
public int deleteByAppId(long appId) {
return repository.deleteByAppId(appId);
}

@Override
public List<String> findAppByUser(String user) {
return repository.findAppByUser(user);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alibaba.fastjson.JSONObject;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.domain.Replica;
import org.joyqueue.manage.PartitionGroupMetric;
import org.joyqueue.model.PageResult;
import org.joyqueue.model.Pagination;
Expand All @@ -35,6 +36,7 @@
import org.joyqueue.monitor.ConsumerMonitorInfo;
import org.joyqueue.monitor.ProducerMonitorInfo;
import org.joyqueue.monitor.RestResponse;
import org.joyqueue.nsr.PartitionGroupServerService;
import org.joyqueue.other.HttpRestService;
import org.joyqueue.service.BrokerService;
import org.joyqueue.service.BrokerTopicMonitorService;
Expand All @@ -48,7 +50,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand All @@ -65,6 +69,8 @@ public class BrokerTopicMonitorServiceImpl implements BrokerTopicMonitorService
private ConsumerService consumerService;
@Autowired
private ProducerService producerService;
@Autowired
private PartitionGroupServerService partitionGroupServerService;


@Override
Expand Down Expand Up @@ -181,6 +187,23 @@ public List<String> queryTopicList(Long brokerId) throws Exception {
return queryTopicList(broker);
}

@Override
public List<BrokerTopicMonitor> queryTopicsPartitionMonitors(Integer brokerId) {
List<BrokerTopicMonitor> brokerTopicMonitorList = new ArrayList<>();
try {
Map<String, List<PartitionGroupMetric>> partitionGroupMetricMap = getPartitionGroupMetricMap(brokerId);
for (Map.Entry<String, List<PartitionGroupMetric>> entry: partitionGroupMetricMap.entrySet()) {
BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor();
brokerTopicMonitor.setTopic(entry.getKey());
brokerTopicMonitor.setPartitionGroupMetricList(entry.getValue());
brokerTopicMonitorList.add(brokerTopicMonitor);
}
} catch (Exception e) {
logger.error("queryTopicsPartitionMointor exception", e);
}
return brokerTopicMonitorList;
}

private BrokerTopicMonitor getMonitorByAppAndTopic(String topic, List<String> appList, Broker broker, SubscribeType type) throws Exception {
BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor();
List<BrokerTopicMonitorRecord> brokerMonitorRecordList = new ArrayList<>();
Expand Down Expand Up @@ -335,6 +358,22 @@ private List<PartitionGroupMetric> getPartitionGroup(String topic, Broker broker
return null;
}

private Map<String, List<PartitionGroupMetric>> getPartitionGroupMetricMap(Integer brokerId) {
Map<String, List<Replica>> replicaMap = partitionGroupServerService.getByBrokerId(brokerId)
.stream().collect(Collectors.groupingBy(replica -> replica.getTopic().getCode()));
Map<String, List<PartitionGroupMetric>> map = new HashMap<>();
for (Map.Entry<String, List<Replica>> entry: replicaMap.entrySet()) {
List<PartitionGroupMetric> metrics = new ArrayList<>(entry.getValue().size());
for (Replica replica: entry.getValue()) {
PartitionGroupMetric metric = new PartitionGroupMetric();
metric.setPartitionGroup(replica.getGroup());
metrics.add(metric);
}
map.put(entry.getKey(), metrics);
}
return map;
}

private JSONObject queryMonitorConsumers(Broker broker, int page, int pageSize) {
String path = "consumerInfos";
String[] args = new String[4];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,26 @@ protected Consumer fillConsumer(Consumer consumer) {
return consumer;
}

@Override
public void updateAllConsumerRegion(String app, String subscribeGroup, String region) throws Exception {
List<Consumer> consumers = findByApp(app);
if (StringUtils.isNotBlank(subscribeGroup)) {
consumers = consumers.stream().filter(item -> StringUtils.isNotBlank(item.getSubscribeGroup())
&& subscribeGroup.equals(item.getSubscribeGroup()))
.collect(Collectors.toList());
} else {
consumers = consumers.stream().filter(item -> StringUtils.isBlank(item.getSubscribeGroup()))
.collect(Collectors.toList());
}
for (Consumer consumerItem: consumers) {
if (consumerItem.getConfig() != null) {
if (StringUtils.isNotBlank(region)) {
consumerItem.getConfig().setRegion(region);
} else {
consumerItem.getConfig().setRegion(null);
}
this.update(consumerItem);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,6 @@ export default {
}
return h('div', {}, html)
}
},
{
title: 'partition',
key: 'partitionGroupMetricList',
render: (h, params) => {
var list = params.item.partitionGroupMetricList;
if (list!= null && list.length > 0) {
list = list.slice().sort((a,b) => a.partitionGroup-b.partitionGroup)
}
var html = []
if (list != undefined) {
for (var i = 0; i < list.length; i++) {
var p = h('div', {style: 'border-bottom: 1px solid #ECECEC;'}, mergePartitionGroup(JSON.parse('[' + list[i].partitions + ']')))
html.push(p)
}
}
return h('div', {}, html)
}
}
]
// 表格操作,如果需要根据特定值隐藏显示, 设置bindKey对应的属性名和bindVal对应的属性值
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@
<route inherit="get"
path="/v1/consumer/get/:id" handlers="consumer#get render"
errors="error"/>
<route inherit="get"
path="/v1/consumer/checkRegion" handlers="consumer#checkRegion render"
errors="error"/>
<route inherit="get"
path="/v1/consumer/updateRegion" handlers="consumer#updateRegion render"
errors="error"/>
<!--<route inherit="get"-->
<!--path="/v1/consumer/syncMqttClient" handlers="consumer#syncMqttClient render"-->
<!--errors="error"/>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.joyqueue.handler.error.ErrorCode;
import org.joyqueue.model.BrokerMetadata;
import org.joyqueue.model.PageResult;
import org.joyqueue.model.Pagination;
import org.joyqueue.model.QPageQuery;
import org.joyqueue.model.domain.BrokerClient;
import org.joyqueue.model.domain.BrokerMonitorInfoWithDC;
Expand Down Expand Up @@ -244,7 +245,10 @@ public Response brokerMonitor(@PageQuery QPageQuery<QMonitor> qPageQuery){
@Path("partitionGroupMonitor")
public Response partitionGroupMonitor(@PageQuery QPageQuery<QMonitor> qPageQuery){
try {
PageResult<BrokerTopicMonitor> pageResult = brokerTopicMonitorService.queryTopicsPartitionMointor(qPageQuery);
List<BrokerTopicMonitor> brokerTopicMonitors = brokerTopicMonitorService.queryTopicsPartitionMonitors(Integer.valueOf(String.valueOf(qPageQuery.getQuery().getBrokerId())));
PageResult<BrokerTopicMonitor> pageResult = new PageResult<>();
pageResult.setResult(brokerTopicMonitors);
pageResult.setPagination(new Pagination(0, brokerTopicMonitors.size() - 1));
return new Response(pageResult.getResult(), pageResult.getPagination());
} catch (Exception e) {
logger.error("query broker monitor info error.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,38 @@ public Response findAppsByTopic(@QueryParam(Constants.TOPIC)String topic) throws
public Response update(@QueryParam(ID)String id,@Body Consumer model) throws Exception {
return super.update(id, model);
}

@Path("checkRegion")
public Response checkRegion(@QueryParam("app") String app,
@QueryParam("subscribeGroup") String subscribeGroup,
@QueryParam("region") String region) throws Exception {
if (StringUtils.isNotBlank(app) && StringUtils.isNotBlank(region)) {
List<Consumer> consumers = consumerNameServerService.findByApp(app);
if (StringUtils.isNotBlank(subscribeGroup)) {
consumers = consumers.stream().filter(consumer -> subscribeGroup.equals(consumer.getSubscribeGroup()))
.collect(Collectors.toList());
}
consumers = consumers.stream().filter(consumer -> consumer.getConfig() != null
&& StringUtils.isNotBlank(consumer.getConfig().getRegion())
&& !region.equals(consumer.getConfig().getRegion()))
.collect(Collectors.toList());
return Responses.success(consumers.size() == 0);

}
return Responses.error(500, "app, region can't be empty");
}

@Path("updateRegion")
public Response updateRegion(@QueryParam("consumerId") String consumerId,
@QueryParam("region") String region) throws Exception {
if (StringUtils.isNotBlank(consumerId)) {
Consumer consumer = service.findById(consumerId);
String app = consumer.getApp().getCode();
String subscribeGroup = consumer.getSubscribeGroup();
service.updateAllConsumerRegion(app, subscribeGroup, region);
return Responses.success();

}
return Responses.error(500, "consumerId can't be empty");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ public ReadResult read(short partition, long index, int count, long maxSize) thr
continue;
}
if (indexItem.getOffset() >= commitPosition()) {
continue;
break;
}
try {
ByteBuffer log;
Expand Down