Skip to content

mq rabbit

sika-code edited this page Jul 20, 2019 · 1 revision

mq-rabbit【rabbitMq消息队列组件】

What

  • RabbitMq消息队列组件

Why

  • 降低集成RabbitMq的难度
  • 简化RabbitMq的交互步骤
  • 统一消息队列的交互方式

How

  • 定义通用枚举
  • 定义消息队列生成者
  • 封装消息生成者发送消息的细节

MqRabbit组件使用说明

common [公共组件]

名称 备注
MqBindingConfig 绑定的配置接口、定义绑定配置规范
MqExchangeConfig 交换机的配置接口、定义交换机配置规范
MqExchangeType 交换机类型枚举、定义交换机类型
MqQueueConfig 队列配置接口、定义队列配置规范
MqGenerator 消息对象相关对象的生成者

consumer [消费者组件]

名称 备注
RabbitConsumerConfig Rabbit消费者配置类

producer [生产者组件]

名称 备注
RabbitProducerAckConfig 生产者确认配置类
RabbitProducerConfig 生产者配置类
RabbitSenderDTO 发送者数据传输对象
RabbitSenderImpl 发送者实现类

示例

1 添加配置

lxzl:
  skull:
    rabbit:
      sender:
        log-fire: true     # 打开发送者的日志
        ack: true          # 启用消息确认机制
        
spring:
  rabbitmq:
    host: localhost            # 配置主机
    port: 5672                 # 端口
    username: username         # 用户名
    password: password         # 密码
    publisher-confirms: true   # 发送者确认
    publisher-returns: true    # 发送返回确认
    
    listener:                  # 消费者配置
      simple:
        acknowledge-mode: manual  # 配置为手动处理ack

2 生产者配置及使用

  • 添加相关配置枚举
@AllArgsConstructor
@Getter
@ToString
public enum DemoMqBindConfig implements MqBindingConfig {
    /** 绑定配置枚举 */
    API_CORE(1, "api.core", "测试"),
    API_CORE_ONLY(1, "api.*", "测试"),
    API_CORE_ANY(1, "api.core.#", "测试"),
    ;
    private Integer type;
    private String routingKey;
    private String desc;
}
@AllArgsConstructor
@Getter
@ToString
public enum DemoMqExchangeConfig implements MqExchangeConfig {
    /**
     * 交换机配置枚举类
     */
    EXCHANGE(1, EXCHANGE_NAME, MqExchangeType.TOPIC, true, "TOPIC"),
    EXCHANGE1(1, EXCHANGE_NAME1, MqExchangeType.TOPIC, true, "TOPIC"),
    EXCHANGE2(2, EXCHANGE_NAME2, MqExchangeType.DIRECT, true, "DIRECT"),
    EXCHANGE3(3, EXCHANGE_NAME3, MqExchangeType.FANOUT, true, "FANOUT"),
    EXCHANGE31(3, EXCHANGE_NAME31, MqExchangeType.FANOUT, true, "FANOUT"),
    EXCHANGE4(4, EXCHANGE_NAME4, MqExchangeType.HEADERS, true, "HEADERS"),
    EXCHANGE5(5, EXCHANGE_NAME5, true, "测试队列"),
    ;

    DemoMqExchangeConfig(Integer type, String name, MqExchangeType exchangeType, boolean durable, String desc) {
        this.type = type;
        this.name = name;
        this.exchangeType = exchangeType;
        this.durable = durable;
        this.desc = desc;
    }

    DemoMqExchangeConfig(Integer type, String name, MqExchangeType exchangeType, String desc) {
        this.type = type;
        this.name = name;
        this.exchangeType = exchangeType;
        this.desc = desc;
    }

    DemoMqExchangeConfig(Integer type, String name, String desc) {
        this.type = type;
        this.name = name;
        this.desc = desc;
    }

    DemoMqExchangeConfig(Integer type, String name, boolean durable, String desc) {
        this.type = type;
        this.name = name;
        this.durable = durable;
        this.desc = desc;
    }

    private Integer type;
    private String name;
    private MqExchangeType exchangeType;
    private boolean durable;
    private boolean exclusive;
    private boolean autoDelete;

    private String desc;

    @Override
    public String getName() {
        return getNameFull(this.name, this.exchangeType);
    }

    public static class DemoExchangeName {

        public static final String EXCHANGE_NAME = "test";
        public static final String EXCHANGE_NAME1 = "test1";
        public static final String EXCHANGE_NAME2 = "test2";
        public static final String EXCHANGE_NAME3 = "test3";
        public static final String EXCHANGE_NAME31 = "test31";
        public static final String EXCHANGE_NAME4 = "test4";
        public static final String EXCHANGE_NAME5 = "test5";
    }
/**
 * 队列枚举类
 *
 * @author daiqi
 * @create 2019-07-06 15:38
 */
@AllArgsConstructor
@Getter
@ToString
public enum DemoMqQueueConfig implements MqQueueConfig {
    /**
     * 队列枚举类
     */
    QUEUE(1, QUEUE_NAME, "测试队列"),
    QUEUE2(2, QUEUE_NAME2, true, "测试队列"),
    QUEUE3(3, QUEUE_NAME3, true, "测试队列"),
    QUEUE4(4, QUEUE_NAME4, true, "测试队列"),
    QUEUE5(5, QUEUE_NAME5, true, "测试队列"),
    ;

    DemoMqQueueConfig(Integer type, String name, String desc) {
        this.type = type;
        this.name = name;
        this.desc = desc;
    }

    DemoMqQueueConfig(Integer type, String name, boolean durable, String desc) {
        this.type = type;
        this.name = name;
        this.durable = durable;
        this.desc = desc;
    }

    private Integer type;
    private String name;
    private boolean durable;
    private boolean exclusive;
    private boolean autoDelete;

    private String desc;


    public static class DemoQueueName {

        public static final String QUEUE_NAME = "queue";
        public static final String QUEUE_NAME2 = "queue2";
        public static final String QUEUE_NAME3 = "api.core";
        public static final String QUEUE_NAME4 = "api.core.user";
        public static final String QUEUE_NAME5 = "api.core.user.query";
    }

  • 配置队列、交换机、绑定对象的Bean

/**
 * mq配置类
 *
 * @author daiqi
 * @create 2019-07-06 15:35
 */
@Configuration
public class DemoMqConfig {
    @Bean
    public Queue queue() {
        return MqGenerator.generateQueue(DemoMqQueueConfig.QUEUE);
    }

    @Bean
    public Exchange topicExchange() {
        return MqGenerator.generateExchange(DemoMqExchangeConfig.EXCHANGE);
    }

    @Bean
    public Binding bindingCoreExchange(Queue queue, TopicExchange topicExchange) {
        return MqGenerator.generateBinding(topicExchange, queue, DemoMqBindConfig.API_CORE);
    }

}
  • 使RabbitSender对象发送消息

/**
 * <p>
 * 准入规则类型表 逻辑实现类
 * </p>
 *
 * @author daiqi
 * @since 2019-06-22 00:32:04
 */
@Component(value = "demoTestSender")
public class DemoTestSender  {
    @Autowired
    private RabbitSender rabbitSender;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(User user) {
       RabbitSenderDTO rabbitSenderDTO = new RabbitSenderDTO()
                    .setExchangeConfigEnum(DemoMqExchangeConfig.EXCHANGE)
                    .setQueueConfigEnum(DemoMqQueueConfig.QUEUE)
                    .setData(user);
            rabbitSender.convertAndSend(rabbitSenderDTO);
    }
    
    @Override
    public void send2(User user) {
       RabbitSenderDTO rabbitSenderDTO = new RabbitSenderDTO()
                    .setExchangeConfigEnum(DemoMqExchangeConfig.EXCHANGE)
                    .setQueueConfigEnum(DemoMqQueueConfig.QUEUE)
                    .setRabbitTemplate(rabbitTemplate)
                    .setData(user);
            rabbitSender.convertAndSend(rabbitSenderDTO);
    }
    
    @Data
    public static class User {
        private String userName;
    }
    
}

3 消费者

@Component
@Slf4j
public class TestReceive {
    /**
     * 获取消息 --- 自动ack或者不需要ack
     */
    @RabbitListener(queues = DemoMqQueueConfig.DemoQueueName.QUEUE_NAME)
    public void receive(MqMsgDTO mqMsgDTO) {
        log.info("{} receive message: {}", DemoMqQueueConfig.QUEUE5, JSONUtil.toJSONString(mqMsgDTO));
    }
    
    /**
     * 获取消息 --- 开启手动ack
     */
    @RabbitListener(queues = DemoMqQueueConfig.DemoQueueName.QUEUE_NAME)
    public void receive1(MqMsgDTO mqMsgDTO, Channel channel, Message message) throws IOException {
        try {
            // 模拟执行任务
            Thread.sleep(1000);
            log.info("{} receive message: {}", DemoMqQueueConfig.QUEUE3, JSONUtil.toJSONString(mqMsgDTO));
            // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.info("消息已重复处理失败,拒绝再次接收!");
                // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                log.info("消息即将再次返回队列处理!");
                // requeue为是否重新回到队列,true重新入队
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            log.error(e.getMessage(), e);
        }

    }
}