From d752dc5976d4bee5ef946b6f53bb2309b2b7c1f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E6=A5=A0?= Date: Mon, 7 Sep 2020 18:10:26 +0800 Subject: [PATCH 01/14] =?UTF-8?q?broker=E7=AE=A1=E7=90=86=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/BrokerTopicMonitorRecord.java | 9 ++ .../impl/BrokerRestUrlMappingServiceImpl.java | 2 + .../impl/BrokerTopicMonitorServiceImpl.java | 124 ++++++++++++++--- .../src/views/setting/brokerMonitor.vue | 130 ++++++++++++++++-- .../joyqueue-portal/src/views/topic/index.vue | 8 +- 5 files changed, 239 insertions(+), 34 deletions(-) diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java index b464789d2..c08e85782 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/BrokerTopicMonitorRecord.java @@ -29,6 +29,7 @@ public class BrokerTopicMonitorRecord { private long retryCount; private long retryTps; private long backlog; + private long traffic; public String getApp() { return app; @@ -101,4 +102,12 @@ public long getTps() { public void setTps(long tps) { this.tps = tps; } + + public long getTraffic() { + return traffic; + } + + public void setTraffic(long traffic) { + this.traffic = traffic; + } } diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java index f9457d63f..9e3cb7d51 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerRestUrlMappingServiceImpl.java @@ -61,6 +61,8 @@ public class BrokerRestUrlMappingServiceImpl implements BrokerRestUrlMappingServ private String appConnectionDetailPath = "/monitor/connections/detail"; // /manage/topic/:topic/partitionGroup/:partitionGroup/store/metric private String partitiongroupIndexPath="/manage/topic/%s/partitionGroup/%s/store/metric"; + private String consumerInfosPath = "/monitor/consumers?page=%s&pageSize=%s"; + private String producerInfosPath = "/monitor/producers?page=%s&pageSize=%s"; /** offset management*/ private String removeProducersConnectionsPath= "/manage/topic/%s/app/%s/producers"; diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java index e60183b53..8307ea529 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerTopicMonitorServiceImpl.java @@ -15,6 +15,7 @@ */ package org.joyqueue.service.impl; +import com.alibaba.fastjson.JSONObject; import org.joyqueue.convert.CodeConverter; import org.joyqueue.manage.PartitionGroupMetric; import org.joyqueue.model.PageResult; @@ -46,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -135,35 +137,15 @@ public PageResult queryClientConnectionDetail(QPageQuery qPage */ @Override public PageResult queryTopicsMointor(QPageQuery qPageQuery) { - - PageResult pageResult = new PageResult<>(); try { Pagination pagination = qPageQuery.getPagination(); QMonitor qMonitor = qPageQuery.getQuery(); Broker broker = brokerService.findById(Integer.valueOf(String.valueOf(qMonitor.getBrokerId()))); - List toplicList = queryTopicList(broker); - pagination.setTotalRecord(toplicList.size()); - int fromIndx = pagination.getStart() + pagination.getSize(); - if (fromIndx > pagination.getTotalRecord()) { - fromIndx = pagination.getTotalRecord(); - } - - List brokerTopicMonitors = new ArrayList<>(pagination.getSize()); - - for (String topic : toplicList.subList(pagination.getStart(), fromIndx)) { - List appList = getAppByTopic(qMonitor.getType(), topic); - - BrokerTopicMonitor brokerTopicMonitor = getMonitorByAppAndTopic(topic, appList, broker, qMonitor.getType()); - if (brokerTopicMonitor.getBrokerTopicMonitorRecordList().size() > 0) { - brokerTopicMonitors.add(brokerTopicMonitor); - } - } - pageResult.setPagination(pagination); - pageResult.setResult(brokerTopicMonitors); + return getMonitorByBrokerPage(broker, qMonitor.getType(), pagination.getPage(), pagination.getSize()); } catch (Exception e) { logger.error("queryTopicsMointor exception", e); } - return pageResult; + return new PageResult<>(); } /** @@ -222,6 +204,7 @@ private BrokerTopicMonitor getMonitorByAppAndTopic(String topic, List ap brokerTopicMonitorRecord.setConnections(consumerMonitorInfo.getConnections()); brokerTopicMonitorRecord.setCount(consumerMonitorInfo.getDeQueue().getCount()); brokerTopicMonitorRecord.setTotalSize(consumerMonitorInfo.getDeQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(consumerMonitorInfo.getDeQueue().getTraffic()); } } else if (type == SubscribeType.PRODUCER) { ProducerMonitorInfo producerMonitorInfo = null; @@ -234,17 +217,86 @@ private BrokerTopicMonitor getMonitorByAppAndTopic(String topic, List ap brokerTopicMonitorRecord.setConnections(producerMonitorInfo.getConnections()); brokerTopicMonitorRecord.setCount(producerMonitorInfo.getEnQueue().getCount()); brokerTopicMonitorRecord.setTotalSize(producerMonitorInfo.getEnQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(producerMonitorInfo.getEnQueue().getTraffic()); brokerTopicMonitorRecord.setTps(producerMonitorInfo.getEnQueue().getTps()); } } brokerTopicMonitorRecord.setApp(app); brokerMonitorRecordList.add(brokerTopicMonitorRecord); } + brokerMonitorRecordList.sort(Comparator.comparingLong(BrokerTopicMonitorRecord::getBacklog)); brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList); brokerTopicMonitor.setTopic(topic); return brokerTopicMonitor; } + private PageResult getMonitorByBrokerPage(Broker broker, SubscribeType type, int page, int pageSize) throws Exception { + Pagination pagination = new Pagination(); + pagination.setPage(page); + pagination.setSize(pageSize); + PageResult pageResult = new PageResult<>(); + List brokerTopicMonitors = new ArrayList<>(); + if (type == SubscribeType.CONSUMER) { + JSONObject map = queryMonitorConsumers(broker, page, pageSize); + pagination.setTotalRecord(Integer.parseInt(map.getOrDefault("total", 0).toString())); + List consumerMonitorInfos = map.getJSONArray("data").toJavaList(ConsumerMonitorInfo.class); + for (ConsumerMonitorInfo consumerMonitorInfo: consumerMonitorInfos) { + BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor(); + BrokerTopicMonitorRecord brokerTopicMonitorRecord = new BrokerTopicMonitorRecord(); + if (consumerMonitorInfo.getRetry() != null) { + brokerTopicMonitorRecord.setRetryCount(consumerMonitorInfo.getRetry().getCount()); + brokerTopicMonitorRecord.setRetryTps(consumerMonitorInfo.getRetry().getCurrent()); + } + if (consumerMonitorInfo.getPending() != null) { + brokerTopicMonitorRecord.setBacklog(consumerMonitorInfo.getPending().getCount()); + } + brokerTopicMonitorRecord.setConnections(consumerMonitorInfo.getConnections()); + brokerTopicMonitorRecord.setCount(consumerMonitorInfo.getDeQueue().getCount()); + brokerTopicMonitorRecord.setTotalSize(consumerMonitorInfo.getDeQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(consumerMonitorInfo.getDeQueue().getTraffic()); + brokerTopicMonitorRecord.setApp(consumerMonitorInfo.getApp()); + List brokerMonitorRecordList = new ArrayList<>(); + brokerMonitorRecordList.add(brokerTopicMonitorRecord); + brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList); + brokerTopicMonitor.setTopic(consumerMonitorInfo.getTopic()); + brokerTopicMonitors.add(brokerTopicMonitor); + } + brokerTopicMonitors.sort((o1, o2) -> { + BrokerTopicMonitorRecord brokerTopicMonitorRecord1 = o1.getBrokerTopicMonitorRecordList().get(0); + BrokerTopicMonitorRecord brokerTopicMonitorRecord2 = o2.getBrokerTopicMonitorRecordList().get(0); + return Long.compare(brokerTopicMonitorRecord2.getBacklog(), brokerTopicMonitorRecord1.getBacklog()); + }); + } else if (type == SubscribeType.PRODUCER) { + JSONObject map = queryMonitorProducers(broker, page, pageSize); + pagination.setTotalRecord(Integer.parseInt(map.getOrDefault("total", 0).toString())); + List producerMonitorInfos = map.getJSONArray("data").toJavaList(ProducerMonitorInfo.class); + for (ProducerMonitorInfo producerMonitorInfo: producerMonitorInfos) { + BrokerTopicMonitor brokerTopicMonitor = new BrokerTopicMonitor(); + BrokerTopicMonitorRecord brokerTopicMonitorRecord = new BrokerTopicMonitorRecord(); + brokerTopicMonitorRecord.setConnections(producerMonitorInfo.getConnections()); + brokerTopicMonitorRecord.setCount(producerMonitorInfo.getEnQueue().getCount()); + brokerTopicMonitorRecord.setTotalSize(producerMonitorInfo.getEnQueue().getTotalSize()); + brokerTopicMonitorRecord.setTraffic(producerMonitorInfo.getEnQueue().getTraffic()); + brokerTopicMonitorRecord.setTps(producerMonitorInfo.getEnQueue().getTps()); + brokerTopicMonitorRecord.setApp(producerMonitorInfo.getApp()); + List brokerMonitorRecordList = new ArrayList<>(); + brokerMonitorRecordList.add(brokerTopicMonitorRecord); + brokerTopicMonitor.setBrokerTopicMonitorRecordList(brokerMonitorRecordList); + brokerTopicMonitor.setTopic(producerMonitorInfo.getTopic()); + brokerTopicMonitors.add(brokerTopicMonitor); + } + brokerTopicMonitors.sort((o1, o2) -> { + BrokerTopicMonitorRecord brokerTopicMonitorRecord1 = o1.getBrokerTopicMonitorRecordList().get(0); + BrokerTopicMonitorRecord brokerTopicMonitorRecord2 = o2.getBrokerTopicMonitorRecordList().get(0); + return Long.compare(brokerTopicMonitorRecord2.getCount(), brokerTopicMonitorRecord1.getCount()); + }); + } + pageResult.setResult(brokerTopicMonitors); + pageResult.setPagination(pagination); + return pageResult; + } + + private List getAppByTopic(SubscribeType subscribeType, String topic) throws Exception { if (subscribeType == SubscribeType.CONSUMER) { List consumerList; @@ -283,6 +335,34 @@ private List getPartitionGroup(String topic, Broker broker return null; } + private JSONObject queryMonitorConsumers(Broker broker, int page, int pageSize) { + String path = "consumerInfos"; + String[] args = new String[4]; + args[0] = broker.getIp(); + args[1] = String.valueOf(broker.getMonitorPort()); + args[2] = String.valueOf(page); + args[3] = String.valueOf(pageSize); + RestResponse restResponse = httpRestService.get(path, JSONObject.class, false, args); + if (restResponse != null && restResponse.getData() != null) { + return restResponse.getData(); + } + return new JSONObject(); + } + + private JSONObject queryMonitorProducers(Broker broker, int page, int pageSize) { + String path = "producerInfos"; + String[] args = new String[4]; + args[0] = broker.getIp(); + args[1] = String.valueOf(broker.getMonitorPort()); + args[2] = String.valueOf(page); + args[3] = String.valueOf(pageSize); + RestResponse restResponse = httpRestService.get(path, JSONObject.class, false, args); + if (restResponse != null && restResponse.getData() != null) { + return restResponse.getData(); + } + return new JSONObject(); + } + /** * 查询消费者详情 * @return diff --git a/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue b/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue index bbd542d28..8b5a02d82 100644 --- a/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue +++ b/joyqueue-console/joyqueue-portal/src/views/setting/brokerMonitor.vue @@ -19,17 +19,23 @@ 刷新 - + 加载更多 + + 刷新 - + 加载更多 + + @@ -120,6 +126,9 @@ export default { }, searchRules: { }, + appIdMap: new Map(), + curIndex: 0, + cacheList: [], tableData: { rowData: [], colData: [ @@ -247,7 +256,7 @@ export default { for (let i = 0; i < list.length; i++) { let p = h('div', '') if (list[i].id) { - p = h('router-link', { + p = h('router-link', { style: { 'text-decoration': 'underline', color: 'dodgerblue' @@ -313,6 +322,23 @@ export default { return h('div', html) } }, + { + title: '入队实时流量', + key: 'brokerTopicMonitorRecordList', + render: (h, params) => { + let list = params.item.brokerTopicMonitorRecordList + let html = [] + if (list !== undefined) { + for (let i = 0; i < list.length; i++) { + if (list[i].traffic !== undefined) { + let p = h('div', bytesToSize(list[i].traffic, 2, false)) + html.push(p) + } + } + } + return h('div', html) + } + }, { title: '入队TPS', key: 'brokerTopicMonitorRecordList', @@ -362,7 +388,7 @@ export default { for (let i = 0; i < list.length; i++) { let p = h('div', '') if (list[i].id) { - p = h('router-link', { + p = h('router-link', { style: { 'text-decoration': 'underline', color: 'dodgerblue' @@ -428,6 +454,23 @@ export default { return h('div', html) } }, + { + title: '出队实时流量', + key: 'brokerTopicMonitorRecordList', + render: (h, params) => { + let list = params.item.brokerTopicMonitorRecordList + let html = [] + if (list !== undefined) { + for (let i = 0; i < list.length; i++) { + if (list[i].traffic !== undefined) { + let p = h('div', bytesToSize(list[i].traffic, 2, false)) + html.push(p) + } + } + } + return h('div', html) + } + }, { title: '出队TPS', key: 'brokerTopicMonitorRecordList', @@ -515,6 +558,21 @@ export default { this.$refs[item.name].getList() } }, + getSearchVal () { + let obj = { + pagination: { + page: 1, + size: 10000 + }, + query: {} + } + for (let key in this.searchData) { + if (this.searchData.hasOwnProperty(key)) { + obj.query[key] = this.searchData[key] + } + } + return obj + }, getList () { this.showTablePin = true let data = this.getSearchVal() @@ -526,10 +584,15 @@ export default { data.pagination = data.pagination || { totalRecord: data.data.length } - this.page.total = data.pagination.totalRecord - this.page.page = data.pagination.page - this.page.size = data.pagination.size - this.tableData.rowData = data.data + + if (data.data.length > this.page.size) { + this.tableData.rowData = data.data.slice(0, this.page.size) + this.curIndex = this.page.size - 1 + } else { + this.tableData.rowData = data.data + this.curIndex = data.data.length - 1 + } + this.cacheList = data.data for (let i in this.tableData.rowData) { if (this.tableData.rowData.hasOwnProperty(i)) { if (this.tableData.rowData[i].brokerTopicMonitorRecordList) { @@ -540,17 +603,64 @@ export default { if (idx !== -1) { app = app.substring(0, idx) } + if (this.appIdMap.has(app)) { + this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = this.appIdMap.get(app) + this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + } else { + apiRequest.get('/application/getByCode/' + app, {}).then((data) => { + this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = data.data.id + this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + this.appIdMap.set(app, data.data.id) + }) + } + } + } + } + } + } + this.showTablePin = false + }) + }, + // 滚动事件触发下拉加载 + getRestList () { + let index = this.curIndex + if (this.curIndex < this.cacheList.length - 1) { + for (let i = 0; i < this.page.size; i++) { + if (this.curIndex < this.cacheList.length - 1) { + this.curIndex += 1 + if (!this.tableData.rowData.includes(this.cacheList[this.curIndex])) { + this.tableData.rowData.push(this.cacheList[this.curIndex]) + } + } else { + break + } + } + } + for (let i = index; i < this.tableData.rowData.length; i++) { + if (this.tableData.rowData.hasOwnProperty(i)) { + if (this.tableData.rowData[i].brokerTopicMonitorRecordList) { + for (let j in this.tableData.rowData[i].brokerTopicMonitorRecordList) { + if (this.tableData.rowData[i].brokerTopicMonitorRecordList.hasOwnProperty(j)) { + let app = this.tableData.rowData[i].brokerTopicMonitorRecordList[j].app + let idx = app.indexOf('.') + if (idx !== -1) { + app = app.substring(0, idx) + } + if (this.appIdMap.has(app)) { + this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = this.appIdMap.get(app) + this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + } else { apiRequest.get('/application/getByCode/' + app, {}).then((data) => { this.tableData.rowData[i].brokerTopicMonitorRecordList[j].id = data.data.id this.$set(this.tableData.rowData, i, this.tableData.rowData[i]) + this.appIdMap.set(app, data.data.id) }) } } } } } - this.showTablePin = false - }) + } }, }, mounted () { diff --git a/joyqueue-console/joyqueue-portal/src/views/topic/index.vue b/joyqueue-console/joyqueue-portal/src/views/topic/index.vue index 97ea8a006..5378c85f7 100644 --- a/joyqueue-console/joyqueue-portal/src/views/topic/index.vue +++ b/joyqueue-console/joyqueue-portal/src/views/topic/index.vue @@ -246,7 +246,11 @@ export default { this.topic.policy = {} for (let policy in this.policies) { if (this.policies.hasOwnProperty(policy)) { - this.topic.policy[this.policies[policy].key] = this.policies[policy].value + if (this.policies[policy].key === 'storeMaxTime') { + this.topic.policy[this.policies[policy].key] = this.policies[policy].value * (1000 * 60 * 60) + } else { + this.topic.policy[this.policies[policy].key] = this.policies[policy].value + } } } apiRequest.put(this.urlOrigin.edit + '/' + encodeURIComponent(this.topic.id), {}, this.topic).then((data) => { @@ -274,7 +278,7 @@ export default { } this.policies.push({ key: '存储最长时间', - value: item.policy.storeMaxTime + value: item.policy.storeMaxTime / (1000 * 60 * 60) }) this.policies.push({ key: '保留未消费数据', From ea207ed5abd4cc6da467f5a9226cd305041cc99c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E6=A5=A0?= Date: Tue, 15 Sep 2020 11:09:39 +0800 Subject: [PATCH 02/14] =?UTF-8?q?openapi=E6=B7=BB=E5=8A=A0=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E8=80=85=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java | 5 +++++ .../src/main/java/org/joyqueue/api/OpenAPIService.java | 2 ++ 2 files changed, 7 insertions(+) diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java index 79fb0e64a..fdc61f77d 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java @@ -495,6 +495,11 @@ public List tokens(String app) { } + @Override + public BrokerMonitorRecord brokerMonitor(Subscribe subscribe) { + return brokerMonitorService.find(subscribe); + } + /** * * Check the subscription legal or not diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/OpenAPIService.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/OpenAPIService.java index 391ff0c01..20deba6e4 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/OpenAPIService.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/OpenAPIService.java @@ -201,5 +201,7 @@ public interface OpenAPIService { **/ List tokens(String app); + BrokerMonitorRecord brokerMonitor(Subscribe subscribe); + } From d7f472bdad30eba11407d490aca42564818050be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=9F=E6=A5=A0?= Date: Wed, 16 Sep 2020 12:47:46 +0800 Subject: [PATCH 03/14] =?UTF-8?q?openapi=E6=B7=BB=E5=8A=A0=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E8=80=85=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java index fdc61f77d..2e821e4e2 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/api/Impl/OpenAPIServiceImpl.java @@ -497,7 +497,7 @@ public List tokens(String app) { @Override public BrokerMonitorRecord brokerMonitor(Subscribe subscribe) { - return brokerMonitorService.find(subscribe); + return brokerMonitorService.find(subscribe, true); } /** From 73e1c4f8d1666394969a490e75c9479f01d799e9 Mon Sep 17 00:00:00 2001 From: iamazy <1448588084@qq.com> Date: Fri, 25 Sep 2020 10:11:56 +0800 Subject: [PATCH 04/14] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dproducer=EF=BC=8Cconsum?= =?UTF-8?q?er=E9=85=8D=E7=BD=AE=E6=8E=A5=E5=8F=A3=E5=A4=9A=E5=A4=84?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E8=A2=AB=E8=A6=86=E7=9B=96=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../joyqueue/model/domain/ConsumerConfig.java | 64 ++++++++-------- .../joyqueue/model/domain/ProducerConfig.java | 20 ++--- .../command/monitor/ConsumerCommand.java | 75 ++++++++++++++++++- .../ProducerConfigAddOrUpdateCommand.java | 49 +++++++++++- 4 files changed, 164 insertions(+), 44 deletions(-) diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java index 56095ae28..250d197c8 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java @@ -30,28 +30,28 @@ public class ConsumerConfig extends BaseNsrModel { /** * 开启就近机房消费 **/ - private boolean nearBy; + private Boolean nearBy; /** * 是否暂停消费 **/ - private boolean paused; + private Boolean paused; /** * 是否需要归档,默认不归档 **/ - private boolean archive; + private Boolean archive; /** * 是否启用重试服务,默认开启 **/ - private boolean retry = true; + private Boolean retry = true; /** * 延迟时间,最大延迟1小时 */ @Max(3600000) - private int delay = 0; + private Integer delay = 0; /** * 偏移量管理类型 @@ -62,56 +62,56 @@ public class ConsumerConfig extends BaseNsrModel { * 应答超时时间,默认2min */ @Min(0) - private int ackTimeout = 120000; + private Integer ackTimeout = 120000; /** * 批量大小,默认10 **/ @Min(0) @Max(127) - private int batchSize = 10; + private Integer batchSize = 10; /** * 最大重试次数(无限制) **/ @Min(0) - private int maxRetrys; + private Integer maxRetrys; /** * 最大重试间隔(默认5分钟) **/ @Min(0) - private int maxRetryDelay; + private Integer maxRetryDelay; /** * 重试间隔 */ @Min(0) - private int retryDelay; + private Integer retryDelay; private String region; /** * 指数增加间隔时间 **/ - private boolean useExponentialBackOff = true; + private Boolean useExponentialBackOff = true; /** * 指数系数 */ @Min(0) - private double backOffMultiplier; + private Double backOffMultiplier; /** * 过期时间(默认3天) **/ @Min(0) - private int expireTime; + private Integer expireTime; /** * 单队列并行度 **/ - private int concurrent = 1; + private Integer concurrent = 1; /** * 黑名单 @@ -120,14 +120,14 @@ public class ConsumerConfig extends BaseNsrModel { private String filters; - private int limitTps; + private Integer limitTps; - private int limitTraffic; + private Integer limitTraffic; private Map params; - public boolean isNearBy() { + public Boolean isNearBy() { return nearBy; } @@ -143,7 +143,7 @@ public void setConsumerId(String consumerId) { this.consumerId = consumerId; } - public boolean isPaused() { + public Boolean isPaused() { return paused; } @@ -151,7 +151,7 @@ public void setPaused(boolean paused) { this.paused = paused; } - public boolean isArchive() { + public Boolean isArchive() { return archive; } @@ -159,7 +159,7 @@ public void setArchive(boolean archive) { this.archive = archive; } - public boolean isRetry() { + public Boolean isRetry() { return retry; } @@ -167,7 +167,7 @@ public void setRetry(boolean retry) { this.retry = retry; } - public int getDelay() { + public Integer getDelay() { return delay; } @@ -175,7 +175,7 @@ public void setDelay(int delay) { this.delay = delay; } - public int getAckTimeout() { + public Integer getAckTimeout() { return ackTimeout; } @@ -183,7 +183,7 @@ public void setAckTimeout(int ackTimeout) { this.ackTimeout = ackTimeout; } - public int getBatchSize() { + public Integer getBatchSize() { return batchSize; } @@ -191,7 +191,7 @@ public void setBatchSize(int batchSize) { this.batchSize = batchSize; } - public int getMaxRetrys() { + public Integer getMaxRetrys() { return maxRetrys; } @@ -199,7 +199,7 @@ public void setMaxRetrys(int maxRetrys) { this.maxRetrys = maxRetrys; } - public int getMaxRetryDelay() { + public Integer getMaxRetryDelay() { return maxRetryDelay; } @@ -207,7 +207,7 @@ public void setMaxRetryDelay(int maxRetryDelay) { this.maxRetryDelay = maxRetryDelay; } - public int getRetryDelay() { + public Integer getRetryDelay() { return retryDelay; } @@ -215,7 +215,7 @@ public void setRetryDelay(int retryDelay) { this.retryDelay = retryDelay; } - public boolean isUseExponentialBackOff() { + public Boolean isUseExponentialBackOff() { return useExponentialBackOff; } @@ -223,7 +223,7 @@ public void setUseExponentialBackOff(boolean useExponentialBackOff) { this.useExponentialBackOff = useExponentialBackOff; } - public double getBackOffMultiplier() { + public Double getBackOffMultiplier() { return backOffMultiplier; } @@ -231,7 +231,7 @@ public void setBackOffMultiplier(double backOffMultiplier) { this.backOffMultiplier = backOffMultiplier; } - public int getExpireTime() { + public Integer getExpireTime() { return expireTime; } @@ -239,7 +239,7 @@ public void setExpireTime(int expireTime) { this.expireTime = expireTime; } - public int getConcurrent() { + public Integer getConcurrent() { return concurrent; } @@ -267,7 +267,7 @@ public void setLimitTps(int limitTps) { this.limitTps = limitTps; } - public int getLimitTps() { + public Integer getLimitTps() { return limitTps; } @@ -275,7 +275,7 @@ public void setLimitTraffic(int limitTraffic) { this.limitTraffic = limitTraffic; } - public int getLimitTraffic() { + public Integer getLimitTraffic() { return limitTraffic; } diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java index e84b6f184..59382fe9f 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java @@ -28,7 +28,7 @@ public class ProducerConfig extends BaseNsrModel { /** * 是否就近发送 **/ - private boolean nearBy; + private Boolean nearBy; /** * 集群实例发送权重 @@ -38,18 +38,18 @@ public class ProducerConfig extends BaseNsrModel { /** * 是否归档 */ - private boolean archive; + private Boolean archive; /** * 单个发送者 **/ - private boolean single = false; + private Boolean single = false; private String blackList; - private int limitTps; + private Integer limitTps; - private int limitTraffic; + private Integer limitTraffic; private Integer timeout; @@ -66,7 +66,7 @@ public void setProducerId(String producerId) { this.producerId = producerId; } - public boolean isNearBy() { + public Boolean isNearBy() { return nearBy; } @@ -82,7 +82,7 @@ public void setWeight(String weight) { this.weight = weight; } - public boolean isSingle() { + public Boolean isSingle() { return single; } @@ -90,7 +90,7 @@ public void setSingle(boolean single) { this.single = single; } - public boolean isArchive() { + public Boolean isArchive() { return archive; } @@ -110,7 +110,7 @@ public void setLimitTps(int limitTps) { this.limitTps = limitTps; } - public int getLimitTps() { + public Integer getLimitTps() { return limitTps; } @@ -118,7 +118,7 @@ public void setLimitTraffic(int limitTraffic) { this.limitTraffic = limitTraffic; } - public int getLimitTraffic() { + public Integer getLimitTraffic() { return limitTraffic; } diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java index e82e7e14b..9dd842398 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java @@ -24,6 +24,7 @@ import com.jd.laf.web.vertx.response.Response; import com.jd.laf.web.vertx.response.Responses; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.joyqueue.handler.Constants; import org.joyqueue.handler.annotation.PageQuery; @@ -150,12 +151,84 @@ public Response delete(@QueryParam(Constants.ID) String id) throws Exception { public Response configAddOrUpdate(@Body ConsumerConfig config) throws Exception { if (config != null) { Consumer consumer = service.findById(config.getConsumerId()); - consumer.setConfig(config); + mergeConsumerConfig(consumer, config); service.update(consumer); } return Responses.success(); } + private void mergeConsumerConfig(Consumer consumer, ConsumerConfig config) { + if (consumer.getConfig() == null) { + consumer.setConfig(config); + } + if (config.getAckTimeout() !=null) { + consumer.getConfig().setAckTimeout(config.getAckTimeout()); + } + if (config.getBackOffMultiplier() != null) { + consumer.getConfig().setBackOffMultiplier(config.getBackOffMultiplier()); + } + if (config.getBatchSize() != null) { + consumer.getConfig().setBatchSize(config.getBatchSize()); + } + if (config.getConcurrent() != null) { + consumer.getConfig().setConcurrent(config.getConcurrent()); + } + if (config.getDelay() != null) { + consumer.getConfig().setDelay(config.getDelay()); + } + if (config.getRetryDelay() != null) { + consumer.getConfig().setRetryDelay(config.getRetryDelay()); + } + if (config.getMaxRetrys() != null) { + consumer.getConfig().setMaxRetrys(config.getMaxRetrys()); + } + if (config.getMaxRetryDelay() != null) { + consumer.getConfig().setMaxRetryDelay(config.getMaxRetryDelay()); + } + if (config.getLimitTps() != null) { + consumer.getConfig().setLimitTps(config.getLimitTps()); + } + if (config.getLimitTraffic() != null) { + consumer.getConfig().setLimitTraffic(config.getLimitTraffic()); + } + if (StringUtils.isNotBlank(config.getRegion())) { + consumer.getConfig().setRegion(config.getRegion()); + } + if (StringUtils.isNotBlank(config.getBlackList())) { + consumer.getConfig().setBlackList(config.getBlackList()); + } + if (StringUtils.isNotBlank(config.getFilters())) { + consumer.getConfig().setFilters(config.getFilters()); + } + if (config.isArchive() != null) { + consumer.getConfig().setArchive(config.isArchive()); + } + if (config.isNearBy() != null) { + consumer.getConfig().setNearBy(config.isNearBy()); + } + if (config.isPaused() != null) { + consumer.getConfig().setPaused(config.isPaused()); + } + if (config.isRetry() != null) { + consumer.getConfig().setRetry(config.isRetry()); + } + if (config.isUseExponentialBackOff() != null) { + consumer.getConfig().setUseExponentialBackOff(config.isUseExponentialBackOff()); + } + if (MapUtils.isNotEmpty(config.getParams())) { + if (consumer.getConfig().getParams() == null) { + consumer.getConfig().setParams(config.getParams()); + }else { + consumer.getConfig().getParams().putAll(config.getParams()); + } + } + if (config.getOffsetMode() != null) { + consumer.getConfig().setOffsetMode(config.getOffsetMode()); + } + if (config.getExpireTime() != null) { + consumer.getConfig().setExpireTime(config.getExpireTime()); + } + } /** * 同步producer diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java index 941fde38b..6dc988c5a 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java @@ -16,6 +16,8 @@ package org.joyqueue.handler.routing.command.monitor; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.joyqueue.handler.annotation.Operator; import org.joyqueue.model.domain.Identity; import org.joyqueue.model.domain.Producer; @@ -56,7 +58,7 @@ public String type() { public Response execute() throws Exception { Preconditions.checkArgument(null!=producerConfig, "invalid argument"); Producer producer = producerService.findById(producerConfig.getProducerId()); - producer.setConfig(producerConfig); + mergeProducerConfig(producer, producerConfig); return Responses.success(producerService.update(producer)); } @@ -64,4 +66,49 @@ public Response execute() throws Exception { public void clean() { producerConfig = null; } + + private void mergeProducerConfig(Producer producer, ProducerConfig config) { + if (producer.getConfig() == null) { + producer.setConfig(config); + return; + } + if (StringUtils.isNotBlank(config.getBlackList())) { + producer.getConfig().setBlackList(config.getBlackList()); + } + if (StringUtils.isNotBlank(config.getRegion())) { + producer.getConfig().setRegion(config.getRegion()); + } + if (StringUtils.isNotBlank(config.getWeight())) { + producer.getConfig().setWeight(config.getWeight()); + } + if (config.isArchive() != null) { + producer.getConfig().setArchive(config.isArchive()); + } + if (config.isNearBy() != null) { + producer.getConfig().setNearBy(config.isNearBy()); + } + if (config.isSingle() != null) { + producer.getConfig().setSingle(config.isSingle()); + } + if (config.getQosLevel() != null) { + producer.getConfig().setQosLevel(config.getQosLevel()); + } + if (config.getLimitTps() != null) { + producer.getConfig().setLimitTps(config.getLimitTps()); + } + if (config.getLimitTraffic() !=null) { + producer.getConfig().setLimitTraffic(config.getLimitTraffic()); + } + if (MapUtils.isNotEmpty(config.getParams())) { + if (producer.getConfig().getParams() != null) { + producer.getConfig().getParams().putAll(config.getParams()); + }else { + producer.getConfig().setParams(config.getParams()); + } + } + if (config.getTimeout() !=null) { + producer.getConfig().setTimeout(config.getTimeout()); + } + + } } From 460f7ed6c235ea7ff1e8dd5ed2535091e9108004 Mon Sep 17 00:00:00 2001 From: iamazy <1448588084@qq.com> Date: Sun, 27 Sep 2020 09:38:29 +0800 Subject: [PATCH 05/14] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dproducer=EF=BC=8Cconsum?= =?UTF-8?q?er=E9=85=8D=E7=BD=AE=E6=8E=A5=E5=8F=A3=E5=A4=9A=E5=A4=84?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E8=A2=AB=E8=A6=86=E7=9B=96=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/routing/command/monitor/ConsumerCommand.java | 1 + 1 file changed, 1 insertion(+) diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java index 9dd842398..a20524452 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java @@ -160,6 +160,7 @@ public Response configAddOrUpdate(@Body ConsumerConfig config) throws Exception private void mergeConsumerConfig(Consumer consumer, ConsumerConfig config) { if (consumer.getConfig() == null) { consumer.setConfig(config); + return; } if (config.getAckTimeout() !=null) { consumer.getConfig().setAckTimeout(config.getAckTimeout()); From abe2de6472579f77188f911054b2682015eacff1 Mon Sep 17 00:00:00 2001 From: iamazy <1448588084@qq.com> Date: Fri, 9 Oct 2020 14:31:16 +0800 Subject: [PATCH 06/14] =?UTF-8?q?broker=E7=AE=A1=E7=90=86=E9=A1=B5?= =?UTF-8?q?=E6=90=9C=E7=B4=A2=E9=80=9F=E5=BA=A6=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BrokerGroupRelatedRepository.java | 4 ++++ .../resources/mapper/BrokerGroupRelated.xml | 8 ++++++++ .../service/BrokerGroupRelatedService.java | 4 ++++ .../impl/BrokerGroupRelatedServiceImpl.java | 18 ++++++++++++++++++ .../service/impl/BrokerServiceImpl.java | 8 +++++--- .../service/impl/ProducerServiceImpl.java | 16 ++++++++++++++++ .../command/monitor/ProducerCommand.java | 8 +------- 7 files changed, 56 insertions(+), 10 deletions(-) diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-api/src/main/java/org/joyqueue/repository/BrokerGroupRelatedRepository.java b/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-api/src/main/java/org/joyqueue/repository/BrokerGroupRelatedRepository.java index 3b3db3164..995348071 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-api/src/main/java/org/joyqueue/repository/BrokerGroupRelatedRepository.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-api/src/main/java/org/joyqueue/repository/BrokerGroupRelatedRepository.java @@ -16,10 +16,13 @@ package org.joyqueue.repository; +import org.apache.ibatis.annotations.Param; import org.joyqueue.model.domain.BrokerGroupRelated; import org.joyqueue.model.query.QBrokerGroupRelated; import org.springframework.stereotype.Repository; +import java.util.List; + /** * Created by lining on 16-11-28. */ @@ -27,4 +30,5 @@ public interface BrokerGroupRelatedRepository extends PageRepository { int updateGroupByGroupId(BrokerGroupRelated brokerGroupRelated); int deleteByGroupId(long groupId); + List findByBrokerIds(@Param("brokerIds") List brokerIds); } diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-mybatis/src/main/resources/mapper/BrokerGroupRelated.xml b/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-mybatis/src/main/resources/mapper/BrokerGroupRelated.xml index 7031e539d..b27fc8b2c 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-mybatis/src/main/resources/mapper/BrokerGroupRelated.xml +++ b/joyqueue-console/joyqueue-data/joyqueue-data-repository/joyqueue-data-repository-mybatis/src/main/resources/mapper/BrokerGroupRelated.xml @@ -54,6 +54,14 @@ WHERE b.id=#{id} + +