Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocketmq-starter应用举例 #6114

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions jeecg-server-cloud/jeecg-system-cloud-start/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@
<artifactId>jeecg-cloud-test-rabbitmq</artifactId>
<version>${jeecgboot.version}</version>
</dependency>-->
<!-- rocketmq例子-->
<!--<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-cloud-test-rocketmq</artifactId>
<version>${jeecgboot.version}</version>
</dependency>-->
<!-- 分布式事务例子
<dependency>
<groupId>org.jeecgframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-cloud-test</artifactId>
<version>3.6.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<description>消息队列测试模块</description>
<artifactId>jeecg-cloud-test-rocketmq</artifactId>

<dependencies>
<!-- rocketmq消息队列-->
<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-boot-starter-rocketmq</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.jeecg.modules.test.rocketmq.constant;

/**
* 微服务单元测试常量定义
* @author: zyf
* @date: 2022/04/21
*/
public interface CloudConstant {


/**
* MQ测试队列名字
*/
public final static String MQ_JEECG_PLACE_ORDER = "jeecg_place_order";
public final static String MQ_JEECG_PLACE_ORDER_TIME = "jeecg_place_order_time";

/**
* MQ测试消息总线
*/
public final static String MQ_DEMO_BUS_EVENT = "demoBusEvent";

/**
* 分布式锁lock key
*/
public final static String REDISSON_DEMO_LOCK_KEY1 = "demoLockKey1";
public final static String REDISSON_DEMO_LOCK_KEY2 = "demoLockKey2";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.jeecg.modules.test.rocketmq.controller;


import cn.hutool.core.util.RandomUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.jeecg.boot.starter.rabbitmq.client.RabbitMqClient;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.base.BaseMap;
import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;


/**
* RocketMqClient发送消息
* @author: zyf
* @date: 2022/04/21
*/
@RestController
@RequestMapping("/sys/test")
@Api(tags = "【微服务】MQ单元测试")
public class JeecgMqTestController {

@Autowired
private RabbitMqClient rabbitMqClient;


/**
* 测试方法:快速点击发送MQ消息
* 观察三个接受者如何分配处理消息:HelloReceiver1、HelloReceiver2、HelloReceiver3,会均衡分配
*
* @param req
* @return
*/
@GetMapping(value = "/rocketmq")
@ApiOperation(value = "测试rocketmq", notes = "测试rocketmq")
public Result<?> rabbitMqClientTest(HttpServletRequest req) {
//rabbitmq消息队列测试
BaseMap map = new BaseMap();
map.put("orderId", RandomUtil.randomNumbers(10));
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER, map);
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, map,2);
return Result.OK("MQ发送消息成功");
}

@GetMapping(value = "/rocketmq2")
@ApiOperation(value = "rocketmq消息总线测试", notes = "rocketmq消息总线测试")
public Result<?> rabbitmq2(HttpServletRequest req) {

//rabbitmq消息总线测试
BaseMap params = new BaseMap();
params.put("orderId", "123456");
rabbitMqClient.publishEvent(CloudConstant.MQ_DEMO_BUS_EVENT, params);
return Result.OK("MQ发送消息成功");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.jeecg.modules.test.rocketmq.event;

import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.event.EventObj;
import org.jeecg.boot.starter.rabbitmq.event.JeecgBusEventHandler;
import org.jeecg.common.base.BaseMap;
import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
import org.springframework.stereotype.Component;

/**
* 消息处理器【发布订阅】
* @author: zyf
* @date: 2022/04/21
*/
@Slf4j
@Component(CloudConstant.MQ_DEMO_BUS_EVENT)
public class DemoBusEvent implements JeecgBusEventHandler {


@Override
public void onMessage(EventObj obj) {
if (ObjectUtil.isNotEmpty(obj)) {
BaseMap baseMap = obj.getBaseMap();
String orderId = baseMap.get("orderId");
log.info("业务处理----订单ID:" + orderId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.jeecg.modules.test.rocketmq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.jeecg.common.base.BaseMap;
import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
import org.springframework.stereotype.Component;

/**
* 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
*
* RabbitMq接受者1
* (@RabbitListener声明类上,一个类只能监听一个队列)
* @author: zyf
* @date: 2022/04/21
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver1")
public class HelloReceiver1 implements RocketMQListener<BaseMap> {

public void onMessage(BaseMap baseMap) {
log.info("helloReceiver1接收消息:" + baseMap);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.jeecg.modules.test.rocketmq.listener;//package org.jeecg.modules.cloud.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.jeecg.common.base.BaseMap;
import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
import org.springframework.stereotype.Component;

/**
* 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
*
* RabbitMq接受者2
* (@RabbitListener声明类上,一个类只能监听一个队列)
* @author: zyf
* @date: 2022/04/21
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver2")
public class HelloReceiver2 implements RocketMQListener<BaseMap> {

public void onMessage(BaseMap baseMap) {
log.info("helloReceiver2接收消息:" + baseMap);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.jeecg.modules.test.rocketmq.listener;//package org.jeecg.modules.cloud.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.jeecg.common.base.BaseMap;
import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
import org.springframework.stereotype.Component;

/**
* 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
*
* RabbitMq接受者3【我是处理人3】
* (@RabbitListener声明类方法上,一个类可以多监听多个队列)
* @author: zyf
* @date: 2022/04/21
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver3")
public class HelloReceiver3 implements RocketMQListener<BaseMap> {

public void onMessage(BaseMap baseMap) {
log.info("helloReceiver3接收消息:" + baseMap);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.jeecg.modules.test.rocketmq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.jeecg.common.base.BaseMap;
import org.jeecg.modules.test.rocketmq.constant.CloudConstant;
import org.springframework.stereotype.Component;

/**
* 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
* @author: zyf
* @date: 2022/04/21
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, consumerGroup = "helloTimeReceiver")
public class HelloTimeReceiver implements RocketMQListener<BaseMap> {

public void onMessage(BaseMap baseMap) {
log.info("helloTimeReceiver接收消息:" + baseMap);
}

}
1 change: 1 addition & 0 deletions jeecg-server-cloud/jeecg-visual/jeecg-cloud-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
<module>jeecg-cloud-test-more</module>
<module>jeecg-cloud-test-rabbitmq</module>
<module>jeecg-cloud-test-seata</module>
<module>jeecg-cloud-test-rocketmq</module>
</modules>
</project>
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@
<artifactId>jeecg-boot-starter-rabbitmq</artifactId>
<version>${jeecgboot.version}</version>
</dependency>
<!--rocketmq-->
<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-boot-starter-rocketmq</artifactId>
<version>${jeecgboot.version}</version>
</dependency>
<!--分库分表shardingsphere-->
<dependency>
<groupId>org.jeecgframework.boot</groupId>
Expand Down