-
pom依赖
<dependency> <groupId>com.sohu.tv</groupId> <artifactId>${clientArtifactId}</artifactId> <version>${version}</version> </dependency> <repository> <id>sohu.nexus</id> <url>${repositoryUrl}</url> </repository>
-
日志配置
在类路径添加日志配置文件rmq.logback.xml,名称不可更改,文件内容参考如下:
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="rmqAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOGS_DIR}/rocketmq.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOGS_DIR}/otherdays/rocketmq.log.%d{yyyy-MM-dd}</fileNamePattern> <maxHistory>40</maxHistory> </rollingPolicy> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} {%thread} %-5level %logger{50}-%L - %msg%n</pattern> <charset class="java.nio.charset.Charset">UTF-8</charset> </encoder> </appender> <root level="INFO"> <appender-ref ref="rmqAppender" /> </root> </configuration>
无论项目中使用的是log4j还是lo4j2,都可用此方式配置RocketMQ的日志,因为RocketMQ内部已经集成了logback。
@Configuration
public class MQConfiguration {
@Value("${flushCache.producerGroup}")
private String flushCacheProducer;
@Value("${flushCache.topic}")
private String flushCacheTopic;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ${producer} flushCacheProducer() {
return new ${producer}(flushCacheProducer, flushCacheTopic);
}
}
producerGroup和topic具体的值,请参考topic详情页,然后配置到yml或properties里。
<!-- 采用spring xml方式 -->
<bean id="xxxProducer" class="com.sohu.tv.mq.rocketmq.${producer}" init-method="start" destroy-method="shutdown">
<constructor-arg index="0" value="${请从topic详情查询生产者的producer group}"></constructor-arg>
<constructor-arg index="1" value="${topic名字}"></constructor-arg>
</bean>
// 生产者初始化 注意:只用初始化一次
${producer} producer = new ${producer}("xxx-producer", "xxx-topic");
// 注意,只用启动一次
producer.start();
// 应用退出时
producer.shutdown();
-
发送json消息(建议申请topic时序列化方式选择为String)
// 构建业务对象 int id = 123; Video video = new Video(); video.setId(id); // 转换为json String str = JSON.toJSONString(video); //建议设置keys(多个key用空格分隔)参数(也可以忽略该参数),比如keys指定为id,那么就可以根据id查询消息 Result<SendResult> sendResult = producer.publish(str, String.valueOf(id)); if(!sendResult.isSuccess){ //失败消息处理 }
-
发送对象(要保证此topic仅仅自己使用,申请topic时序列化方式选择为Protobuf)
// 构建业务对象 int id = 123; Video video = new Video(); video.setId(id); //建议设置keys(多个key用空格分隔)参数(也可以忽略该参数),比如keys指定为id,那么就可以根据id查询消息 Result<SendResult> sendResult = producer.publish(video, String.valueOf(id)); if(!sendResult.isSuccess){ //失败消息处理 }
如果采用Protobuf方式序列化,修改消息对象时需要注意如下事项:
- 已经存在的属性请勿删除。
- 新增属性务必加到所有属性后边。
否则,可能导致消费者反序列化失败,无法消费消息。
另外,如果发送的对象包含jdk以外的类,请联系管理员做处理,否则
消息查询
模块会展示乱码(本质原因是MQCloud做反序列化时找不到相应的类导致的)。 -
发送map(申请topic时序列化方式选择为Protobuf)
Map<String, Object> message = new HashMap<String, Object>(); message.put("vid", "123456"); message.put("aid", "789172"); //建议设置keys(多个key用空格分隔)参数(也可以忽略该参数),比如keys指定为vid,那么就可以根据vid查询消息 Result<SendResult> sendResult = producer.publish(message); if(!sendResult.isSuccess){ //失败消息处理 }
注意: Map中只能存放基本类型,请勿存放对象,否则MQCloud
消息查询
模块可能会展示成乱码。如果Map中包含了jdk以外的类,请联系管理员做处理。
-
如何使用rocketmq官方的方式发送消息?
producer.publish(Message message)
具体可以参考rocketmq官方demo。
此种方式发送消息跟使用原生rocketmq客户端一致,不会经过任何序列化。
但是,消费者需要注意,需要单独设置setMessageSerializer(null),否则消费消息会反序列化失败。
-
发送消息如何进行异步重试?
注:与RocketMQ自身的重试是不一样的,因为RocketMQ默认的重试机制是同步的,并存在超时而无法完成重试的可能。
MQCloud在消息发送失败时,提供了异步重试api:
Result<SendResult> sendResult = producer.send(MQMessage.build(msg).setKeys(key)); if (!result.isSuccess && !result.isRetrying()) { // 发送失败并且没有正在重试认为失败 System.out.println("发送失败"); }
另外,如果需要知道异步重试的结果,可以在producer初始化时进行如下设置:
producer.setResendResultConsumer(result -> { if (!result.isSuccess) { logger.info("重试次数:{},消息:{}", result.getRetriedTimes(), result.getMqMessage()); // 可以在这里增加重试失败的消息处理逻辑 } });
默认的重试次数为一次,可以通过如下api修改默认重试次数:
producer.setDefaultRetryTimes(2)
当然,如果想针对某条消息单独设置重试次数,可以参考如下,会覆盖默认重试次数:
MQMessage.build(msg).setRetryTimes(3)
异步重试使用的线程数默认为cpu核数,任务阻塞队列为100,如果想修改可以在producer.start之前,调用如下api修改:
producer.setRetrySenderExecutor(ExecutorService retrySenderExecutor)
/**
* 相同的id发送到同一个队列
* hash方法:id % 队列数
*/
class IDHashMessageQueueSelector implements MessageQueueSelector {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object idObject) {
long id = (Long) idObject;
int size = mqs.size();
int index = (int) (id % size);
return mqs.get(index);
}
}
// 设置到producer
producer.setMessageQueueSelector(new IDHashMessageQueueSelector());
// 消息发送
long id = 123L;
Map<String, Object> map = new HashMap<String, Object>();
map.put("id", id);
Result<SendResult> sendResult = producer.publishOrder(map, String.valueOf(id), id);
注意:此种发送方式不带重试机制。
// 1.定义实现事务回调接口
TransactionListener transactionListener = new TransactionListener() {
/**
* 在此方法执行本地事务
*/
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// arg可以传业务id
int id = (Integer) arg;
// 确定事务状态,未知返回:UNKNOW,回滚返回:ROLLBACK_MESSAGE,成功返回:COMMIT_MESSAGE,抛出异常默认为:UNKNOW
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 如果executeLocalTransaction返回UNKNOW,rocketmq会回调此方法查询事务状态,默认每分钟查一次,最多查询15次,状态还是UNKNOW的话,丢弃消息
*/
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String key = msg.getKeys();
int id = Integer.valueOf(key);
return LocalTransactionState.COMMIT_MESSAGE;
}
};
// 2.发送事务消息
// 初始化
${producer} producer = new ${producer}(producerGroup, topic, transactionListener);
// 组装消息
int id = 123;
Map<String, Object> map = new HashMap<String, Object>();
map.put("id", id);
map.put("msg", "msg" + id);
// 发送
Result<SendResult> sendResult = producer.publishTransaction(JSON.toJSONString(map), String.valueOf(id), id);
if(!sendResult.isSuccess){
//失败消息处理
}
Map<String, String> map = new HashMap<String, String>();
map.put("aid", "123456");
map.put("vid", "765432");
// 1.oneway方式 - 此种方式发送效率最高,但是无法获取返回的结果
new PublishOnewayCommand(producer, map).execute();
// 2.普通方式 - 此种方式即为普通方式的hystrix封装,与普通发送方式无异
Result<SendResult> result = new PublishCommand(producer, map).execute();
注意:hystrix配置默认采用线程池隔离,容量为30,超时时间为rocketmq客户端默认超时3s,如果使用hystrix版,还需要显示依赖hystrix,如下:
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.3.20</version>
</dependency>
-
检查处理
比如客户端是否启动,消息序列化,消息大小是否超过4M等。
-
topic路由查询
消息归属于topic,topic是一类消息的合集,那么首先需要知道topic在哪些broker上。
-
broker选择
因为集群中有多个broker,需要挑选一个健康的broker进行消息发送。
-
获取broker主节点
broker一般都是主备两个节点,消息只能发送到主节点。
-
一些发送前钩子调用
-
获取netty通道
-
调用通道发送消息
-
等待响应(默认为sendMsgTimeout=3秒)
-
处理响应及异常
-
topic路由的查询
topic的路由需要从name server上查询,此过程是远程调用,超时默认设置的是3秒。
-
获取netty通道
相当于建立链接,默认超时时间3秒。
-
发送消息并等待响应(默认为sendMsgTimeout=3秒)
对于以上三种耗时,
第一,其中topic路由的查询
客户端启动后10毫秒会自动缓存topic路由,之后每隔30秒更新一次,所以topic路由的查询
一般来说不会影响消息发送。
第二,对于获取netty通道
也仅限于第一次消息发送,因为netty是长链,一旦建立会自动缓存,后续通过心跳机制来保障链接的连通性。
第三,其实耗时基本是在发送消息并等待响应
。
-
针对发送失败的消息,后续会最多进行2次重试(可以通过设置retryTimesWhenSendFailed修改)
-
为什么说最多2次重试呢,因为如果发送耗时达到sendMsgTimeout也会中断重试机制。
那如果把sendMsgTimeout设长是否会一定重试2次呢?这个不一定,因为第一次调用有可能一直等sendMsgTimeout的时间,就没有第二次重试的机会了。
MQCloud针对此种情况进行了修改,增加了单次请求最大耗时参数的设置,默认总耗时设置为4秒,单次请求最大为3秒,这样至少保证broker无响应时重试一次。
可以的,默认mq-client-open开启了rocketmq的容错机制,即它通过统计每次发送消息到broker的耗时和异常情况,检测出哪些broker响应情况不好,从而避免向这些broker发送消息。
-
MQClientException
客户端的问题,比如客户端未启动,消息过大,空消息,配置错误等等。
-
RemotingTooMuchRequestException
真实发送前超时检测,如果已超时,直接抛出异常。
-
RemotingSendRequestException
请求发送失败。
-
RemotingTimeoutException
- 请求未发送,
topic路由查询
或通道连接
阶段已经超时。 - 请求发送正常,超时时间内broker没有返回值。
- 请求未发送,
-
RemotingConnectException
通道无法连接,请求未发送。
-
MQBrokerException
响应的code中除了成功,刷盘超时,同步slave超时,slave不可用以外的值都认为是MQBrokerException。比如
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while
就是broker流控导致的。 -
InterruptedException
发送线程被中断。
发送消息后打印消息及Result对象,及检查返回值,是否成功,如果发送失败,进行重试或者降级操作,比如把失败的消息存储到数据库,定时补发。
另外,Result.isSuccess()
只是表明此次发送成功了,具体消息存储状态还取决于Result.getResult().getSendStatus()
里的值(以下参考来自rocketmq官方文档):
1. SEND_OK
消息发送成功
2. FLUSH_DISK_TIMEOUT
消息接收成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失。
3. FLUSH_SLAVE_TIMEOUT
消息接收成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失。
4. SLAVE_NOT_AVAILABLE
消息接送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢
失。
以上状态值跟broker设置相关,如果确定需要最高级别保障消息不可丢失,请申请topic时勾选上支持事务
选项,将会在事务集群上创建该topic。
-
异步发送会使用独立的线程池来发送,不会阻塞业务线程,默认线程池配置简化如下:
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue);
可以通过
DefaultMQProducerImpl.setAsyncSenderExecutor(ExecutorService asyncSenderExecutor)
来设置自己的线程池。由于默认线程池使用有界队列,所以可能存在任务等待或被拒绝的情况:
- 如果执行任务时等待时间超过了sendMsgTimeout(默认为3秒),那么任务将不会执行直接返回。
- 如果线程池满了,将直接抛出
MQClientException("executor rejected ", e)
。
-
异步发送通过信号量来做流控,默认最大控制的并发请求数为65535。
-
异步发送重试逻辑与同步发送类似,重试次数默认为retryTimesWhenSendAsyncFailed=2,可以修改配置。
-
由于异步发送是通过回调检测返回值和异常的,参见如下:
public interface SendCallback { void onSuccess(final SendResult sendResult); void onException(final Throwable e); }
在
onSuccess
中务必检查返回值,与同步发送类似。在
onException
中处理异常。 -
其余与同步发送基本相同。
使用业务线程发送,没有重试机制,不等待响应。
目前已支持30天内任意维度的定时消息,使用方式如下:
// 24小时后投递消息
long deliveryTimestamp = System.currentTimeMillis() + (24 * 3600 * 1000L);
MQMessage<?> mqMessage = MQMessage.build(msg).setKeys(key).setDeliveryTimestamp(deliveryTimestamp);
Result<SendResult> sendResult = producer.send(mqMessage);
if (!sendResult.isSuccess) { // 发送失败
System.out.println("发送失败");
}
- 投递消息的时间尽量分散,不建议在同一时间大量投递消息。
- 定时投递在少数特殊情况下会产生重复消息,业务端需自行实现幂等
- 如需取消定时消息,请自行保存msgId,获取方式如下:
使用方法参见取消定时消息
sendResult.getResult().getMsgId()
当前支持两种方式取消定时消息:
-
页面取消
消息查询-定时消息页面支持点击取消定时消息。
-
接口取消
-
接口地址:
POST /topic/message/cancelWheelMsg
-
接口参数:
- topic:消息主题
- uniqIds:消息唯一id(msgId),多个id用逗号分隔,单次最多支持20个id
- token:验证token,可咨询管理员获取
-
响应说明:
{ "status": 200, "message": }
- status::标识本次响应的状态码,包括但不限于如下值:
- status:300 参数错误,topic不存在
- status:303 权限不足,无法取消
- status:705 uniqId无效,无法定位消息
- status:706 uniqid对应的消息已超出取消时间范围,无法取消
- status:707 uniqid对应的消息为非时间轮定时消息,无法取消
- status:708 uniqid的取消申请已存在,不能重复申请
- status:200 取消成功
- message:当响应状态码非200时的提示信息。
- status::标识本次响应的状态码,包括但不限于如下值:
-
生产示例:
// 设置请求头 HttpHeaders headers = new HttpHeaders(); headers.add("Cookie", "TOKEN=" + token); headers.setContentType(MediaType.MULTIPART_FORM_DATA); // 设置请求参数 MultiValueMap<String, String> multiValueMap = new LinkedMultiValueMap<>(); multiValueMap.add("topic", "basic-delay-cancel-topic"); multiValueMap.add("uniqIds", uniqIds); // 发送POST请求 HttpEntity httpEntity = new HttpEntity<>(multiValueMap, headers); ResponseEntity<WebResult> response = restTemplate.postForEntity(CANCEL_DELAY_URL, httpEntity, WebResult.class); WebResult body = response.getBody(); if (body.ok()) { log.info("取消定时消息成功, uniqIds:{}", uniqIds); }
-
-
能被取消的定时消息的定时时间需大于当前时间5分钟以上,如需取消小于该范围的消息,请联系管理员。
-
该功能能保障绝大部分情况下的取消,但仍有极少数情况下无法取消,如:
- 集群机器不可用,取消消息写入失败,定时无法取消
- MQCloud服务不可用,取消消息发送失败,定时无法取消
- 网络故障,取消消息无法在定时消息触发前发送,定时无法取消
如需严格保证,请先咨询管理员。
-
该功能仅支持rocketmq 5.x版本的时间轮定时消息。