# Work queue

* Também conhecidos como Task queue.
* Ideal para tarefas demoradas.
* Encapsula a tarefa e envia para ser executada 'no seu tempo'.
* Permite paralelização de tarefas.
* **Cada tarefa é entregue uma única vez para cada consumidor!**

![work_queue](https://s3-sa-east-1.amazonaws.com/lcpi/f2018581-2a58-492b-98a8-cf151f971fc2.png)

## Envio de mensagens em série

In [None]:
import pika
import datetime as dt
import time
import random

In [None]:
# Establish a connection with RabbitMQ server
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)

# Create a channel
channel = connection.channel()

# Define queue
queue_name = 'work_queues_1'

# Create queue
channel.queue_declare(queue=queue_name)

# Create and publish messages
for i in range (30):

    # Assemble message
    time_stamp = dt.datetime.strftime(dt.datetime.now(), format='%Y-%m-%d %H:%M:%S.%f')
    message = f'Hello RabbitMQ {time_stamp} {i:6}'

    # Publish message
    channel.basic_publish(exchange='',
                        routing_key=queue_name,
                        body=message)
    # time.sleep(1)

    print(f" [x] Sent {message}")

# Close the connection
connection.close()

# Round-robin dispatching

* Habilidade de dividir tarefas entre consumidores.
* Em média, cada consumidor recebe o mesmo número de mensagens: distribuição chamada de round-robin.

`02_worker_consumer.py`
```python
import pika
import sys
import os
import time


def main():
    # Create a connection to the RabbitMQ server running on the local machine
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    queue_name = 'work_queues_1'

    # Declare a queue to consume messages from
    channel.queue_declare(queue=queue_name)

    # Define a callback function to handle incoming messages
    def callback(ch, method, properties, body):
        print(f" [x] Received {body}.")


    # Set up a consumer to receive messages from the queue and pass them to the callback function
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        auto_ack=True
    )

    # Start consuming messages from the queue
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')

        # Attempt to exit gracefully
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
```

**Pergunta:** Como é feita a distribuição de mensagens? Teste com 2 e 3 consumidores, pelo menos!

# Tarefas *que demandam tempo*

Para simular tarefas que demandam tempo, enviaremos mensagens com um certo número de `.` ao final, onde cada ponto representa um segundo de espera para o consumidor.

Façamos as seguintes alterações em nosso código:


---
`produtor`
```python
 # Assemble message
    time_stamp = dt.datetime.strftime(dt.datetime.now(), format='%Y-%m-%d %H:%M:%S.%f')
    
    # NOVA TAREFA
    message = f'{time_stamp} {i:6} {"."*random.randint(1,10)}'
```
---
`consumidor`
```python
 # Define a callback function to handle incoming messages
    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

        # Simulate work being done on the message by sleeping for an amount of time
        time.sleep(body.count(b'.'))
        print(" [x] Done")
```
---

`produtor`
---

`consumidor`
---

O que acontece?
---

**Pergunta:**  O que acontece se desligamos um consumidor no meio da execução da tarefa?

**Exercícios** 
1. Crie uma fila vazia `work_queues`.
2. Inicie três consumidores (`02_worker_consumer.py`) em terminais diferentes.
3. Envie 20 mensagens que demandam tempo para essa fila.
4. Derrube 2 consumidores.
5. Após todas as mensagens terem sido processadas, religue os dois consumidores.

# Message acknowledgment

auto_ack
---

* **True**: indica que o servidor RabbitMQ pode deletar as tarefas.
* **False**: as tarefas ficam no servidor e serão consumidas todas vez que um novo consumidor se conectar à fila.

**Observe:** 
Repita o exercício acima, trocando **auto_ack** para *False*. 
1. As mensagens ainda foram perdidas?
2. Ao final do processo, religue os 2 consumidores e em seguida interrompa o que recebeu todas as mensagens. O que acontece?

Manual acknowledgment
---

Nos exercícios acima, percebemos que as tarefas enviadas pelo produtor que não foram executas antes do consumidor *sair do ar* ou foram perdidas ou são reprocessadas todas vez que iniciamos um novo consumidor nessa fila. Esses comportamentos não são desejados!

É importante que todas as tarefas sejam executadas, **uma única vez**, independentemente de falhas na entrega e/ou processamento das mensagens!

Para conseguirmos o comportamento desejado, devemos parar de utilizar o **auto_ack** e realizar a confirmação de recebimento das mensagens manualmente para cada mensagem! Esse efeito é atingido alterando o código para:

---
`consumer`
```python
# Define a callback function to handle incoming messages
    def callback(ch, method, properties, body):
        print(f" [x] Received {body}.")

        # Simulate work being done on the message by sleeping for an amount of time
        time.sleep(body.count(b'.'))
        print(" [x] Done")

        ch.basic_ack(delivery_tag = method.delivery_tag)
```

---
`consumer`
```python
   # Set up a consumer to receive messages from the queue and pass them to the callback function
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        # auto_ack=True
    )
```
---

`consumidor`
---

O que acontece?
---

**Perguntas**  
1. O que acontece se desligamos um consumidor no meio da execução da tarefa?

2. O que acontece ao religarmos os consumidores após todas as mensagens terem sido consumidas?

**Exercícios - replay** 
1. Crie uma fila vazia `work_queues`.
2. Inicie três consumidores (`02_worker_consumer_ack.py`) em terminais diferentes.
3. Envie 20 mensagens que demandam tempo para a fila.
4. Derrube 2 consumidores.
5. Após todas as mensagens terem sido processadas, religue os dois consumidores.

# Message durability

Aprendemos como garantir que todas as mensagens sejam entregues uma única vez, independentemente de falhas na comunicação com os consumidores.

**Pergunta:** Mas o que acontece se o servidor 'cair' no meio do processo?


**Exercícios**
1. Crie uma fila vazia `work_queues`.
2. Inicie três consumidores (`02_worker_consumer_ack.py`) em terminais diferentes.
3. Envie 20 mensagens que demandam tempo para a fila.
4. Liste as filas existentes `sudo rabbitmqctl list_queues`.
5. Derrube o servidor enquanto as mensagens estiverem sendo processadas `sudo service rabbitmq-server stop`.
6. Reinicie o servidor `sudo service rabbitmq-server start`
7. Liste as filas novamente.

Para garantir que as mensagens não sejam perdidas em caso de pane do servidor, é preciso marcar filas como duráveis e mensagens como persistentes, modificando o seguintes pedaços de código:

--- 
`producer` e `consumer`
```python
queue_name = 'work_queues_durable' #não é possível redefinir uma fila existente
```

--- 
`producer` e `consumer`
```python
channel.queue_declare(
    queue=queue_name,
    durable=True
)
```

---

`producer`
```python
channel.basic_publish(
    exchange='',
    routing_key=queue_name,
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
    )
)
```
---

`produtor`
---

`consumidor`
---

O que acontece?
---

**Pergunta:** Mas o que acontece se o servidor 'cair' no meio do processo?


**Exercícios**
1. Crie uma fila vazia `work_queues`.
2. Inicie três consumidores (`02_worker_consumer_ack.py`) em terminais diferentes.
3. Envie 20 mensagens que demandam tempo para a fila.
4. Liste as filas existentes `sudo rabbitmqctl list_queues`.
5. Derrube o servidor enquanto as mensagens estiverem sendo processadas `sudo service rabbitmq-server stop`.
6. Reinicie o servidor `sudo service rabbitmq-server start`
7. Liste as filas novamente.

O que acontece 2?
---

**Pergunta:** Ao religar o servidor e os 2 ou 3 consumidores, o que você observa?


**Exercícios**
1. Após o exercício anterior, religue os consumidores.

# Fair dispatching

Apesar de não perdermos as mensagens em casos de falga de comunicação e/ou queda do servidor, percebemos que o *round-robin dispatching* não distribui as mensagens conforme desejamos: o primeiro consumidor bloqueia e processa tudo o que está na lista, não importa quantos outros consumidores sejam instanciados para ajudar!

No *round-robin dispatching*, o servidor distribui $M$ mensagens igualitariamente para $N$ consumidores. Ao ligarmos o primeiro consumidor, como somente ele está disponível, ele recebe todas as mensagens.

Para resolver esse problema, habilitamos o *Quality of Service (QoS)* para o canal e definimos `prefetch_count=1`, ou seja, cada consumidor recebe uma mensagem por vez, garantindo a otimização do processamento.

![prefetch-count](https://s3-sa-east-1.amazonaws.com/lcpi/7cfc070b-dd79-4ec5-8d28-e15bd745e37f.png)

Adicionemos o código abaixo em nosso consumidor:

---
`consumer`
```python
# Set up a consumer to receive messages from the queue and pass them to the callback function
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
    )
```
---

`consumidor`
---

O que acontece?
---

**Pergunta:** O que acontece quando removemos e adicionamos consumidores?


**Exercícios**
1. Crie uma fila vazia `work_queues_durable`.
2. Inicie três consumidores (`02_worker_consumer_ack.py`) em terminais diferentes.
3. Envie 20 mensagens que demandam tempo para a fila.
4. Liste as filas existentes `sudo rabbitmqctl list_queues`.
5. Derrube um ou mais consumidores.
6. Reinicie um ou mais consumidores.

# Deletando queues

In [None]:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

queue_name = 'work_queues_durable'

channel.queue_delete(queue=queue_name)

connection.close()