# Rabbitmq

rabbitmq是轻量级的，易于部署在本地和云上的消息队列服务。它支持多个消息传递协议。 
使用消息队列的原因：
1. 解耦: 使用中间件的模式，改善系统间耦合性太强的问题。
2. 异步: 将消息写入消息队列，非必要的业务逻辑以异步的方式运行，加快相应速度。
3. 削峰: 如果有大规模的请求需要服务端存储到数据库， 使用消息队列能够慢慢的按照数据库能处理的并发量，从消息队列中慢慢拉取消息

## Rabbitmq的传输可靠性保证

1.生产者丢数据： 

    从生产者弄丢数据这个角度来看，RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。transaction机制就是说，发送消息前，开启事务（channel.txSelect()）,然后发送消息，如果发送过程中出现什么异常，事务就会回滚（channel.txRollback()）,如果发送成功则提交事务（channel.txCommit()）。 生产上用confirm模式的居多。一旦channel进入confirm模式，所有在该信道上发布的消息都将会被指派一个唯一的ID（从1开始），一旦消息被投递到所有匹配的队列之后，rabbitMQ就会发送一个ACK给生产者（包含消息的唯一ID），这就使得生产者知道消息已经正确到达目的队列了

2.消息队列丢数据：

    设置消息队列持久化就行
    1.将queue的持久化标识durable设置为true,则代表是一个持久的队列
    2.发送消息的时候将deliveryMode=2
    
3.消费者丢数据:

    消费者丢数据一般是因为采用了自动确认消息模式。这种模式下，消费者会自动确认收到信息。这时rabbitMQ会立即将消息删除，这种情况下，如果消费者出现异常而未能处理消息，就会丢失该消息。采用手动确认消息即可。



## Rabbitmq的工作模式
1. work queues
2. publish / subscribe
3. routing
4. topics
5. rpc

### Work queues
![work queue](images/python-two.png)

工作队列背后的主要思想是为了避免立即执行资源密集型任务，而必须等待它完成。相反，我们计划稍后要完成的任务。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。（所以rabbitmq是celery指定broker^^） 这种工作模式一个任务对应一个worker

#### Producer 

In [2]:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 消息持久化
channel.queue_declare(queue='task_queue', durable=True)

message = 'Hello world .'
channel.basic_publish(exchange='',
                     routing_key='task_queue',
                     body=message,
                     properties=pika.BasicProperties(
                         delivery_mode = 2  # make message persistent
                     ))

print("[x] sent %r " % message)
connection.close()

[x] sent 'Hello world .' 


#### consumer

In [None]:
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print('[*] Waiting for message.')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

    # 模拟任务
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    
    # 避免重复消费， 发送ACK确认
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 在一个时间单位下不要给worker超过一个任务
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

### Publish / Subsribe

![exchange](images/exchanges.png)

这种工作模式允许我们投递一条消息，给多个消费者进行消费（广播给其他的消费者）. 生产者只会将消息投递给交换机(exchange), 由交换机将消息投递给其他的队列，由exchange的type决定接收的队列     
***
** Exchange Type: **
    1. direct  直接模式，根据绑定队列指定的routing_key来为消息指定分发的队列
    2. topic   话题， 用户自定义相关的routing_key给绑定的queue
    3. headers
    4. fanout  广播模式，投递给多个queue， 需要为exchange绑定queue, 不需要指定routing_key

#### publish

In [None]:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 声明exchange的名字和类型
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

#### Subsribe

In [None]:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# 当链接关闭的时候， 随机生成的队列应该被销毁
result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

# 将队列绑定到exchange
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

### routing
![routing](images/python-four.png)
这种工作模式是可以对消息进行挑选的一种工作模式， 比如将错误日志保存到本地， 将普通日志输出到终端

#### producer

In [None]:
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 声明exchange 和类型
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

messages = {
    'error': 'this is error',
    'info': 'hello'
}


for key, value in messages.items():
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (key, value))
connection.close()

#### Consumer

In [None]:
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue


severity = 'info' # or 'error'


channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

### topics
![topics](images/python-five.png)
topics是一种支持自定以消息获取的工作方式, 它要求绑定的key为一系列的words，通过‘.’分割。          
*（星）可以代替一个词。  
\#（hash）可以代替零个或多个单词。 

比如如上图所示， Q1只接收黄色相关的， Q2只接收兔子和懒的。

如果绑定的key为'#‘， 则相当于fanout模式， 如果没有指定'#'或者'*'则相当于direct模式
                

### RPC
以后用的上再来补充
