# MQTT Subscribe-Publish Testing Environment

## INTRODUCTION

This notebook demonstrates the functionality of `publish` and `subscribe` using:
- Local Mosquitto MQTT broker
- The `publish` decorator to send messages in **synchronous** mode
- The `subscribe` decorator to process messages in **synchronous** mode

`async_publish` and `async_subscribe` are also implemented, but they have **not been thoroughly tested**.

### Prerequisites
1. Ensure Mosquitto is installed and running:
   ```bash
   brew install mosquitto
   mosquitto
   ```
2. Ensure `pycarta` is installed in editable mode (`pip install -e .`) from the root directory of the project.
3. Optionally, start a Mosquitto subscriber in a separate terminal to watch the messages published to the `test` topic:
   ```bash
   mosquitto_sub -t test
   ```
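
Before running the cells below, it can help to confirm that the broker is accepting connections. The following is a minimal sketch, assuming the broker runs on `localhost` with Mosquitto's default listener port 1883; `broker_reachable` is a hypothetical helper and is not part of `pycarta`.

```python
import socket


def broker_reachable(host="localhost", port=1883, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # Assumption: the broker listens on plain TCP (no TLS) at this address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution errors.
        return False


if __name__ == "__main__":
    print("Broker reachable:", broker_reachable())
```

This only checks that something is listening on the port; it does not perform an MQTT handshake.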

---
## SECTION 1: Synchronous Mode

In [None]:
import logging
import os
from dotenv import load_dotenv
logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s - %(levelname)s - %(message)s'
)
load_dotenv(override=True)
print(f"POLLING_TIMEOUT: {os.getenv('POLLING_TIMEOUT')}")
print(f"POLLING_INTERVAL: {os.getenv('POLLING_INTERVAL')}")


### Example 1.1: Publish Results of a Function
This example demonstrates publishing results to a topic using the `publish` decorator.
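
To make the decorator pattern concrete, here is a minimal sketch of how a publish-style decorator can forward a function's return value to a topic. It is not `pycarta`'s implementation: `publish_sketch` and `OUTBOX` are hypothetical names, the "broker" is an in-memory dictionary, and returning the result to the caller is an assumption.

```python
import functools
import json

# Hypothetical in-memory stand-in for a broker: topic -> list of payloads.
OUTBOX = {}


def publish_sketch(topic):
    """Decorator factory: record the wrapped function's return value under `topic`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # A real implementation would hand this payload to an MQTT client.
            OUTBOX.setdefault(topic, []).append(json.dumps(result))
            return result  # Assumption: the caller still receives the result.
        return wrapper
    return decorator


@publish_sketch("test")
def add(lhs, rhs):
    return lhs + rhs


add(12, 34)
print(OUTBOX)  # {'test': ['46']}
```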

In [None]:
import time
from pycarta.mqtt.publisher import publish
from pycarta.mqtt.subscriber import subscribe

# Use the publish decorator on a simple function
@publish("test")
def add(lhs, rhs):
    return lhs + rhs

In [None]:
# Publish results
add(12, 34)  # Publishes 46

In [4]:
# Clean up
del add

### Example 1.2: Subscribe to a Topic and Process Messages
This example demonstrates subscribing to the topic and processing messages using the `subscribe` decorator.
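
MQTT payloads travel as bytes, so `mosquitto_pub -t test -m 25` sends the text `25`. This notebook does not state whether `subscribe` converts the payload before calling the handler; if the handler receives a string, `x * 2` repeats the string rather than doubling a number. The sketch below shows one defensive way to decode a payload; `decode_payload` is a hypothetical helper, not a `pycarta` function.

```python
import json


def decode_payload(payload):
    """Best-effort conversion of a raw payload to a Python value."""
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")
    try:
        # Parses numbers, booleans, strings in quotes, lists, and objects.
        return json.loads(payload)
    except (TypeError, ValueError):
        # Not JSON (or not text at all): return the value unchanged.
        return payload


print(decode_payload(b"25") * 2)  # 50
print("25" * 2)                   # 2525
```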

In [None]:
@subscribe("test")
def double(x):
    return x * 2

# Process a single message (simulate a publisher with: mosquitto_pub -t test -m 25).
# The subscriber stops when one of the following occurs:
# 1. A published message is received.
# 2. The polling timeout elapses (TimeoutException).
# 3. The call is canceled (KeyboardInterrupt).
double()

### Example 1.3: Subscribe and Process Multiple Messages
This example shows how to process multiple messages until a `KeyboardInterrupt` or polling timeout occurs.
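
The stop-on-timeout behavior can be pictured with a plain `queue.Queue`. This is a sketch of the general pattern only, assuming a blocking queue sits between the MQTT client and the iterator; `iter_until_timeout` is a hypothetical name and the real subscriber may be implemented differently.

```python
import queue


def iter_until_timeout(messages, timeout=30.0):
    """Yield items from a queue.Queue until none arrives within `timeout` seconds."""
    while True:
        try:
            yield messages.get(timeout=timeout)
        except queue.Empty:
            # No message arrived in time: end the iteration.
            return


inbox = queue.Queue()
for value in (1, 2, 3):
    inbox.put(value)

print([x * 2 for x in iter_until_timeout(inbox, timeout=0.1)])  # [2, 4, 6]
```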

In [None]:
# Process messages until KeyboardInterrupt or timeout (default: 30 seconds)
[x for x in double]

---
## SECTION 2: Asynchronous Mode

In [None]:
import logging
import os
from dotenv import load_dotenv
logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s - %(levelname)s - %(message)s'
)
load_dotenv(override=True)
print(f"POLLING_TIMEOUT: {os.getenv('POLLING_TIMEOUT')}")
print(f"POLLING_INTERVAL: {os.getenv('POLLING_INTERVAL')}")

In [8]:
import asyncio
from pycarta.mqtt.publisher import async_publish
from pycarta.mqtt.subscriber import async_subscribe

# Define the asynchronous publisher
@async_publish("test")
async def add(lhs, rhs):
    return lhs + rhs

# Define the asynchronous subscriber, which processes each received message
@async_subscribe("test")
async def double(x):
    return x * 2

In [None]:
# Start the subscriber as a background task.
# The task finishes when one of the following occurs:
# 1. A published message is received.
# 2. The polling timeout elapses.
# 3. The task is canceled (see below).
subscriber_task = asyncio.create_task(double())

In [None]:
# Stop the subscriber by canceling its task.
# Case 1: If the subscriber is still running, `cancel()` returns `True`.
# Case 2: If the subscriber has already finished, `cancel()` returns `False`.
subscriber_task.cancel()
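
The two return values come from `asyncio.Task.cancel()` itself, which returns `False` when the task is already done. The sketch below reproduces both cases with a stand-in coroutine (`wait_for_message` is hypothetical and simply sleeps in place of a waiting subscriber).

```python
import asyncio


async def wait_for_message():
    # Stand-in for a subscriber that is still waiting for a message.
    await asyncio.sleep(60)


async def demo():
    task = asyncio.create_task(wait_for_message())
    await asyncio.sleep(0)   # Let the task start running.
    first = task.cancel()    # Task is pending, so cancellation is requested.
    try:
        await task
    except asyncio.CancelledError:
        pass
    second = task.cancel()   # Task has already finished.
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, False)
```

Inside a notebook, where an event loop is already running, call `await demo()` instead of `asyncio.run(demo())`.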

In [None]:
# Awaiting the asynchronous subscriber directly blocks this cell until it returns, so in a Jupyter notebook it behaves like synchronous mode
await double()

In [None]:
# Iterating over the asynchronous subscriber runs it in non-blocking mode in a Jupyter notebook
[message async for message in double]

### Example 2.1: Asynchronous Calculation Publish and Subscribe

#### Overview
This script demonstrates an asynchronous MQTT-based publish and subscribe system for simple calculations, leveraging Python's `asyncio` for efficient, non-blocking execution.

#### Process
- **Publish**:
  - Asynchronously calculates the sum of two numbers.
  - Publishes the result to the MQTT topic `test`.
  - Sends multiple messages sequentially with a delay between each.

- **Subscribe**:
  - Asynchronously listens to the `test` topic for incoming messages.
  - Processes each message by doubling its value and printing the result.
  - Stops automatically when a processed value exceeds 20, demonstrating event-driven control.
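
The flow above can be rehearsed without a broker. The following is a minimal sketch that replaces the MQTT topic with an `asyncio.Queue`; `publish_sums`, `consume_doubled`, and `demo` are hypothetical names, and the delay between publishes is reduced to a bare yield. It also shows that messages published after the subscriber stops are left in the queue.

```python
import asyncio


async def publish_sums(bus, values):
    """Put lhs + rhs on the queue for each value (here lhs == rhs)."""
    for value in values:
        await bus.put(value + value)
        await asyncio.sleep(0)  # Yield control, as a real publish delay would.


async def consume_doubled(bus, limit=20):
    """Double each message and stop once a result exceeds `limit`."""
    results = []
    while True:
        result = int(await bus.get()) * 2
        results.append(result)
        if result > limit:
            return results


async def demo():
    bus = asyncio.Queue()  # Hypothetical stand-in for the `test` topic.
    publisher = asyncio.create_task(publish_sums(bus, [0, 5, 10, 15, 20]))
    results = await consume_doubled(bus)
    await publisher  # Let the publisher finish, as main() does.
    return results, bus.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([0, 20, 40], 2)
```

The two unread messages (30 and 40) are the kind of stale entries that a later subscriber run would pick up.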


In [None]:
import asyncio
from pycarta.mqtt.publisher import async_publish
from pycarta.mqtt.subscriber import async_subscribe

# Define the asynchronous publisher
@async_publish("test")
async def add(lhs, rhs):
    return lhs + rhs

# Define the asynchronous subscriber, which doubles each received message
@async_subscribe("test")
async def double(x):
    print(f"[Subscriber] Received message: {x}")
    result = int(x) * 2
    print(f"[Subscriber] Doubled value: {result}")
    return result

async def main():
    print("[Main] Starting MQTT subscription to 'test' topic...")
    
    # Initialize a list to store received messages
    received_messages = []
    
    # Start the publisher as a background task
    publisher_task = asyncio.create_task(publish_messages())
    print("[Main] Publisher task started in the background. Subscriber will process the delayed incoming messages.")
    
    # Process incoming messages from the subscriber using async for
    async for message in double:
        if message is not None:
            print(f"[Main] Processed message result: {message}")
            # Append each received message to the list
            received_messages.append(message)
            # Exit condition for demonstration purposes
            if message > 20:
                print("[Main] Received a message greater than 20. Exiting.")
                break
    
    # Await the publisher task to ensure all messages are published
    await publisher_task
    
    print("[Main] Subscription ended.")
    
    # Access the list of received messages after exiting the loop
    print(f"[Main] All received messages: {received_messages}")

async def publish_messages():
    # List of values to publish
    test_values = [0, 5, 10, 15, 20]
    
    # Publish messages sequentially
    for value in test_values:
        await add(value, value)  # Publish lhs + rhs (here, value + value)
        await asyncio.sleep(3)   # 3-second delay between publishes
    
    print("[Publisher] Published all messages.")

# Execute the main coroutine
await main()

In [None]:
# NOTE: Messages published after the subscriber in the previous cell has stopped remain
# unprocessed in the queue, and they are still processed when the subscriber is resumed.
# For example, if the stale results [60, 80] are still queued and the numbers 1, 2, 3 are
# then published in separate publishing events, the subscriber returns [60, 80, 2, 4, 6]
# instead of the expected [2, 4, 6].

# To address this issue, clear the queue explicitly using one of the following approaches
# before resuming or stopping the subscriber:

# 1. Clear the queue explicitly:
await double.clear_queue()

# 2. Disconnect the subscriber and clear the queue simultaneously:
# await double.disconnect_async(clear_queue=True)

# Simply disconnecting the subscriber without clearing the queue will not remove the residual messages, and they will persist:
# await double.disconnect_async()

# To confirm the contents of the queue at any point, you can inspect it directly:
list(double.result_queue._queue)

In [None]:
# After the queue has been cleared, it is empty when a new subscriber is launched, so the result is [2, 4, 6] as expected.
await double()
# [message async for message in double]
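
For reference, clearing stale entries from an `asyncio.Queue` needs only its public API. This is a sketch under the assumption that the result queue behaves like an `asyncio.Queue`; `drain` is a hypothetical helper and is not how `clear_queue` is necessarily implemented.

```python
import asyncio


def drain(q):
    """Remove and return every item currently waiting in an asyncio.Queue."""
    removed = []
    while True:
        try:
            removed.append(q.get_nowait())
        except asyncio.QueueEmpty:
            return removed


async def demo():
    results = asyncio.Queue()
    for stale in (60, 80):  # Stale results left over from an earlier run.
        results.put_nowait(stale)
    removed = drain(results)
    return removed, results.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([60, 80], 0)
```

Using `get_nowait()` and `qsize()` avoids relying on the private `_queue` attribute.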

### Example 2.2: Asynchronous Sensor Monitoring and Control

#### Overview
This script demonstrates an asynchronous MQTT-based system for monitoring sensor values and triggering control actions based on predefined thresholds.

#### Process
- **Publish**:
  - Sends simulated sensor values to the MQTT topic **`machine/sensor`**.
  - Calculates and publishes the sum of two numbers as the sensor value.
  - Provides error (`STOP`) or no-error (`CONTINUE`) status messages based on thresholds.

- **Subscribe**:
  - Listens to the **`machine/sensor`** topic for sensor values.
  - Processes incoming messages and evaluates them against defined thresholds.
  - Publishes a **`STOP`** action if the value is out of bounds, halting further processing.
  - Publishes a **`CONTINUE`** action if the value is within bounds, allowing normal operation.

- **Main**:
  - Manages asynchronous publishing and subscribing with `asyncio`.
  - Ensures sensor values are monitored in real-time and appropriate control actions are taken.
  - Ends the subscription after an out-of-bound value triggers a **`STOP`** action.
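
The threshold logic can be isolated in a small function and checked without a broker. This is a minimal sketch using the same bounds as the cell below; `control_action` is a hypothetical helper. Because the control messages are published to the same topic as the readings, the subscriber also receives `STOP` and `CONTINUE`, so non-numeric input has to be ignored.

```python
MU = 12.34
SIGMA = 3.45
LOWER, UPPER = MU - 2.0 * SIGMA, MU + 2.0 * SIGMA  # About 5.44 and 19.24.


def control_action(raw):
    """Return "CONTINUE", "STOP", or None for a non-numeric message."""
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return None  # For example, a "STOP" or "CONTINUE" control message.
    return "CONTINUE" if LOWER < value < UPPER else "STOP"


for reading in (14, 18, 22, "STOP"):
    print(reading, control_action(reading))
```

With the test values used below (7, 9, 11, 13, each published as `value + value`), the third reading, 22, is the first one outside the bounds.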
  

In [None]:
import asyncio
import json
from pycarta.mqtt.publisher import async_publish
from pycarta.mqtt.subscriber import async_subscribe

# Define the expected value (mu) and standard deviation (sigma); the bounds are mu +/- 2 * sigma
mu = 12.34
sigma = 3.45
lower_bound, upper_bound = (mu - 2.0 * sigma, mu + 2.0 * sigma)

# Define the asynchronous publisher functions
@async_publish(topic="machine/sensor")
async def add(lhs, rhs):
    return lhs + rhs

@async_publish(topic="machine/sensor")
async def on_error():
    return "STOP"

@async_publish(topic="machine/sensor")
async def no_error():
    return "CONTINUE"

# Define the asynchronous subscriber function to yield sensor value
@async_subscribe("machine/sensor")
async def get_sensor_value(x):
    return x

# Define the main coroutine
async def main():
    print("[Main] Starting MQTT subscription to 'machine/sensor' topic...")
    
    # Initialize a list to store received messages
    received_messages = []
    
    # Start the publisher as a background task
    publisher_task = asyncio.create_task(publish_messages())
    print("[Main] Publisher task started in the background. Subscriber will process the incoming messages.")
    
    # Process incoming messages from the subscriber using async for
    async for x in get_sensor_value:
        if x is not None:
            print(f"[Main] Received sensor value: {x}")
            
            # Check whether x can be converted to a number
            try:
                x_float = float(x)
                received_messages.append(x)  # Append each received message to the list
            except (TypeError, ValueError):
                print(f"[Main] Received non-numeric sensor value: {x}. Ignoring.")
                continue

            # Check the error condition against the numeric value
            if not lower_bound < x_float < upper_bound:
                await on_error()
                print(f"[Main] Published 'STOP' action: value out of bounds [{lower_bound:.2f}, {upper_bound:.2f}].")
                break
            else:
                await no_error()
                print("[Main] Published 'CONTINUE' action.")
    
    # Await the publisher task to ensure all messages are published
    await publisher_task
    
    print("[Main] Subscription ended.")
    
    # Access the list of received messages after exiting the loop
    print(f"[Main] All received messages: {received_messages}")

# Define the publisher coroutine
async def publish_messages():
    # List of values to publish
    test_values = [7, 9, 11, 13]
    
    # Publish messages sequentially with a 3-second delay between publishes
    for value in test_values:
        result = value + value  # Example: publish lhs + rhs (e.g., 7 + 7 = 14)
        await add(value, value)  # Publish the result to 'machine/sensor'
        print(f"[Publisher] Published sensor value: {result}")
        await asyncio.sleep(3)   # Short delay between publishes
    
    print("[Publisher] Published all messages.")

# Execute the main coroutine
await main()
