最好一个topic只负责一类消息,topic的数量对MQ集群几乎无影响。
可以发送成功,但是不要这样做。
不可以,consumer group与topic的关系是,多对一。
可以使用json或者map。
- 如果是通知类型消息,即消息可以丢失,推荐采用oneway方式发送。
- 如果需要知道消息是否发送成功,但是不能阻塞主流程,推荐采用async方式发送。
- 如果消息必须发送成功,不在乎是否阻塞主流程,推荐采用普通方式发送。
- oneway和同步发送有对应的hystrix隔离版,可以在MQ集群故障时保障客户端主流程不阻塞。
检查发送消息后的返回值,针对失败的消息进行重试发送或降级处理。
针对需要重试的消息,消费失败需要抛出异常,这样会将失败的消息发回重试队列。
不建议使用tags,理由如下:
1 说起tags不得不说consumer group,其必须在整个集群中全局唯一,否则会在消费时导致部分消息丢失的问题:参见测试,而MQCloud在业务层面保证了这个唯一性。
2 那么跟tags有什么关系?关系就是同一个topic,同样的consumer group,使用不同的tags,会导致和consumer group一样的问题。
也就是说topic<->consumer group<->tags需要一一对应!
3 引起这两个问题的原因都跟rocketmq心跳机制有关,具体类可以参见ConsumerManager,中的结构:
private final ConcurrentMap<String/* Consumer Group */, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
4 如果自己能确保上述的一一对应关系,可以参考如下相关代码:
// 生产者:注意一条消息只支持设置一个tag
producer.publish(msg, tags, null, null);
// 消费者:在启动之前设置
consumer.setSubExpression("tagA || tagB");
5 tags替代方案:消息体增加type字段,各个消费者自己过滤。
1 org.apache.rocketmq.client.exception.MQBrokerException: CODE: 25 DESC: the consumer's subscription not latest。
该问题是拉取消息流程控制不严格导致,但是并不影响消息消费,参见。
2 org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while
该问题是由于rocketmq4.1之后broker针对处理发送过来的请求增加了快速失败机制,对于响应超过200ms的请求移除队列。默认broker端采用单线程和spin lock来处理。引起的原因可能是SYN_FLUSH,SYN_MASTER,gc,iops过高等,参考1,参考2。
3 2019-10-16 14:26:52.610 WARN 8 --- [NettyClientWorkerThread_2] RocketmqRemoting: NETTY CLIENT PIPELINE: IDLE exception [10.10.10.10:10913]
该问题是由于netty通道在两分钟内,没有产生读写,会触发idle事件,为了节约资源,会关闭此链接。
一般来说,RocketMQ有心跳检测机制,链接一般不会idle。但是rocketmq客户端默认是开启vip通道的,而RocketMQ的心跳机制检测的是10915端口,而并非vip通道的端口,所以心跳机制对于此链接无效。