From d752dc5976d4bee5ef946b6f53bb2309b2b7c1f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E6=A5=A0?= Date: Mon, 7 Sep 2020 18:10:26 +0800 Subject: [PATCH] =?UTF-8?q?broker=E7=AE=A1=E7=90=86=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/BrokerTopicMonitorRecord.java | 9 ++ .../impl/BrokerRestUrlMappingServiceImpl.java | 2 + .../impl/BrokerTopicMonitorServiceImpl.java | 124 ++++++++++++++--- .../src/views/setting/brokerMonitor.vue | 130 ++++++++++++++++-- .../joyqueue-portal/src/views/topic/index.vue | 8 +- 5 files changed, 239 insertions(+), 34 deletions(-) diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java index b464789d2..c08e85782 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java @@ -29,6 +29,7 @@ public class BrokerTopicMonitorRecord { private long retryCount; private long retryTps; private long backlog; + private long traffic; public String getApp() { return app; @@ -101,4 +102,12 @@ public long getTps() { public void setTps(long tps) { this.tps = tps; } + + public long getTraffic() { + return traffic; + } + + public void setTraffic(long traffic) { + this.traffic = traffic; + } } diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java index f9457d63f..9e3cb7d51 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java @@ -61,6 +61,8 @@ public class BrokerRestUrlMappingServiceImpl implements BrokerRestUrlMappingServ private String appConnectionDetailPath = "/monitor/connections/detail"; // /manage/topic/:topic/partitionGroup/:partitionGroup/store/metric private String partitiongroupIndexPath="/manage/topic/%s/partitionGroup/%s/store/metric"; + private String consumerInfosPath = "/monitor/consumers?page=%s&pageSize=%s"; + private String producerInfosPath = "/monitor/producers?page=%s&pageSize=%s"; /** offset management*/ private String removeProducersConnectionsPath= "/manage/topic/%s/app/%s/producers"; diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java index e60183b53..8307ea529 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java @@ -15,6 +15,7 @@ */ package org.joyqueue.service.impl; +import com.alibaba.fastjson.JSONObject; import org.joyqueue.convert.CodeConverter; import org.joyqueue.manage.PartitionGroupMetric; import org.joyqueue.model.PageResult; @@ -46,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -135,35 +137,15 @@ public PageResult queryClientConnectionDetail(QPageQuery qPage */ @Override public PageResult queryTopicsMointor(QPageQuery qPageQuery) { - - PageResult pageResult = new PageResult<>(); try { Pagination pagination = qPageQuery.getPagination(); QMonitor qMonitor = qPageQuery.getQuery(); Broker broker = brokerService.findById(Integer.valueOf(String.valueOf(qMonitor.getBrokerId()))); - List toplicList = queryTopicList(broker); - pagination.setTotalRecord(toplicList.size()); - int fromIndx = pagination.getStart() + pagination.getSize(); - if (fromIndx > pagination.getTotalRecord()) { - fromIndx = pagination.getTotalRecord(); - } - - List brokerTopicMonitors = new ArrayList<>(pagination.getSize()); - - for (String topic : toplicList.subList(pagination.getStart(), fromIndx)) { - List appList = getAppByTopic(qMonitor.getType(), topic); - - BrokerTopicMonitor brokerTopicMonitor = getMonitorByAppAndTopic(topic, appList, broker, qMonitor.getType()); - if (brokerTopicMonitor.getBrokerTopicMonitorRecordList().size() > 0) { - brokerTopicMonitors.add(brokerTopicMonitor); - } - } - pageResult.setPagination(pagination); - pageResult.setResult(brokerTopicMonitors); + return getMonitorByBrokerPage(broker, qMonitor.getType(), pagination.getPage(), pagination.getSize()); } catch (Exception e) { logger.error("queryTopicsMointor exception", e); } - return pageResult; + return new PageResult<>(); } /** @@ -222,6 +204,7 @@ private BrokerTopicMonitor getMonitorByAppAndTopic(String topic, List ap brokerTopicMonitorRecord.setConnections(consumerMonitorInfo.getConnections()); brokerTopicMonitorRecord.setCount(consumerMonitorInfo.getDeQueue().getCount()); brokerTopicMonitorRecord.setTotalSize(consumerMonitorInfo.getDeQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(consumerMonitorInfo.getDeQueue().getTraffic()); } } else if (type == SubscribeType.PRODUCER) { ProducerMonitorInfo producerMonitorInfo = null; @@ -234,17 +217,86 @@ private BrokerTopicMonitor getMonitorByAppAndTopic(String topic, List ap brokerTopicMonitorRecord.setConnections(producerMonitorInfo.getConnections()); brokerTopicMonitorRecord.setCount(producerMonitorInfo.getEnQueue().getCount()); brokerTopicMonitorRecord.setTotalSize(producerMonitorInfo.getEnQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(producerMonitorInfo.getEnQueue().getTraffic()); brokerTopicMonitorRecord.setTps(producerMonitorInfo.getEnQueue().getTps()); } } brokerTopicMonitorRecord.setApp(app); brokerMonitorRecordList.add(brokerTopicMonitorRecord); } + brokerMonitorRecordList.sort(Comparator.comparingLong(BrokerTopicMonitorRecord::getBacklog)); brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList); brokerTopicMonitor.setTopic(topic); return brokerTopicMonitor; } + private PageResult getMonitorByBrokerPage(Broker broker, SubscribeType type, int page, int pageSize) throws Exception { + Pagination pagination = new Pagination(); + pagination.setPage(page); + pagination.setSize(pageSize); + PageResult pageResult = new PageResult<>(); + List brokerTopicMonitors = new ArrayList<>(); + if (type == SubscribeType.CONSUMER) { + JSONObject map = queryMonitorConsumers(broker, page, pageSize); + pagination.setTotalRecord(Integer.parseInt(map.getOrDefault("total", 0).toString())); + List consumerMonitorInfos = map.getJSONArray("data").toJavaList(ConsumerMonitorInfo.class); + for (ConsumerMonitorInfo consumerMonitorInfo: consumerMonitorInfos) { + BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor(); + BrokerTopicMonitorRecord brokerTopicMonitorRecord = new BrokerTopicMonitorRecord(); + if (consumerMonitorInfo.getRetry() != null) { + brokerTopicMonitorRecord.setRetryCount(consumerMonitorInfo.getRetry().getCount()); + brokerTopicMonitorRecord.setRetryTps(consumerMonitorInfo.getRetry().getCurrent()); + } + if (consumerMonitorInfo.getPending() != null) { + brokerTopicMonitorRecord.setBacklog(consumerMonitorInfo.getPending().getCount()); + } + brokerTopicMonitorRecord.setConnections(consumerMonitorInfo.getConnections()); + brokerTopicMonitorRecord.setCount(consumerMonitorInfo.getDeQueue().getCount()); + brokerTopicMonitorRecord.setTotalSize(consumerMonitorInfo.getDeQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(consumerMonitorInfo.getDeQueue().getTraffic()); + brokerTopicMonitorRecord.setApp(consumerMonitorInfo.getApp()); + List brokerMonitorRecordList = new ArrayList<>(); + brokerMonitorRecordList.add(brokerTopicMonitorRecord); + brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList); + brokerTopicMonitor.setTopic(consumerMonitorInfo.getTopic()); + brokerTopicMonitors.add(brokerTopicMonitor); + } + brokerTopicMonitors.sort((o1, o2) -> { + BrokerTopicMonitorRecord brokerTopicMonitorRecord1 = o1.getBrokerTopicMonitorRecordList().get(0); + BrokerTopicMonitorRecord brokerTopicMonitorRecord2 = o2.getBrokerTopicMonitorRecordList().get(0); + return Long.compare(brokerTopicMonitorRecord2.getBacklog(), brokerTopicMonitorRecord1.getBacklog()); + }); + } else if (type == SubscribeType.PRODUCER) { + JSONObject map = queryMonitorProducers(broker, page, pageSize); + pagination.setTotalRecord(Integer.parseInt(map.getOrDefault("total", 0).toString())); + List producerMonitorInfos = map.getJSONArray("data").toJavaList(ProducerMonitorInfo.class); + for (ProducerMonitorInfo producerMonitorInfo: producerMonitorInfos) { + BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor(); + BrokerTopicMonitorRecord brokerTopicMonitorRecord = new BrokerTopicMonitorRecord(); + brokerTopicMonitorRecord.setConnections(producerMonitorInfo.getConnections()); + brokerTopicMonitorRecord.setCount(producerMonitorInfo.getEnQueue().getCount()); + brokerTopicMonitorRecord.setTotalSize(producerMonitorInfo.getEnQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(producerMonitorInfo.getEnQueue().getTraffic()); + brokerTopicMonitorRecord.setTps(producerMonitorInfo.getEnQueue().getTps()); + brokerTopicMonitorRecord.setApp(producerMonitorInfo.getApp()); + List brokerMonitorRecordList = new ArrayList<>(); + brokerMonitorRecordList.add(brokerTopicMonitorRecord); + brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList); + brokerTopicMonitor.setTopic(producerMonitorInfo.getTopic()); + brokerTopicMonitors.add(brokerTopicMonitor); + } + brokerTopicMonitors.sort((o1, o2) -> { + BrokerTopicMonitorRecord brokerTopicMonitorRecord1 = o1.getBrokerTopicMonitorRecordList().get(0); + BrokerTopicMonitorRecord brokerTopicMonitorRecord2 = o2.getBrokerTopicMonitorRecordList().get(0); + return Long.compare(brokerTopicMonitorRecord2.getCount(), brokerTopicMonitorRecord1.getCount()); + }); + } + pageResult.setResult(brokerTopicMonitors); + pageResult.setPagination(pagination); + return pageResult; + } + + private List getAppByTopic(SubscribeType subscribeType, String topic) throws Exception { if (subscribeType == SubscribeType.CONSUMER) { List consumerList; @@ -283,6 +335,34 @@ private List getPartitionGroup(String topic, Broker broker return null; } + private JSONObject queryMonitorConsumers(Broker broker, int page, int pageSize) { + String path = "consumerInfos"; + String[] args = new String[4]; + args[0] = broker.getIp(); + args[1] = String.valueOf(broker.getMonitorPort()); + args[2] = String.valueOf(page); + args[3] = String.valueOf(pageSize); + RestResponse restResponse = httpRestService.get(path, JSONObject.class, false, args); + if (restResponse != null && restResponse.getData() != null) { + return restResponse.getData(); + } + return new JSONObject(); + } + + private JSONObject queryMonitorProducers(Broker broker, int page, int pageSize) { + String path = "producerInfos"; + String[] args = new String[4]; + args[0] = broker.getIp(); + args[1] = String.valueOf(broker.getMonitorPort()); + args[2] = String.valueOf(page); + args[3] = String.valueOf(pageSize); + RestResponse restResponse = httpRestService.get(path, JSONObject.class, false, args); + if (restResponse != null && restResponse.getData() != null) { + return restResponse.getData(); + } + return new JSONObject(); + } + /** * 查询消费者详情 * @return diff --git a/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue b/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue index bbd542d28..8b5a02d82 100644 --- a/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue +++ b/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue @@ -19,17 +19,23 @@ 刷新 - + 加载更多 + + 刷新 - + 加载更多 + + @@ -120,6 +126,9 @@ export default { }, searchRules: { }, + appIdMap: new Map(), + curIndex: 0, + cacheList: [], tableData: { rowData: [], colData: [ @@ -247,7 +256,7 @@ export default { for (let i = 0; i < list.length; i++) { let p = h('div', '') if (list[i].id) { - p = h('router-link', { + p = h('router-link', { style: { 'text-decoration': 'underline', color: 'dodgerblue' @@ -313,6 +322,23 @@ export default { return h('div', html) } }, + { + title: '入队实时流量', + key: 'brokerTopicMonitorRecordList', + render: (h, params) => { + let list = params.item.brokerTopicMonitorRecordList + let html = [] + if (list !== undefined) { + for (let i = 0; i < list.length; i++) { + if (list[i].traffic !== undefined) { + let p = h('div', bytesToSize(list[i].traffic, 2, false)) + html.push(p) + } + } + } + return h('div', html) + } + }, { title: '入队TPS', key: 'brokerTopicMonitorRecordList', @@ -362,7 +388,7 @@ export default { for (let i = 0; i < list.length; i++) { let p = h('div', '') if (list[i].id) { - p = h('router-link', { + p = h('router-link', { style: { 'text-decoration': 'underline', color: 'dodgerblue' @@ -428,6 +454,23 @@ export default { return h('div', html) } }, + { + title: '出队实时流量', + key: 'brokerTopicMonitorRecordList', + render: (h, params) => { + let list = params.item.brokerTopicMonitorRecordList + let html = [] + if (list !== undefined) { + for (let i = 0; i < list.length; i++) { + if (list[i].traffic !== undefined) { + let p = h('div', bytesToSize(list[i].traffic, 2, false)) + html.push(p) + } + } + } + return h('div', html) + } + }, { title: '出队TPS', key: 'brokerTopicMonitorRecordList', @@ -515,6 +558,21 @@ export default { this.$refs[item.name].getList() } }, + getSearchVal () { + let obj = { + pagination: { + page: 1, + size: 10000 + }, + query: {} + } + for (let key in this.searchData) { + if (this.searchData.hasOwnProperty(key)) { + obj.query[key] = this.searchData[key] + } + } + return obj + }, getList () { this.showTablePin = true let data = this.getSearchVal() @@ -526,10 +584,15 @@ export default { data.pagination = data.pagination || { totalRecord: data.data.length } - this.page.total = data.pagination.totalRecord - this.page.page = data.pagination.page - this.page.size = data.pagination.size - this.tableData.rowData = data.data + + if (data.data.length > this.page.size) { + this.tableData.rowData = data.data.slice(0, this.page.size) + this.curIndex = this.page.size - 1 + } else { + this.tableData.rowData = data.data + this.curIndex = data.data.length - 1 + } + this.cacheList = data.data for (let i in this.tableData.rowData) { if (this.tableData.rowData.hasOwnProperty(i)) { if (this.tableData.rowData[i].brokerTopicMonitorRecordList) { @@ -540,17 +603,64 @@ export default { if (idx !== -1) { app = app.substring(0, idx) } + if (this.appIdMap.has(app)) { + this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = this.appIdMap.get(app) + this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + } else { + apiRequest.get('/application/getByCode/' + app, {}).then((data) => { + this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = data.data.id + this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + this.appIdMap.set(app, data.data.id) + }) + } + } + } + } + } + } + this.showTablePin = false + }) + }, + // 滚动事件触发下拉加载 + getRestList () { + let index = this.curIndex + if (this.curIndex < this.cacheList.length - 1) { + for (let i = 0; i < this.page.size; i++) { + if (this.curIndex < this.cacheList.length - 1) { + this.curIndex += 1 + if (!this.tableData.rowData.includes(this.cacheList[this.curIndex])) { + this.tableData.rowData.push(this.cacheList[this.curIndex]) + } + } else { + break + } + } + } + for (let i = index; i < this.tableData.rowData.length; i++) { + if (this.tableData.rowData.hasOwnProperty(i)) { + if (this.tableData.rowData[i].brokerTopicMonitorRecordList) { + for (let j in this.tableData.rowData[i].brokerTopicMonitorRecordList) { + if (this.tableData.rowData[i].brokerTopicMonitorRecordList.hasOwnProperty(j)) { + let app = this.tableData.rowData[i].brokerTopicMonitorRecordList[j].app + let idx = app.indexOf('.') + if (idx !== -1) { + app = app.substring(0, idx) + } + if (this.appIdMap.has(app)) { + this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = this.appIdMap.get(app) + this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + } else { apiRequest.get('/application/getByCode/' + app, {}).then((data) => { this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = data.data.id this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + this.appIdMap.set(app, data.data.id) }) } } } } } - this.showTablePin = false - }) + } }, }, mounted () { diff --git a/joyqueue-console/joyqueue-portal/src/views/topic/index.vue b/joyqueue-console/joyqueue-portal/src/views/topic/index.vue index 97ea8a006..5378c85f7 100644 --- a/joyqueue-console/joyqueue-portal/src/views/topic/index.vue +++ b/joyqueue-console/joyqueue-portal/src/views/topic/index.vue @@ -246,7 +246,11 @@ export default { this.topic.policy = {} for (let policy in this.policies) { if (this.policies.hasOwnProperty(policy)) { - this.topic.policy[this.policies[policy].key] = this.policies[policy].value + if (this.policies[policy].key === 'storeMaxTime') { + this.topic.policy[this.policies[policy].key] = this.policies[policy].value * (1000 * 60 * 60) + } else { + this.topic.policy[this.policies[policy].key] = this.policies[policy].value + } } } apiRequest.put(this.urlOrigin.edit + '/' + encodeURIComponent(this.topic.id), {}, this.topic).then((data) => { @@ -274,7 +278,7 @@ export default { } this.policies.push({ key: '存储最长时间', - value: item.policy.storeMaxTime + value: item.policy.storeMaxTime / (1000 * 60 * 60) }) this.policies.push({ key: '保留未消费数据',