# 消息中间件学习分享


- 为什么要使用消息中间件
- Redis的发布订阅
- RabbitMQ
- Kafka

### 为什么要使用消息中间件


- 系统解耦

- 异步调用

- 流量削峰

- ...

## Redis的发布订阅


在平时生活中发布/订阅模式是非常常见的场景。

比如微信公众号，公众号作者发布文章，会广播给每个订阅者。在这个场景里，微信公众号就是一个Pulisher，微信用户就是一个Subscriber，发布的文章就是一个消息。

![](img/pubsub.png)

发送者(pub)发送消息，订阅者(sub)接收消息。

客户端可以订阅任意数量的频道， 当有新消息发送到某个频道时，信息就会被发送给所有订阅了该频道的客户端。

- PUBLISH
- SUBSCRIBE
- UNSUBSCRIBE
- PSUBSCRIBE
- PUNSUBSCRIBE


### 订阅频道（SUBSCRIBE）
![](img/sub.svg)


![](img/pub.svg)

每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构， 结构的 pubsub_channels 属性是一个字典， 这个字典就用于保存订阅频道的信息：
```c
struct redisServer {
    // ...
    dict *pubsub_channels;
    // ...
}
```
其中，字典的键为正在被订阅的频道， 而字典的值则是一个链表， 链表中保存了所有订阅这个频道的客户端。

![](img/pubsub-channels.svg)

![](img/pubsub-channel-sub.svg)

### 发布消息（PUBLISH）
```python
def PUBLISH(channel, message):

    # 遍历所有订阅频道 channel 的客户端
    for client in server.pubsub_channels[channel]:

        # 将信息发送给它们
        send_message(client, message)
```

### 退订频道（UNSUBSCRIBE）

### 订阅模式（PSUBSCRIBE）

Redis 的发布与订阅实现支持模式匹配（pattern matching）

客户端可以订阅一个带 * 号的模式， 如果某些频道的名字和这个模式匹配， 那么当有信息发送给这些频道的时候， 客户端也会收到这些频道的信息。


![](img/pubsub-patterns.svg)

redisServer.pubsub_patterns 属性是一个链表，链表中保存着所有和模式相关的信息：

```c
struct redisServer {
    // ...
    list *pubsub_patterns;
    // ...
};
```

链表中的每个节点都包含一个 redis.h/pubsubPattern 结构：

```c
typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;
```


![](img/pubsub-patterns2.svg)

![](img/pubsub-patterns-sub.svg)

### 发布消息（PUBLISH）
```python
def PUBLISH(channel, message):

    # 遍历所有订阅频道 channel 的客户端
    for client in server.pubsub_channels[channel]:

        # 将信息发送给它们
        send_message(client, message)

    # 取出所有模式，以及订阅模式的客户端
    for pattern, client in server.pubsub_patterns:

        # 如果 channel 和模式匹配
        if match(channel, pattern):

            # 那么也将信息发给订阅这个模式的客户端
            send_message(client, message)
```

### 退订模式（PUNSUBSCRIBE）

## RabbitMQ

RabbitMQ是采用 Erlang 语言实现的 AMQP 协议的消息中间件

- AMQP基本组成
- Exchange类型：direct, fanout, topic, headers

#### AMQP基本组成

- producer/publisher: 消息的生产者、发布者
- consumer/subscriber: 消息的消费者、订阅者
- message: 消息实体
- virtual host: 虚拟主机
- exchange：交换机
 - queue
 - binding key
 - routing key

#### Direct Exchange

![](img/direct.png)

#### Fanout Exchange
![](img/fanout.png)

#### Topic Exchange

- `*` 匹配一个单词
- `#` 匹配一个或多个单词
![](img/topic.png)

#### Headers Exchange

RabbitMQ使用的是AMQP协议，这种协议提供了header attribute参数， Headers Exchange就是利用AMQP协议通过传送额外的路由参数来达到数据过滤的作用。


    使用任务队列的优点之一是能够轻松并行化工作。如果我们的队列正在积压，我们可以增加更多的worker，这样就可以轻松扩展。


![](img/python-one-overall.webp)

![](img/worker-queue.png)

#### 轮询分发（Round-robin）

默认RabbitMQ只管分发进入队列的消息，不会关心有多少消费者（consumer）没有作出响应。平均每个消费者将会得到相同数量的消息。


#### 公平分发（Fair dispatch）

    如果多个worker进程中，某个worker处理比较慢，另一个worker比较快，默认RabbitMQ只管分发进入队列的消息，不会关心有多少消费者没有作出响应，这样会使得比较慢的worker消息堆积过多，导致任务分配不均。
    
    Qos公平调度设置prefetch_count=1，即在同一时刻，不会发送超过1条消息给一个工作者（worker），直到它已经处理了上一条消息并且作出了响应。这样，RabbitMQ就会把消息分发给下一个空闲的工作者（worker）。
    



![](img/prefetch-count.webp)

## Kafka

消息的发送方被称为Producer，消息的接收方被称为Consumer，消息队列被称为Topic

    没有最好的消息中间件，只有最合适的消息中间件。


## 参考

- [《Redis 设计与实现》](https://redisbook.readthedocs.io/en/latest/feature/pubsub.html)
- [《Kafka: The Definitive Guide
》](https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/)
- 《RabbitMQ实战指南》
- [RabbitMQ消息队列学习分享.ppt](http://km.netease.com/team/cc_studio/article/231277)
- [消息中间件选型分析：从Kafka与RabbitMQ的对比看全局](https://juejin.im/post/5acf29316fb9a028cb2e04ce#heading-8)
- [https://kafka.apache.org/documentation/](https://kafka.apache.org/documentation/)
- [https://www.rabbitmq.com/getstarted.html](https://www.rabbitmq.com/getstarted.html)