# Pub/Sub Publisher

https://www.rabbitmq.com/tutorials/tutorial-three-python.html

## Message Routing

RabbitMQ uses exchanges as its message routing mechanism. In a pub/sub model, a publisher publishes each message to an exchange rather than directly to a queue. Each subscriber has its own queue, which it watches for incoming messages. The exchange routes the messages sent by publishers to the appropriate subscriber queues.

In [1]:
import pika
import time
from datetime import datetime
import json

In [2]:
RABBIT_CONNECTION = 'amqp://guest:guest@rabbit:5672/'
EXCHANGE_NAME = 'logs'

In [3]:
def create_channel():
    """Open a blocking connection to RabbitMQ and return (channel, connection)."""
    connection = pika.BlockingConnection(
        pika.URLParameters(RABBIT_CONNECTION))
    channel = connection.channel()

    return channel, connection

### Fanout exchange

A fanout exchange sends a copy of every published message to each queue bound to the exchange, ignoring the routing key. This is what we use for pub/sub, so each subscriber gets its own copy of the message.

In [4]:
def declare_exchange(channel):
    # Use the shared constant so the declared exchange always matches the one published to.
    channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='fanout')
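
To make the fanout behavior concrete, here is a minimal in-memory sketch of the routing rule. It does not talk to RabbitMQ: `ToyFanoutExchange` and its methods are hypothetical names used only for illustration, and a real broker adds networking, acknowledgements and persistence that this model ignores.

```python
from collections import deque


class ToyFanoutExchange:
    """Illustrative stand-in for a fanout exchange (not part of pika)."""

    def __init__(self):
        self.queues = {}  # queue name -> deque of message bodies

    def bind(self, queue_name):
        # Binding a queue makes it eligible for every future message.
        self.queues.setdefault(queue_name, deque())

    def publish(self, body, routing_key=''):
        # A fanout exchange ignores the routing key and copies the
        # message to every queue that is currently bound.
        for queue in self.queues.values():
            queue.append(body)


exchange = ToyFanoutExchange()
exchange.bind('subscriber-a')
exchange.bind('subscriber-b')
exchange.publish('Log Message # 1')

# A queue bound after a publish does not see the earlier message.
exchange.bind('subscriber-c')
exchange.publish('Log Message # 2')

for name, queue in exchange.queues.items():
    print(name, list(queue))
```

In this toy model the late subscriber only holds the second message, which suggests starting subscribers before running the publisher if they should see every log line.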

In [5]:
def publish_log_messages(messages=10):
    channel, connection = create_channel()

    try:
        declare_exchange(channel)

        for message in range(1, messages+1):
            # Fanout exchanges ignore the routing key, so it is left empty.
            channel.basic_publish(exchange=EXCHANGE_NAME,
                                  routing_key='',
                                  body=f'Log Message # {message}')
            print(f'Log Message # {message} sent')
    finally:
        # Close the connection even if publishing fails part-way through.
        connection.close()

In [6]:
# publish_log_messages()

In [7]:
def periodic_publish(total_messages=200):
    channel, connection = create_channel()

    try:
        declare_exchange(channel)

        for iteration in range(1, total_messages+1):
            # Each log entry is a small JSON document.
            message = {
                "timestamp": datetime.now().isoformat(),
                "messageNumber": iteration,
                "source": "Python Log Publisher"
            }

            # Fanout exchanges ignore the routing key, so it is left empty.
            channel.basic_publish(exchange=EXCHANGE_NAME,
                                  routing_key='',
                                  body=json.dumps(message))

            print(f'Published JSON log message # {iteration}')
            time.sleep(2)  # pause two seconds between messages
    finally:
        # Close the connection even if the loop is interrupted.
        connection.close()
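
For context, the subscriber side of this exchange might look like the sketch below. This is an assumption about how a consumer could be written, not code from this notebook: `parse_log_message`, `on_message` and `subscribe` are hypothetical names, the sample timestamp is made up, and `subscribe` needs a reachable broker, so only the parsing helper is exercised here.

```python
import json


def parse_log_message(body):
    """Decode the JSON body produced by periodic_publish into a dict."""
    return json.loads(body)


def on_message(channel, method, properties, body):
    # pika invokes consumer callbacks with these four arguments.
    log = parse_log_message(body)
    print(f"{log['timestamp']} #{log['messageNumber']} from {log['source']}")


def subscribe(url='amqp://guest:guest@rabbit:5672/', exchange='logs'):
    # Imported here so parse_log_message can be tried without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.URLParameters(url))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='fanout')

    # An empty name asks the broker for a server-named queue; exclusive=True
    # removes the queue when this connection closes.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange, queue=queue_name)

    channel.basic_consume(queue=queue_name,
                          on_message_callback=on_message,
                          auto_ack=True)
    channel.start_consuming()  # blocks until interrupted


# Illustrative payload in the same shape as the publisher's messages.
sample = b'{"timestamp": "2024-01-01T00:00:00", "messageNumber": 1, "source": "Python Log Publisher"}'
print(parse_log_message(sample)['messageNumber'])
```

Each subscriber that runs `subscribe()` gets its own exclusive queue, so every running subscriber receives its own copy of each message published to the exchange.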

In [9]:
periodic_publish(100)

Published JSON log message # 1
Published JSON log message # 2
Published JSON log message # 3
Published JSON log message # 4
Published JSON log message # 5
Published JSON log message # 6
Published JSON log message # 7
Published JSON log message # 8
Published JSON log message # 9
Published JSON log message # 10
Published JSON log message # 11
Published JSON log message # 12
Published JSON log message # 13
Published JSON log message # 14
Published JSON log message # 15
Published JSON log message # 16
Published JSON log message # 17
Published JSON log message # 18
Published JSON log message # 19
Published JSON log message # 20
Published JSON log message # 21
Published JSON log message # 22
Published JSON log message # 23
Published JSON log message # 24
Published JSON log message # 25
Published JSON log message # 26
Published JSON log message # 27
Published JSON log message # 28
Published JSON log message # 29
Published JSON log message # 30
Published JSON log message # 31
Published JSON lo