ip地址 | 主机名 | 操作系统版本 | RocketMQ版本 | JDK版本 | maven版本 | 备注 |
---|---|---|---|---|---|---|
172.16.7.91 | nameserver01 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Name Server集群 |
172.16.7.92 | nameserver03 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Name Server集群 |
172.16.7.93 | master01 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群1 |
172.16.7.94 | slave01 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群1 |
172.16.7.95 | master02 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群2 |
172.16.7.96 | slave02 | centos 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker集群2 |
[root@master01 ~]# init 6
[root@master01 ~]# cd /root/logs/rocketmqlogs/
[root@master01 rocketmqlogs]# rm -rf *
[root@master01 rocketmqlogs]# cd /root/store/
[root@master01 store]# rm -rf *
以master01为例,首先停止所有rocketmq进程,然后删除日志和存储信息。所有服务器都执行该操作。
启动各节点服务,查看集群状态
新增主题topic_broker_test
主题配置如下:
查看新增的主题
新建订阅组group_broker_test
配置如下:
查看新建的订阅组
package com.my.maven.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("group_broker_test");
// Specify name server addresses.
producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
producer.setRetryTimesWhenSendAsyncFailed(2);
//Launch the instance.
producer.start();
for (int i = 0; i < 10000; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("topic_broker_test" /* Topic */,
"TagA" /* Tag */,
("Broker HA Test" +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
使用循环方式产生多条消息
package com.my.maven.rocketmq;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException,
MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"group_broker_test");
consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
consumer.subscribe("topic_broker_test", "TagA || tagB");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("topic_broker_test")) {
if (msg.getTags() != null && msg.getTags().equals("tagA")) {
// 获取消息体
String message = new String(msg.getBody());
System.out.println("receive tagA message:" + message);
} else if (msg.getTags() != null
&& msg.getTags().equals("tagB")) {
// 获取消息体
String message = new String(msg.getBody());
System.out.println("receive tagB message:" + message);
}
}
// 成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
[root@master01 rocketmq]# init 6
主机名 | 状态 |
---|---|
broker-a master | 发送时重启 |
broker-a slave | 正常运行 |
broker-b master | 正常运行 |
broker-b slave | 正常运行 |
发送5000条消息,在消息发送的同时关闭broker-a master
消息发送会暂停,一共发送了153条
结论:消息发送时如果有master宕机,则消息发送会终止,主机起来后消息也不会继续发送。
[root@slave01 rocketmq]# init 6
主机名 | 状态 |
---|---|
broker-a master | 宕机 |
broker-a slave | 发送时重启 |
broker-b master | 正常运行 |
broker-b slave | 正常运行 |
发送5000条消息,在发送过程中同时重启broker-a slave
消息发送会暂停,一共发送了339条
结论:消息发送时如果有slave宕机,则消息发送会终止,主机起来后消息也不会继续发送。
主机名 | 状态 |
---|---|
broker-a master | 正常运行 |
broker-a slave | 宕机 |
broker-b master | 正常运行 |
broker-b slave | 宕机 |
[root@slave01 rocketmq]# init 0
[root@slave02 rocketmq]# init 0
发送5000条消息,在发送过程中同时关闭所有的slave
消息发送会暂停,一共发送了401条,这也验证了上面的结论:消息发送时如果有slave宕机,则消息发送会终止
保持两个slave宕机状态,继续发送5000条消息
console显示消息记录数为5000条
结论:slave都宕机不影响消息发送。
主机名 | 状态 |
---|---|
broker-a master | 正常运行 |
broker-a slave | 宕机 |
broker-b master | 宕机 |
broker-b slave | 宕机 |
[root@master02 rocketmq]# init 0
发送5000条测试消息,发送前broker-b master关机,只保留broker-a master运行
console显示发送了5000条消息
**结论:**集群只有一台master消息发送正常。
主机名 | 状态 |
---|---|
broker-a master | 宕机 |
broker-a slave | 正常运行 |
broker-b master | 宕机 |
broker-b slave | 正常运行 |
关闭所有的master,启动所有的slave,发送5000条消息
消息发送前:
消息发送:
console报错,消息无法发送
结论:master都宕机消息无法正常发送。
在消息消费高可用测试前先清空消息,然后发送1万条消息
[root@master01 rocketmq]# init 0
在消息消费时将broker-a master关机
主机名 | 状态 |
---|---|
broker-a master | 消费时关机 |
broker-a slave | 正常运行 |
broker-b master | 正常运行 |
broker-b slave | 正常运行 |
消费刚发送的1万条消息,消费过程中将broker-a master关机
console日志显示消息消费了1万条
结论:某台master宕机不影响消息消费。
先发送1万条消息,然后消费,消费过程中broker-a slave关机
[root@slave01 rocketmq]# init 0
dashboard的消费统计不是很准确,以eclipse的console日志为准。
主机名 | 状态 |
---|---|
broker-a master | 消费时关机 |
broker-a slave | 消费时关机 |
broker-b master | 正常运行 |
broker-b slave | 正常运行 |
消费刚发送的1万条消息,消费过程中将broker-a slave关机
console显示消费了1万条
结论:某台slave宕机不影响消息消费
先发送1万条消息,然后消费,消费过程中broker-b slave关机
[root@slave02 rocketmq]# init 0
主机名 | 状态 |
---|---|
broker-a master | 消费时关机 |
broker-a slave | 消费时关机 |
broker-b master | 正常运行 |
broker-b slave | 消费时关机 |
消费刚发送的1万条消息,消费过程中将broker-b slave关机
console消费1万条消息
结论:slave都宕机不影响消息消费
拉起broker-a slave或者broker-b slave,保持broker-b master开机状态,发送1万条消息,再将所有master关机最后消费
主机名 | 状态 |
---|---|
broker-a master | 关机 |
broker-a slave | 关机 |
broker-b master | 关机 |
broker-b slave | 正常运行 |
消费刚发送的1万条消息
console显示消费了1w条记录
结论:master都宕机不影响消息发送
拉起broker-a master或者broker-b master,关闭所有的slave,发送1万条消息,然后消费
主机名 | 状态 |
---|---|
broker-a master | 正常运行 |
broker-a slave | 关机 |
broker-b master | 关机 |
broker-b slave | 关机 |
消费刚发送的1万条消息
console显示消息全部被消费
结论:slave都宕机不影响消息消费
- 1.消息发送过程中只要有任意一台master或者slave宕机则发送程序暂停;
- 2.消息发送前slave都宕机不影响消息发送;
- 3.master或者slave都宕机不影响消息消费;
- 4.为保证消息正常的收发,集群最小配置为必需要有一台master主机;