In [1]:
!pip install pika

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import pika

# Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

print("Connected to RabbitMQ")

Connected to RabbitMQ


In [3]:
channel.queue_declare(queue='test_queue')
print("Queue declared: test_queue")

Queue declared: test_queue


In [4]:
channel.basic_publish(exchange='', routing_key='test_queue', body='Hello from Jupyter!')
print("Sent message: Hello from Jupyter!")

Sent message: Hello from Jupyter!


In [5]:
def callback(ch, method, properties, body):
    print(f" [✓] Received: {body.decode()}")

channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True)
print("Waiting for messages... Run next cell to consume.")

Waiting for messages... Run next cell to consume.


In [6]:
print("Listening for messages... Press 'STOP' in Jupyter to stop.")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    print("Stopped consuming.")

Listening for messages... Press 'STOP' in Jupyter to stop.
 [✓] Received: Hello from Jupyter!
Stopped consuming.


In [12]:
channel.stop_consuming()
connection.close()
print("RabbitMQ connection closed.")

ConnectionWrongStateError: BlockingConnection.close(200, 'Normal shutdown') called on closed connection.

## Worker

In [18]:
import time

def on_request(ch, method, properties, body):
    message = body.decode()
    print(f" [✓] Received request: {message}")
    
    response = f"Processed: {message.upper()}"  # Process the request
    
    time.sleep(2)  # Simulate processing delay

    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     properties=pika.BasicProperties(correlation_id=properties.correlation_id),
                     body=response)

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.queue_declare(queue='rpc_queue')
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [*] Waiting for RPC requests... Run the next cell to send a request.")

 [*] Waiting for RPC requests... Run the next cell to send a request.


## RPC request

In [19]:
import uuid

response_queue = channel.queue_declare(queue='', exclusive=True).method.queue
correlation_id = str(uuid.uuid4())
response_received = None

def on_response(ch, method, properties, body):
    global response_received
    if properties.correlation_id == correlation_id:
        response_received = body.decode()

channel.basic_consume(queue=response_queue, on_message_callback=on_response, auto_ack=True)

print(" [x] Sending RPC request: 'hello'")
channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(reply_to=response_queue, correlation_id=correlation_id),
                      body="hola")

# Wait for response
while response_received is None:
    connection.process_data_events()

print(f" [✓] Got response: {response_received}")

 [x] Sending RPC request: 'hello'
 [✓] Received request: hola
 [✓] Got response: Processed: HOLA


## Producer

In [8]:
import pika

# 1️⃣ Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 2️⃣ Declare a Queue (Ensure it exists)
channel.queue_declare(queue='message_queue', durable=True)

# 3️⃣ Publish Messages
for i in range(5):
    message = f"Message {i}"
    channel.basic_publish(
        exchange='',
        routing_key='message_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)  # Message persistence
    )
    print(f" [✓] Sent: {message}")

# 4️⃣ Close Connection
connection.close()
print("Publisher finished!")

 [✓] Sent: Message 0
 [✓] Sent: Message 1
 [✓] Sent: Message 2
 [✓] Sent: Message 3
 [✓] Sent: Message 4
Publisher finished!
