From b6d41a184808bb5a473f19f91dbc0216ef80fbdd Mon Sep 17 00:00:00 2001 From: iamazy <1448588084@qq.com> Date: Thu, 15 Oct 2020 15:36:25 +0800 Subject: [PATCH] =?UTF-8?q?topic=E4=B8=BB=E9=A1=B5=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=9F=A5=E7=9C=8B=E6=B6=88=E8=B4=B9=E8=80=85=E5=BC=B9=E6=A1=86?= =?UTF-8?q?=20(#310)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit topic主页添加查看消费者弹框 --- .../joyqueue/model/domain/ConsumerConfig.java | 52 +++++------ .../joyqueue/model/domain/ProducerConfig.java | 20 ++-- .../convert/NsrConsumerConverter.java | 4 +- .../impl/BrokerGroupRelatedServiceImpl.java | 14 +-- .../service/impl/ConsumerServiceImpl.java | 1 + .../src/main/resources/routing.xml | 9 ++ .../command/monitor/ConsumerCommand.java | 91 +++---------------- .../command/monitor/ProducerCommand.java | 13 +++ .../ProducerConfigAddOrUpdateCommand.java | 50 +--------- 9 files changed, 86 insertions(+), 168 deletions(-) diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java index 250d197c8..3e68f28be 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ConsumerConfig.java @@ -30,17 +30,17 @@ public class ConsumerConfig extends BaseNsrModel { /** * 开启就近机房消费 **/ - private Boolean nearBy; + private Boolean nearBy = false; /** * 是否暂停消费 **/ - private Boolean paused; + private Boolean paused = false; /** * 是否需要归档,默认不归档 **/ - private Boolean archive; + private Boolean archive = false; /** * 是否启用重试服务,默认开启 @@ -75,19 +75,19 @@ public class ConsumerConfig extends BaseNsrModel { * 最大重试次数(无限制) **/ @Min(0) - private Integer maxRetrys; + private Integer maxRetrys = 0; /** * 最大重试间隔(默认5分钟) **/ @Min(0) - private Integer maxRetryDelay; + private Integer maxRetryDelay = 0; /** * 重试间隔 */ @Min(0) - private Integer retryDelay; + private Integer retryDelay = 0; private String region; @@ -100,13 +100,13 @@ public class ConsumerConfig extends BaseNsrModel { * 指数系数 */ @Min(0) - private Double backOffMultiplier; + private Double backOffMultiplier = 0D; /** * 过期时间(默认3天) **/ @Min(0) - private Integer expireTime; + private Integer expireTime = 0; /** * 单队列并行度 @@ -120,9 +120,9 @@ public class ConsumerConfig extends BaseNsrModel { private String filters; - private Integer limitTps; + private Integer limitTps = 0; - private Integer limitTraffic; + private Integer limitTraffic = 0; private Map params; @@ -131,7 +131,7 @@ public Boolean isNearBy() { return nearBy; } - public void setNearBy(boolean nearBy) { + public void setNearBy(Boolean nearBy) { this.nearBy = nearBy; } @@ -147,7 +147,7 @@ public Boolean isPaused() { return paused; } - public void setPaused(boolean paused) { + public void setPaused(Boolean paused) { this.paused = paused; } @@ -155,7 +155,7 @@ public Boolean isArchive() { return archive; } - public void setArchive(boolean archive) { + public void setArchive(Boolean archive) { this.archive = archive; } @@ -163,7 +163,7 @@ public Boolean isRetry() { return retry; } - public void setRetry(boolean retry) { + public void setRetry(Boolean retry) { this.retry = retry; } @@ -171,7 +171,7 @@ public Integer getDelay() { return delay; } - public void setDelay(int delay) { + public void setDelay(Integer delay) { this.delay = delay; } @@ -179,7 +179,7 @@ public Integer getAckTimeout() { return ackTimeout; } - public void setAckTimeout(int ackTimeout) { + public void setAckTimeout(Integer ackTimeout) { this.ackTimeout = ackTimeout; } @@ -187,7 +187,7 @@ public Integer getBatchSize() { return batchSize; } - public void setBatchSize(int batchSize) { + public void setBatchSize(Integer batchSize) { this.batchSize = batchSize; } @@ -195,7 +195,7 @@ public Integer getMaxRetrys() { return maxRetrys; } - public void setMaxRetrys(int maxRetrys) { + public void setMaxRetrys(Integer maxRetrys) { this.maxRetrys = maxRetrys; } @@ -203,7 +203,7 @@ public Integer getMaxRetryDelay() { return maxRetryDelay; } - public void setMaxRetryDelay(int maxRetryDelay) { + public void setMaxRetryDelay(Integer maxRetryDelay) { this.maxRetryDelay = maxRetryDelay; } @@ -211,7 +211,7 @@ public Integer getRetryDelay() { return retryDelay; } - public void setRetryDelay(int retryDelay) { + public void setRetryDelay(Integer retryDelay) { this.retryDelay = retryDelay; } @@ -219,7 +219,7 @@ public Boolean isUseExponentialBackOff() { return useExponentialBackOff; } - public void setUseExponentialBackOff(boolean useExponentialBackOff) { + public void setUseExponentialBackOff(Boolean useExponentialBackOff) { this.useExponentialBackOff = useExponentialBackOff; } @@ -227,7 +227,7 @@ public Double getBackOffMultiplier() { return backOffMultiplier; } - public void setBackOffMultiplier(double backOffMultiplier) { + public void setBackOffMultiplier(Double backOffMultiplier) { this.backOffMultiplier = backOffMultiplier; } @@ -235,7 +235,7 @@ public Integer getExpireTime() { return expireTime; } - public void setExpireTime(int expireTime) { + public void setExpireTime(Integer expireTime) { this.expireTime = expireTime; } @@ -243,7 +243,7 @@ public Integer getConcurrent() { return concurrent; } - public void setConcurrent(int concurrent) { + public void setConcurrent(Integer concurrent) { this.concurrent = concurrent; } @@ -263,7 +263,7 @@ public void setFilters(String filters) { this.filters = filters; } - public void setLimitTps(int limitTps) { + public void setLimitTps(Integer limitTps) { this.limitTps = limitTps; } @@ -271,7 +271,7 @@ public Integer getLimitTps() { return limitTps; } - public void setLimitTraffic(int limitTraffic) { + public void setLimitTraffic(Integer limitTraffic) { this.limitTraffic = limitTraffic; } diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java index 59382fe9f..8ea676daa 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-model/src/main/java/org/joyqueue/model/domain/ProducerConfig.java @@ -28,7 +28,7 @@ public class ProducerConfig extends BaseNsrModel { /** * 是否就近发送 **/ - private Boolean nearBy; + private Boolean nearBy = false; /** * 集群实例发送权重 @@ -38,7 +38,7 @@ public class ProducerConfig extends BaseNsrModel { /** * 是否归档 */ - private Boolean archive; + private Boolean archive = false; /** * 单个发送者 @@ -47,11 +47,11 @@ public class ProducerConfig extends BaseNsrModel { private String blackList; - private Integer limitTps; + private Integer limitTps = 0; - private Integer limitTraffic; + private Integer limitTraffic = 0; - private Integer timeout; + private Integer timeout = 2000; private Integer qosLevel; private String region; @@ -70,7 +70,7 @@ public Boolean isNearBy() { return nearBy; } - public void setNearBy(boolean nearBy) { + public void setNearBy(Boolean nearBy) { this.nearBy = nearBy; } @@ -86,7 +86,7 @@ public Boolean isSingle() { return single; } - public void setSingle(boolean single) { + public void setSingle(Boolean single) { this.single = single; } @@ -94,7 +94,7 @@ public Boolean isArchive() { return archive; } - public void setArchive(boolean archive) { + public void setArchive(Boolean archive) { this.archive = archive; } @@ -106,7 +106,7 @@ public void setBlackList(String blackList) { this.blackList = blackList; } - public void setLimitTps(int limitTps) { + public void setLimitTps(Integer limitTps) { this.limitTps = limitTps; } @@ -114,7 +114,7 @@ public Integer getLimitTps() { return limitTps; } - public void setLimitTraffic(int limitTraffic) { + public void setLimitTraffic(Integer limitTraffic) { this.limitTraffic = limitTraffic; } diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/convert/NsrConsumerConverter.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/convert/NsrConsumerConverter.java index 090a46b58..014e10249 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/convert/NsrConsumerConverter.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/convert/NsrConsumerConverter.java @@ -99,7 +99,9 @@ protected Consumer backward(org.joyqueue.domain.Consumer nsrConsumer) { consumerConfig.setBlackList(StringUtils.join(consumerPolicy.getBlackList(), ",")); consumerConfig.setAckTimeout(consumerPolicy.getAckTimeout()); consumerConfig.setArchive(consumerPolicy.getArchive()); - consumerConfig.setBatchSize(consumerPolicy.getBatchSize()); + if (consumerPolicy.getBatchSize() != null) { + consumerConfig.setBatchSize(Integer.parseInt(consumerPolicy.getBatchSize().toString())); + } consumerConfig.setConcurrent(consumerPolicy.getConcurrent()); consumerConfig.setDelay(consumerPolicy.getDelay()); consumerConfig.setPaused(consumerPolicy.getPaused()); diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerGroupRelatedServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerGroupRelatedServiceImpl.java index e7acd26a4..3bfd96f83 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerGroupRelatedServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/BrokerGroupRelatedServiceImpl.java @@ -47,12 +47,14 @@ public int deleteByGroupId(long groupId) { @Override public Map findGroupByBrokerIds(List brokerIds) { - List brokerGroupRelateds = repository.findByBrokerIds(brokerIds); - if (CollectionUtils.isNotEmpty(brokerGroupRelateds)) { - return brokerGroupRelateds.stream() - .filter(brokerGroupRelated -> brokerGroupRelated.getGroup() != null) - .collect(Collectors.toMap(BrokerGroupRelated::getId, - BrokerGroupRelated::getGroup)); + if (CollectionUtils.isNotEmpty(brokerIds)) { + List brokerGroupRelateds = repository.findByBrokerIds(brokerIds); + if (CollectionUtils.isNotEmpty(brokerGroupRelateds)) { + return brokerGroupRelateds.stream() + .filter(brokerGroupRelated -> brokerGroupRelated.getGroup() != null) + .collect(Collectors.toMap(BrokerGroupRelated::getId, + BrokerGroupRelated::getGroup)); + } } return Collections.emptyMap(); } diff --git a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ConsumerServiceImpl.java b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ConsumerServiceImpl.java index 2ac672252..82d327c24 100644 --- a/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ConsumerServiceImpl.java +++ b/joyqueue-console/joyqueue-data/joyqueue-data-service/src/main/java/org/joyqueue/service/impl/ConsumerServiceImpl.java @@ -212,6 +212,7 @@ protected Consumer fillConsumer(Consumer consumer) { if (application != null) { app.setId(application.getId()); app.setName(application.getName()); + consumer.setOwner(application.getOwner()); } return consumer; } diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-application/src/main/resources/routing.xml b/joyqueue-console/joyqueue-web/joyqueue-web-application/src/main/resources/routing.xml index bfc1f626c..71da37473 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-application/src/main/resources/routing.xml +++ b/joyqueue-console/joyqueue-web/joyqueue-web-application/src/main/resources/routing.xml @@ -301,6 +301,9 @@ + + + diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java index a20524452..2ff0cde5f 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ConsumerCommand.java @@ -24,7 +24,6 @@ import com.jd.laf.web.vertx.response.Response; import com.jd.laf.web.vertx.response.Responses; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.joyqueue.handler.Constants; import org.joyqueue.handler.annotation.PageQuery; @@ -147,90 +146,30 @@ public Response delete(@QueryParam(Constants.ID) String id) throws Exception { return Responses.success(); } + @Path("queryByTopic") + public Response queryByTopic(@Body QConsumer qConsumer) throws Exception { + if (qConsumer.getTopic() == null || qConsumer.getTopic().getCode() == null) { + return Responses.error(Response.HTTP_BAD_REQUEST, "Empty topic!"); + } + String namespace = null; + String topic = qConsumer.getTopic().getCode(); + if (null != qConsumer.getTopic().getNamespace()) { + namespace = qConsumer.getTopic().getNamespace().getCode(); + } + List consumers = service.findByTopic(topic, namespace); + return Responses.success(consumers); + } + @Path("configAddOrUpdate") public Response configAddOrUpdate(@Body ConsumerConfig config) throws Exception { if (config != null) { Consumer consumer = service.findById(config.getConsumerId()); - mergeConsumerConfig(consumer, config); + consumer.setConfig(config); service.update(consumer); } return Responses.success(); } - private void mergeConsumerConfig(Consumer consumer, ConsumerConfig config) { - if (consumer.getConfig() == null) { - consumer.setConfig(config); - return; - } - if (config.getAckTimeout() !=null) { - consumer.getConfig().setAckTimeout(config.getAckTimeout()); - } - if (config.getBackOffMultiplier() != null) { - consumer.getConfig().setBackOffMultiplier(config.getBackOffMultiplier()); - } - if (config.getBatchSize() != null) { - consumer.getConfig().setBatchSize(config.getBatchSize()); - } - if (config.getConcurrent() != null) { - consumer.getConfig().setConcurrent(config.getConcurrent()); - } - if (config.getDelay() != null) { - consumer.getConfig().setDelay(config.getDelay()); - } - if (config.getRetryDelay() != null) { - consumer.getConfig().setRetryDelay(config.getRetryDelay()); - } - if (config.getMaxRetrys() != null) { - consumer.getConfig().setMaxRetrys(config.getMaxRetrys()); - } - if (config.getMaxRetryDelay() != null) { - consumer.getConfig().setMaxRetryDelay(config.getMaxRetryDelay()); - } - if (config.getLimitTps() != null) { - consumer.getConfig().setLimitTps(config.getLimitTps()); - } - if (config.getLimitTraffic() != null) { - consumer.getConfig().setLimitTraffic(config.getLimitTraffic()); - } - if (StringUtils.isNotBlank(config.getRegion())) { - consumer.getConfig().setRegion(config.getRegion()); - } - if (StringUtils.isNotBlank(config.getBlackList())) { - consumer.getConfig().setBlackList(config.getBlackList()); - } - if (StringUtils.isNotBlank(config.getFilters())) { - consumer.getConfig().setFilters(config.getFilters()); - } - if (config.isArchive() != null) { - consumer.getConfig().setArchive(config.isArchive()); - } - if (config.isNearBy() != null) { - consumer.getConfig().setNearBy(config.isNearBy()); - } - if (config.isPaused() != null) { - consumer.getConfig().setPaused(config.isPaused()); - } - if (config.isRetry() != null) { - consumer.getConfig().setRetry(config.isRetry()); - } - if (config.isUseExponentialBackOff() != null) { - consumer.getConfig().setUseExponentialBackOff(config.isUseExponentialBackOff()); - } - if (MapUtils.isNotEmpty(config.getParams())) { - if (consumer.getConfig().getParams() == null) { - consumer.getConfig().setParams(config.getParams()); - }else { - consumer.getConfig().getParams().putAll(config.getParams()); - } - } - if (config.getOffsetMode() != null) { - consumer.getConfig().setOffsetMode(config.getOffsetMode()); - } - if (config.getExpireTime() != null) { - consumer.getConfig().setExpireTime(config.getExpireTime()); - } - } - /** * 同步producer * @return diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerCommand.java index 0807cdec9..83b2f0fbd 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerCommand.java @@ -179,6 +179,19 @@ public Response findPartitionGroupWeight(@QueryParam(Constants.ID) String id) th return Responses.success(currentWeights); } + @Path("updateWeight") + public Response updateWeight(@QueryParam(Constants.ID) String id, @Body Map body) throws Exception { + Producer producer = service.findById(id); + if (body.containsKey("weight")) { + if (producer.getConfig() == null) { + producer.setConfig(new ProducerConfig()); + } + producer.getConfig().setWeight(body.get("weight").toString()); + } + service.update(producer); + return Responses.success(); + } + /** * 同步producer * @return diff --git a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java index 6dc988c5a..eb9a320b7 100644 --- a/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java +++ b/joyqueue-console/joyqueue-web/joyqueue-web-handler/src/main/java/org/joyqueue/handler/routing/command/monitor/ProducerConfigAddOrUpdateCommand.java @@ -15,9 +15,6 @@ */ package org.joyqueue.handler.routing.command.monitor; - -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.joyqueue.handler.annotation.Operator; import org.joyqueue.model.domain.Identity; import org.joyqueue.model.domain.Producer; @@ -58,7 +55,7 @@ public String type() { public Response execute() throws Exception { Preconditions.checkArgument(null!=producerConfig, "invalid argument"); Producer producer = producerService.findById(producerConfig.getProducerId()); - mergeProducerConfig(producer, producerConfig); + producer.setConfig(producerConfig); return Responses.success(producerService.update(producer)); } @@ -66,49 +63,4 @@ public Response execute() throws Exception { public void clean() { producerConfig = null; } - - private void mergeProducerConfig(Producer producer, ProducerConfig config) { - if (producer.getConfig() == null) { - producer.setConfig(config); - return; - } - if (StringUtils.isNotBlank(config.getBlackList())) { - producer.getConfig().setBlackList(config.getBlackList()); - } - if (StringUtils.isNotBlank(config.getRegion())) { - producer.getConfig().setRegion(config.getRegion()); - } - if (StringUtils.isNotBlank(config.getWeight())) { - producer.getConfig().setWeight(config.getWeight()); - } - if (config.isArchive() != null) { - producer.getConfig().setArchive(config.isArchive()); - } - if (config.isNearBy() != null) { - producer.getConfig().setNearBy(config.isNearBy()); - } - if (config.isSingle() != null) { - producer.getConfig().setSingle(config.isSingle()); - } - if (config.getQosLevel() != null) { - producer.getConfig().setQosLevel(config.getQosLevel()); - } - if (config.getLimitTps() != null) { - producer.getConfig().setLimitTps(config.getLimitTps()); - } - if (config.getLimitTraffic() !=null) { - producer.getConfig().setLimitTraffic(config.getLimitTraffic()); - } - if (MapUtils.isNotEmpty(config.getParams())) { - if (producer.getConfig().getParams() != null) { - producer.getConfig().getParams().putAll(config.getParams()); - }else { - producer.getConfig().setParams(config.getParams()); - } - } - if (config.getTimeout() !=null) { - producer.getConfig().setTimeout(config.getTimeout()); - } - - } }