Skip to content

消息队列RabbitMQ

raymond-zhao edited this page Sep 6, 2020 · 4 revisions

本系统消息队列工作流程

本系统消息队列工作图

为什么使用消息队列?

众所周知,消息队列的应用场景主要有三个,分别是解耦、异步、削峰。虽然这个系统没有并发量,但是也将这些考虑了进来。

  • 解耦:在系统里的应用就是订单服务与库存服务的解耦。
  • 异步:下订单时利用 MQ 实现事务的最终一致性。
  • **削峰:**这个在项目里其实完全没有体现出来,毕竟不是一个实际投入使用的项目。

如何保证消息队列的高可用

  • 单机模式:闹着玩儿的,没人生产用单机。
  • 普通集群(无高可用性):在多台机器上启动多个 MQ 实例,创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据。
    • 没有做到分布式,就是个普通集群。
    • 要么消费者每次随机连接一个实例然后拉取数据,有数据拉取的开销。
    • 要么固定连接那个 queue 所在实例消费数据,导致单实例性能瓶颈
    • 如果存放 queue 的实例宕机了,会导致其他实例无法拉取数据。如果开启了消息持久化,消息不一定会丢,但是要等实例重新恢复上线了才可以继续拉取数据。
    • 这种方案没有高可用性,但是可以增加吞吐量,就是让集群中多个节点来服务某个 queue 的读写操作。
  • 镜像集群(高可用):创建的 queue,无论是元数据还是 queue 里的消息都会存在于多个实例上。即,每个 MQ 节点都有这个 queue 的一个完整镜像。包含 queue 的全部数据。每次写消息到 queue 时,会自动把消息同步到多个实例的 queue 上。
    • 好处:即使有机器宕机了,其他节点仍然有这个 queue 的完整数据。
    • 坏处:第一,性能开销太大,因为消息需要同步到所有机器上,导致网络带宽压力和消耗严重;第二,不是分布式,没有扩展性。

Kafka 的高可用

如何保证消息不被重复消费?

或者说,如何保证消息消费的幂等性?

如何保证消息的可靠投递?

RabbitMQ - Reliability Guide

Acknowledge and Confirm

保证消息不丢失,可以使用事务消息,性能下降大约250倍,为了避免性能下降太多,引入确认机制。分为发送端确认消费端确认

  • publisher#confirmCallback 确认模式:publisher 到 Broker
    • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
    • 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到 Queue 里,所以还需要 returnCallback。
    • spring.rabbit.publisher-confirms=true
    • @PostConstruct:对象创建完成之后执行的方法
  • publisher#returnCallback 未投递到 queue 退回模式:Exchange 到 Queue
    • spring.rabbitmq.publisher-returns=true
    • spring.rabbitmq.template.mandatory=true
    • confirmCallback 只能保证消息到达 broker,不能保证消息准确投递到目标 queue。如果需要保证消息一定要投递到目标 queue,需要用到 returnCallback 模式。
    • 如果未能投递到目标 queue 将调用 returnCallback,可以记录下详细的投递数据,定期巡检(扫描数据库消息的状态)或自动纠错都需要这些投递数据
// message: 投递失败的消息的详细信息
// replyCode: 回复的状态码
// replyText: 回复的文本内容
// exchange: 待发送消息的目的交换机
// routingKey: 当时发送消息时所用的路由键
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {});
  • consumer#ack机制:Queue 到 Consumer
    • 消费者获取到消息,成功处理,可以回复 Ack 给 Broker
      • basic.ack:用于肯定确认,broker 将移除此消息。
      • basic.nack:用于否定确认,可以指定 broker 是否丢弃此消息,可以批量回复。
      • basic.reject:用于否定确认,同上,但不能批量。
    • queue 无消费者,消息依然会被存储,直到消费者消费。
    • 消费者收到消息,默认会自动 ack,但是如果无法确定此消息是否被处理完成,或者成功处理,可以开启手动 ack,只要没有手动签收,就是一直 unack,宕机也不会丢失,会变为ready状态,有新的consumer连接进来就会继续消费。
      • spring.rabbitmq.listener.simple.acknowledge-mode=manual
      • 消息处理成功,ack,接受下一个消息,此消息 broker 就会移除。
      • 消息处理失败,nack/reject,重新发送给其他人进行处理,或者容错处理后 ack。
      • 消息一直没有调用 ack/nack 方法,broker 认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被 broker 移除,会投递给别人。

image-20200906141249893