From f1496b5084ced263f74e76039125c6596287cd0e Mon Sep 17 00:00:00 2001 From: EightMonth Date: Fri, 12 Apr 2024 10:16:30 +0800 Subject: [PATCH] =?UTF-8?q?rocketmq-starter=E5=BA=94=E7=94=A8=E4=B8=BE?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jeecg-system-cloud-start/pom.xml | 6 ++ .../jeecg-cloud-test-rocketmq/pom.xml | 22 +++++++ .../test/rocketmq/constant/CloudConstant.java | 28 +++++++++ .../controller/JeecgMqTestController.java | 61 +++++++++++++++++++ .../test/rocketmq/event/DemoBusEvent.java | 29 +++++++++ .../rocketmq/listener/HelloReceiver1.java | 27 ++++++++ .../rocketmq/listener/HelloReceiver2.java | 27 ++++++++ .../rocketmq/listener/HelloReceiver3.java | 27 ++++++++ .../rocketmq/listener/HelloTimeReceiver.java | 24 ++++++++ .../jeecg-visual/jeecg-cloud-test/pom.xml | 1 + pom.xml | 6 ++ 11 files changed, 258 insertions(+) create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/pom.xml create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/constant/CloudConstant.java create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/controller/JeecgMqTestController.java create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/event/DemoBusEvent.java create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver1.java create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver2.java create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver3.java create mode 100644 jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloTimeReceiver.java diff --git a/jeecg-server-cloud/jeecg-system-cloud-start/pom.xml b/jeecg-server-cloud/jeecg-system-cloud-start/pom.xml index e444422b3b..e4387cd25e 100644 --- a/jeecg-server-cloud/jeecg-system-cloud-start/pom.xml +++ b/jeecg-server-cloud/jeecg-system-cloud-start/pom.xml @@ -49,6 +49,12 @@ jeecg-cloud-test-rabbitmq ${jeecgboot.version} --> + + + + org.jeecgframework.boot + jeecg-boot-starter-rocketmq + + + + \ No newline at end of file diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/constant/CloudConstant.java b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/constant/CloudConstant.java new file mode 100644 index 0000000000..bb60ac9598 --- /dev/null +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/constant/CloudConstant.java @@ -0,0 +1,28 @@ +package org.jeecg.modules.test.rocketmq.constant; + +/** + * 微服务单元测试常量定义 + * @author: zyf + * @date: 2022/04/21 + */ +public interface CloudConstant { + + + /** + * MQ测试队列名字 + */ + public final static String MQ_JEECG_PLACE_ORDER = "jeecg_place_order"; + public final static String MQ_JEECG_PLACE_ORDER_TIME = "jeecg_place_order_time"; + + /** + * MQ测试消息总线 + */ + public final static String MQ_DEMO_BUS_EVENT = "demoBusEvent"; + + /** + * 分布式锁lock key + */ + public final static String REDISSON_DEMO_LOCK_KEY1 = "demoLockKey1"; + public final static String REDISSON_DEMO_LOCK_KEY2 = "demoLockKey2"; + +} diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/controller/JeecgMqTestController.java b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/controller/JeecgMqTestController.java new file mode 100644 index 0000000000..b42bcbf127 --- /dev/null +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/controller/JeecgMqTestController.java @@ -0,0 +1,61 @@ +package org.jeecg.modules.test.rocketmq.controller; + + +import cn.hutool.core.util.RandomUtil; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.jeecg.boot.starter.rabbitmq.client.RabbitMqClient; +import org.jeecg.common.api.vo.Result; +import org.jeecg.common.base.BaseMap; +import org.jeecg.modules.test.rocketmq.constant.CloudConstant; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.servlet.http.HttpServletRequest; + + +/** + * RocketMqClient发送消息 + * @author: zyf + * @date: 2022/04/21 + */ +@RestController +@RequestMapping("/sys/test") +@Api(tags = "【微服务】MQ单元测试") +public class JeecgMqTestController { + + @Autowired + private RabbitMqClient rabbitMqClient; + + + /** + * 测试方法:快速点击发送MQ消息 + * 观察三个接受者如何分配处理消息:HelloReceiver1、HelloReceiver2、HelloReceiver3,会均衡分配 + * + * @param req + * @return + */ + @GetMapping(value = "/rocketmq") + @ApiOperation(value = "测试rocketmq", notes = "测试rocketmq") + public Result rabbitMqClientTest(HttpServletRequest req) { + //rabbitmq消息队列测试 + BaseMap map = new BaseMap(); + map.put("orderId", RandomUtil.randomNumbers(10)); + rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER, map); + rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, map,2); + return Result.OK("MQ发送消息成功"); + } + + @GetMapping(value = "/rocketmq2") + @ApiOperation(value = "rocketmq消息总线测试", notes = "rocketmq消息总线测试") + public Result rabbitmq2(HttpServletRequest req) { + + //rabbitmq消息总线测试 + BaseMap params = new BaseMap(); + params.put("orderId", "123456"); + rabbitMqClient.publishEvent(CloudConstant.MQ_DEMO_BUS_EVENT, params); + return Result.OK("MQ发送消息成功"); + } +} diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/event/DemoBusEvent.java b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/event/DemoBusEvent.java new file mode 100644 index 0000000000..76d248d492 --- /dev/null +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/event/DemoBusEvent.java @@ -0,0 +1,29 @@ +package org.jeecg.modules.test.rocketmq.event; + +import cn.hutool.core.util.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.jeecg.boot.starter.rabbitmq.event.EventObj; +import org.jeecg.boot.starter.rabbitmq.event.JeecgBusEventHandler; +import org.jeecg.common.base.BaseMap; +import org.jeecg.modules.test.rocketmq.constant.CloudConstant; +import org.springframework.stereotype.Component; + +/** + * 消息处理器【发布订阅】 + * @author: zyf + * @date: 2022/04/21 + */ +@Slf4j +@Component(CloudConstant.MQ_DEMO_BUS_EVENT) +public class DemoBusEvent implements JeecgBusEventHandler { + + + @Override + public void onMessage(EventObj obj) { + if (ObjectUtil.isNotEmpty(obj)) { + BaseMap baseMap = obj.getBaseMap(); + String orderId = baseMap.get("orderId"); + log.info("业务处理----订单ID:" + orderId); + } + } +} diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver1.java b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver1.java new file mode 100644 index 0000000000..2a3b07c0ea --- /dev/null +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver1.java @@ -0,0 +1,27 @@ +package org.jeecg.modules.test.rocketmq.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.jeecg.common.base.BaseMap; +import org.jeecg.modules.test.rocketmq.constant.CloudConstant; +import org.springframework.stereotype.Component; + +/** + * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中) + * + * RabbitMq接受者1 + * (@RabbitListener声明类上,一个类只能监听一个队列) + * @author: zyf + * @date: 2022/04/21 + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver1") +public class HelloReceiver1 implements RocketMQListener { + + public void onMessage(BaseMap baseMap) { + log.info("helloReceiver1接收消息:" + baseMap); + } + +} \ No newline at end of file diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver2.java b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver2.java new file mode 100644 index 0000000000..c9fb1b1343 --- /dev/null +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver2.java @@ -0,0 +1,27 @@ +package org.jeecg.modules.test.rocketmq.listener;//package org.jeecg.modules.cloud.rabbitmq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.jeecg.common.base.BaseMap; +import org.jeecg.modules.test.rocketmq.constant.CloudConstant; +import org.springframework.stereotype.Component; + +/** + * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中) + * + * RabbitMq接受者2 + * (@RabbitListener声明类上,一个类只能监听一个队列) + * @author: zyf + * @date: 2022/04/21 + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver2") +public class HelloReceiver2 implements RocketMQListener { + + public void onMessage(BaseMap baseMap) { + log.info("helloReceiver2接收消息:" + baseMap); + } + +} \ No newline at end of file diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver3.java b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver3.java new file mode 100644 index 0000000000..3362b796e3 --- /dev/null +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloReceiver3.java @@ -0,0 +1,27 @@ +package org.jeecg.modules.test.rocketmq.listener;//package org.jeecg.modules.cloud.rabbitmq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.jeecg.common.base.BaseMap; +import org.jeecg.modules.test.rocketmq.constant.CloudConstant; +import org.springframework.stereotype.Component; + +/** + * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中) + * + * RabbitMq接受者3【我是处理人3】 + * (@RabbitListener声明类方法上,一个类可以多监听多个队列) + * @author: zyf + * @date: 2022/04/21 + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver3") +public class HelloReceiver3 implements RocketMQListener { + + public void onMessage(BaseMap baseMap) { + log.info("helloReceiver3接收消息:" + baseMap); + } + +} \ No newline at end of file diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloTimeReceiver.java b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloTimeReceiver.java new file mode 100644 index 0000000000..3e6dfe78a2 --- /dev/null +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/jeecg-cloud-test-rocketmq/src/main/java/org/jeecg/modules/test/rocketmq/listener/HelloTimeReceiver.java @@ -0,0 +1,24 @@ +package org.jeecg.modules.test.rocketmq.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.jeecg.common.base.BaseMap; +import org.jeecg.modules.test.rocketmq.constant.CloudConstant; +import org.springframework.stereotype.Component; + +/** + * 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中) + * @author: zyf + * @date: 2022/04/21 + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, consumerGroup = "helloTimeReceiver") +public class HelloTimeReceiver implements RocketMQListener { + + public void onMessage(BaseMap baseMap) { + log.info("helloTimeReceiver接收消息:" + baseMap); + } + +} \ No newline at end of file diff --git a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/pom.xml b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/pom.xml index dd27b94495..097532a20a 100644 --- a/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/pom.xml +++ b/jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/pom.xml @@ -24,5 +24,6 @@ jeecg-cloud-test-more jeecg-cloud-test-rabbitmq jeecg-cloud-test-seata + jeecg-cloud-test-rocketmq \ No newline at end of file diff --git a/pom.xml b/pom.xml index d8750a3666..2885d4b71b 100644 --- a/pom.xml +++ b/pom.xml @@ -225,6 +225,12 @@ jeecg-boot-starter-rabbitmq ${jeecgboot.version} + + + org.jeecgframework.boot + jeecg-boot-starter-rocketmq + ${jeecgboot.version} + org.jeecgframework.boot