# MQTT Subscribe-Publish Testing Environment

## INTRODUCTION

This notebook demonstrates the functionality of `publish` and `subscribe` using:
- Several different broker types:
    - Local Mosquitto MQTT broker (without credentials)
    - Remote AIMPF MQTT broker (with credentials)
- The `publish` decorator to send messages in both **synchronous** and **asynchronous** modes
    - `@publish("test")`
    - `@async_publish("test")`
- The `subscribe` decorator to process messages in both **synchronous** and **asynchronous** modes
    - `@subscribe("test")`
    - `@async_subscribe("test")`

### Prerequisites
- **General**
    - Ensure `pycarta` is installed in editable mode (`pip install -e .`) from the root directory of the project.

- **Local Mosquitto MQTT broker**
    - Ensure Mosquitto is installed and running:
        ```bash
        brew install mosquitto
        mosquitto
        ```
    - A mosquitto subscriber can be set up as follows to listen to topic `test`:
        ```bash
        mosquitto_sub -t test
        ```
    - A mosquitto publisher can publish `$YOUR_MESSAGE` to topic `test` as follows:
        ```bash
        mosquitto_pub -t test -m $YOUR_MESSAGE
        ```

- **Remote AIMPF MQTT broker**
    - A set of credentials available including:
        - broker_address (not in the certificate requirement, but will be needed for connection)
        - ca_certificate
        - private_key
        - certificate
    - Basic information about the broker, e.g., broker address, port, etc.
    - (Optional, but highly recommended) A Carta account for credential CRUD operations 
        - You have already uploaded MQTT credentials to your chosen `tag`. Please check the Jupyter Notebook named `credential-crud.ipynb` for more details.
        - Environment variables set for authentication for your account (e.g., `CARTA_CHEN_USERNAME`, `CARTA_CHEN_PASSWORD`).


In [1]:
# NOTE: Here, we are trying to make the brokers definition consistent between Jupyter Notebook and Pytest.
BROKERS = [
    {
        "label": "mosquitto_local",
        "host": "localhost",
        "port": 1883,
    },
    {
        "label": "aimpf_remote_cred",
        "host": "aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com",
        "port": 8883,
    },
]

# Create a dictionary for quick access by label
brokers_dict = {broker["label"]: broker for broker in BROKERS}

# NOTE: Before you proceed, select the testing broker by its label
# testing_broker = brokers_dict["mosquitto_local"]
testing_broker = brokers_dict["aimpf_remote_cred"]



---
## SECTION 1: Synchronous Mode

In [2]:
import logging
import os
from dotenv import load_dotenv
logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(message)s'
)
load_dotenv(override=True)
logger = logging.getLogger(__name__)
print(f"POLLING_TIMEOUT: {os.getenv('POLLING_TIMEOUT')}")
print(f"POLLING_INTERVAL: {os.getenv('POLLING_INTERVAL')}")


POLLING_TIMEOUT: 10
POLLING_INTERVAL: 1


In [24]:
import time
import os
import logging
from pycarta import login
from pycarta.mqtt.publisher import publish
from pycarta.mqtt.subscriber import subscribe
from pycarta.mqtt.credential import BilateralCredentialAuthenticator

logger = logging.getLogger(__name__)

if testing_broker["label"] == "aimpf_remote_cred":
    # Environment variables
    CARTA_USERNAME = os.getenv("CARTA_CHEN_USERNAME")
    CARTA_PASSWORD = os.getenv("CARTA_CHEN_PASSWORD")
    CARTA_HOST = os.getenv("CARTA_HOST_DEV", "https://api.sandbox.carta.contextualize.us.com")
    TAG = "mb33-thing301"
    certificate_folder = "../../tests/mqtt/mb33-thing301"
    ca_cert_file = "../../tests/mqtt/mb33-thing301/AmazonRootCA1.pem"
    client_cert_file = "../../tests/mqtt/mb33-thing301/34c68a3ca20dfafa5a327c03529649d206b13a789c08df92be63bbea2514cb3d-certificate.pem.crt"
    client_key_file = "../../tests/mqtt/mb33-thing301/34c68a3ca20dfafa5a327c03529649d206b13a789c08df92be63bbea2514cb3d-private.pem.key"

    # Approach #1: Provide 3 file paths
    # auth = BilateralCredentialAuthenticator.from_cert_files(
    #     ca_cert=ca_cert_file,
    #     cert=client_cert_file,
    #     key=client_key_file
    # )
    # logger.info("Approach #1 authenticator created.")

    # Approach #2: Provide 1 folder path containing exactly 3 PEM files
    # auth = BilateralCredentialAuthenticator.from_folder(certificate_folder)
    # logger.info("Approach #2 authenticator created.")

    # Approach #3: Provide username/password/tag to retrieve from Carta
    login(username=CARTA_USERNAME, password=CARTA_PASSWORD, host=CARTA_HOST)
    auth = BilateralCredentialAuthenticator.from_carta(tag=TAG)
    logger.info("Approach #3 authenticator created via Carta tag '%s'.", TAG)

    logger.info("BilateralCredentialAuthenticator initialized!")



2025-01-23 08:50:01,134 - INFO - Approach #3 authenticator created via Carta tag 'mb33-thing301'.
2025-01-23 08:50:01,135 - INFO - BilateralCredentialAuthenticator initialized!


### Example 1.1: Publish Results of a Function
This example demonstrates publishing results to a topic using the `publish` decorator.

In [21]:
# Use the publish decorator on a simple function
if testing_broker["label"] == "mosquitto_local":
    @publish("test")
    def add(lhs, rhs):
        return lhs + rhs
elif testing_broker["label"] == "aimpf_remote_cred":
    @publish(
        topic="test",
        host=testing_broker["host"],
        port=testing_broker["port"],
        qos=1,
        authenticator=auth)
    def add(lhs, rhs):
        return lhs + rhs

logger.info(f"Function `add` has been defined for testing against broker `{testing_broker["label"]}`")

2025-01-23 08:49:08,693 - INFO - Function `add` has been defined for testing against broker `aimpf_remote_cred`


In [22]:
# Publish results
add(12, 30)  # Publishes 42

2025-01-23 08:49:09,864 - DEBUG - [sync publish] Preparing to publish to topic test.
2025-01-23 08:49:09,865 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:49:10,083 - INFO - [MQTTConnection] Connected None to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:49:10,084 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:49:10,085 - INFO - [sync publish] Successfully published to test: 42
2025-01-23 08:49:10,085 - INFO - [MQTTConnection] Explicitly disconnecting from broker.
2025-01-23 08:49:10,086 - INFO - [MQTTConnection] Stopping the MQTT loop.


42

2025-01-23 08:49:10,125 - DEBUG - [async subscribe] Processed message result: 84


[Subscriber] Received message: 42
[Subscriber] Doubled value: 84


In [6]:
# Clean up
del add

### Example 1.2: Subscribe to a Topic and Process Messages
This example demonstrates subscribing to the topic and processing messages using the `subscribe` decorator.

In [7]:

if testing_broker["label"] == "mosquitto_local":
    @subscribe("test")
    def double(x):
        return x * 2
elif testing_broker["label"] == "aimpf_remote_cred":
    @subscribe(
        topic="test",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    def double(x):
        return x * 2

logger.info(f"Function `double` has been defined for testing against broker `{testing_broker["label"]}`")

# Single message processing (simulate publisher: mosquitto_pub -t test -m 25)
# The subscriber will stop if:
# 1. Receive a published message
# 2. Timeout (TimeoutException after certain amount of time)
# 3. Canceled (KeyboardInterrupt)
double()

2025-01-23 08:38:56,406 - INFO - Function `double` has been defined for testing against broker `aimpf_remote_cred`
2025-01-23 08:38:56,407 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:38:56,670 - INFO - [MQTTConnection] Connected None to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:38:56,672 - DEBUG - [sync subscribe] Subscribed to topic: test
2025-01-23 08:38:57,677 - DEBUG - [sync subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:38:57,678 - INFO - [MQTTConnection] Stopping the MQTT loop.
2025-01-23 08:38:58,686 - DEBUG - [sync subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:38:59,692 - DEBUG - [sync subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:39:00,695 - DEBUG - [sync subscribe] Polling... Timeout in 6 seconds.
2025-01-23 08:39:00,702 - DEBUG - [sync subscribe] Received raw message: 2
2025-01-23 08:39:00,702 - DEBUG - [sync subscribe] Processed message: 4
2025-01-23 08:39:00,702 - INFO 

4

### Example 1.3: Subscribe and Process Multiple Messages
This example shows how to process multiple messages until a `KeyboardInterrupt` or polling timeout occurs.

In [8]:
# Process messages until KeyboardInterrupt or timeout (default: 30 seconds)
[x for x in double]

2025-01-23 08:39:03,215 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:39:03,431 - INFO - [MQTTConnection] Connected None to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:39:03,433 - DEBUG - [sync subscribe] Subscribed to topic: test
2025-01-23 08:39:04,439 - DEBUG - [sync subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:39:05,445 - DEBUG - [sync subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:39:06,452 - DEBUG - [sync subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:39:06,539 - DEBUG - [sync subscribe] Received raw message: 2
2025-01-23 08:39:06,539 - DEBUG - [sync subscribe] Processed message: 4
2025-01-23 08:39:07,542 - DEBUG - [sync subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:39:08,370 - DEBUG - [sync subscribe] Received raw message: 4
2025-01-23 08:39:08,371 - DEBUG - [sync subscribe] Processed message: 8
2025-01-23 08:39:09,376 - DEBUG - [sync subscribe] Polling... Timeout in 

[4, 8, 12]

#

---
## SECTION 2: Asynchronous Mode

In [16]:
import logging
import os
from dotenv import load_dotenv
logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s - %(levelname)s - %(message)s'
)
load_dotenv(override=True)
print(f"POLLING_TIMEOUT: {os.getenv('POLLING_TIMEOUT')}")
print(f"POLLING_INTERVAL: {os.getenv('POLLING_INTERVAL')}")

POLLING_TIMEOUT: 10
POLLING_INTERVAL: 1


In [17]:
import asyncio
from pycarta.mqtt.publisher import async_publish
from pycarta.mqtt.subscriber import async_subscribe

# Define the asynchronous publisher and subscriber functions
if testing_broker["label"] == "mosquitto_local":
    @async_publish("test")
    async def add(lhs, rhs):
        return lhs + rhs
elif testing_broker["label"] == "aimpf_remote_cred":
    @async_publish(
        topic="test",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    def add(lhs, rhs):
        return lhs + rhs

logger.info(f"Async function `add` has been defined for testing against broker `{testing_broker["label"]}`")

# Define the asynchronous subscriber function to yield
if testing_broker["label"] == "mosquitto_local":
    @async_subscribe("test")
    async def double(x):
        return x * 2
elif testing_broker["label"] == "aimpf_remote_cred":
    @async_subscribe(
        topic="test",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    async def double(x):
        return x * 2

logger.info(f"Async function `double` has been defined for testing against broker `{testing_broker["label"]}`")

2025-01-23 08:41:11,987 - INFO - [MQTTConnection] Stopping the MQTT loop.
2025-01-23 08:41:11,988 - INFO - Async function `add` has been defined for testing against broker `aimpf_remote_cred`
2025-01-23 08:41:11,989 - INFO - Async function `double` has been defined for testing against broker `aimpf_remote_cred`


In [None]:
# Start the subscriber asynchronously in the background.
# The subscriber will stop if:
# 1. Receive a published message
# 2. Timeout
# 3. Canceled (see below)
subscriber_task = asyncio.create_task(double())

2025-01-23 08:39:30,690 - DEBUG - [async subscribe] Starting single message processing.
2025-01-23 08:39:30,690 - DEBUG - [async subscribe] Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:39:30,690 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...


2025-01-23 08:39:30,905 - INFO - [MQTTConnection] Connected None to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:39:30,906 - DEBUG - [async subscribe] Subscribed to topic: test
2025-01-23 08:39:31,908 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:39:32,910 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:39:33,913 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:39:34,665 - DEBUG - [async subscribe] Received raw message: 6
2025-01-23 08:39:34,666 - DEBUG - [async subscribe] Processed message result: 12
2025-01-23 08:39:34,667 - DEBUG - [async subscribe] Disconnecting from aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:39:34,667 - INFO - [MQTTConnection] Explicitly disconnecting from broker.
2025-01-23 08:39:34,668 - INFO - [MQTTConnection] Stopping the MQTT loop.
2025-01-23 08:39:34,671 - DEBUG - [async subscribe] Disconnected from MQTT broker.
2025-01-23 08:39:42,029

In [12]:
# Stop the subscriber
# Case 1: While the subscriber is running, you can stop it by cancelling the task, you will receive a returned value of `True`
# Case 2: While the subscriber is not running, you will receive a returned value of `False`
subscriber_task.cancel()

False

In [13]:
# This will actually run the asynchronous publisher in synchronous mode in JN
await double()

2025-01-23 08:39:41,853 - DEBUG - [async subscribe] Starting single message processing.
2025-01-23 08:39:41,853 - DEBUG - [async subscribe] Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:39:41,853 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:39:42,030 - DEBUG - [async subscribe] Subscribed to topic: test
2025-01-23 08:39:43,031 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:39:44,034 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:39:45,036 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:39:46,039 - DEBUG - [async subscribe] Polling... Timeout in 6 seconds.
2025-01-23 08:39:46,674 - DEBUG - [async subscribe] Processed message result: 4
2025-01-23 08:39:46,675 - DEBUG - [async subscribe] Disconnecting from aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:39:46,676 - INFO - [MQTTConnection] Explicitly disconnecti

4

In [14]:
# This will run the asynchronous publisher in non-blocking mode in JN
[message async for message in double]

2025-01-23 08:39:55,518 - DEBUG - [async subscribe] Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:39:55,519 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:39:55,749 - DEBUG - [async subscribe] Subscribed to topic: test
2025-01-23 08:39:56,519 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:39:57,521 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:39:58,523 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:39:59,525 - DEBUG - [async subscribe] Polling... Timeout in 6 seconds.
2025-01-23 08:40:00,528 - DEBUG - [async subscribe] Polling... Timeout in 5 seconds.
2025-01-23 08:40:01,091 - DEBUG - [async subscribe] Processed message result: 4
2025-01-23 08:40:02,093 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:40:02,435 - DEBUG - [async subscribe] Processed message result: 8
2025-01-23 08:40:03,438 - DEBUG -

[4, 8, 10, 12]

In [15]:
async for message in double:
    print(message)

2025-01-23 08:40:17,188 - DEBUG - [async subscribe] Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:40:17,188 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:40:17,415 - DEBUG - [async subscribe] Subscribed to topic: test
2025-01-23 08:40:18,189 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:40:19,191 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:40:20,193 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:40:21,195 - DEBUG - [async subscribe] Polling... Timeout in 6 seconds.
2025-01-23 08:40:22,198 - DEBUG - [async subscribe] Polling... Timeout in 5 seconds.
2025-01-23 08:40:23,200 - DEBUG - [async subscribe] Polling... Timeout in 4 seconds.
2025-01-23 08:40:24,203 - DEBUG - [async subscribe] Polling... Timeout in 3 seconds.
2025-01-23 08:40:25,206 - DEBUG - [async subscribe] Polling... Timeout in 2 seconds.
2025-01-23 08:40:25,739

4


2025-01-23 08:40:26,740 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:40:27,686 - DEBUG - [async subscribe] Processed message result: 8


8


2025-01-23 08:40:28,689 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:40:29,430 - DEBUG - [async subscribe] Processed message result: 12


12


2025-01-23 08:40:30,432 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:40:31,435 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:40:32,438 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:40:33,440 - DEBUG - [async subscribe] Polling... Timeout in 6 seconds.
2025-01-23 08:40:34,443 - DEBUG - [async subscribe] Polling... Timeout in 5 seconds.
2025-01-23 08:40:35,445 - DEBUG - [async subscribe] Polling... Timeout in 4 seconds.
2025-01-23 08:40:36,448 - DEBUG - [async subscribe] Polling... Timeout in 3 seconds.
2025-01-23 08:40:37,450 - DEBUG - [async subscribe] Polling... Timeout in 2 seconds.
2025-01-23 08:40:38,452 - DEBUG - [async subscribe] Polling... Timeout in 1 seconds.
2025-01-23 08:40:39,455 - DEBUG - [async subscribe] Polling... Timeout in 0 seconds.
2025-01-23 08:40:39,455 - INFO - [async subscribe] Timeout after 10.0 seconds. No messages received.
2025-01-23 08:40:39,456 - DEBUG - [async subscrib

### Example 2.1: Asynchronous Calculation Publish and Subscribe

#### Overview
This script demonstrates an asynchronous MQTT-based publish and subscribe system for simple calculations, leveraging Python's `asyncio` for efficient, non-blocking execution.

#### Process
- **Publish**:
  - Asynchronously calculates the sum of two numbers.
  - Publishes the result to the MQTT topic `test`.
  - Sends multiple messages sequentially with a delay between each.

- **Subscribe**:
  - Asynchronously listens to the `test` topic for incoming messages.
  - Processes each message by doubling its value and printing the result.
  - Stops automatically when a processed value exceeds 20, demonstrating event-driven control.


In [18]:
import asyncio
from pycarta.mqtt.publisher import async_publish
from pycarta.mqtt.subscriber import async_subscribe


if testing_broker["label"] == "mosquitto_local":
    # Define the asynchronous publisher and subscriber functions
    @async_publish("test")
    async def add(lhs, rhs):
        return lhs + rhs

    # Define the asynchronous subscriber function to yield
    @async_subscribe("test")
    async def double(x):
        return x * 2
elif testing_broker["label"] == "aimpf_remote_cred":
    # Define the asynchronous publisher and subscriber functions
    @async_publish(
        topic="test",
        host=testing_broker["host"],
        port=testing_broker["port"],
        qos=1,
        authenticator=auth)
    def add(lhs, rhs):
        return lhs + rhs

    # Define the asynchronous subscriber function to yield
    @async_subscribe(
        topic="test",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    async def double(x):
        print(f"[Subscriber] Received message: {x}")
        result = int(x) * 2
        print(f"[Subscriber] Doubled value: {result}")
        return result

async def main():
    print("[Main] Starting MQTT subscription to 'test' topic...")
    
    # Initialize a list to store received messages
    received_messages = []
    
    # Start the publisher as a background task
    publisher_task = asyncio.create_task(publish_messages())
    print("[Main] Publisher task started in the background. Subscriber will process the delayed incoming messages.")
    
    # Process incoming messages from the subscriber using async for
    async for message in double:
        if message is not None:
            print(f"[Main] Processed message result: {message}")
            # Append each received message to the list
            received_messages.append(message)
            # Exit condition for demonstration purposes
            if message > 20:
                print("[Main] Received a message greater than 20. Exiting.")
                break
    
    # Await the publisher task to ensure all messages are published
    await publisher_task
    
    print("[Main] Subscription ended.")
    
    # Access the list of received messages after exiting the loop
    print(f"[Main] All received messages: {received_messages}")

async def publish_messages():
    # List of values to publish
    test_values = [0, 5, 10, 15, 20]
    
    # Publish messages sequentially
    for value in test_values:
        await asyncio.sleep(1)   # Short delay between publishes
        await add(value, value)  # For example, publish lhs + rhs
        await asyncio.sleep(1)   # Short delay between publishes
    
    print("[Publisher] Published all messages.")

# Execute the main coroutine and the expected final results are: 
# >>> [Main] All received messages: [0, 20, 40]
await main()

2025-01-23 08:41:17,725 - INFO - [MQTTConnection] Stopping the MQTT loop.
2025-01-23 08:41:17,726 - INFO - [MQTTConnection] Stopping the MQTT loop.
2025-01-23 08:41:17,727 - DEBUG - [async subscribe] Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:41:17,727 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...


[Main] Starting MQTT subscription to 'test' topic...
[Main] Publisher task started in the background. Subscriber will process the delayed incoming messages.


2025-01-23 08:41:17,962 - DEBUG - [async subscribe] Subscribed to topic: test
2025-01-23 08:41:18,728 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:41:18,729 - DEBUG - [async publish] Preparing to publish to topic test.
2025-01-23 08:41:18,729 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:41:18,893 - DEBUG - [async publish] Connected to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:41:18,893 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:18,894 - INFO - [async publish] Successfully published to test: 0
2025-01-23 08:41:18,935 - DEBUG - [async subscribe] Processed message result: 0


[Subscriber] Received message: 0
[Subscriber] Doubled value: 0
[Main] Processed message result: 0


2025-01-23 08:41:19,936 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:41:20,897 - DEBUG - [async publish] Preparing to publish to topic test.
2025-01-23 08:41:20,898 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:20,899 - INFO - [async publish] Successfully published to test: 10
2025-01-23 08:41:20,938 - DEBUG - [async subscribe] Processed message result: 20


[Subscriber] Received message: 10
[Subscriber] Doubled value: 20
[Main] Processed message result: 20


2025-01-23 08:41:21,939 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:41:22,902 - DEBUG - [async publish] Preparing to publish to topic test.
2025-01-23 08:41:22,904 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:22,906 - INFO - [async publish] Successfully published to test: 20
2025-01-23 08:41:22,941 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:41:22,942 - DEBUG - [async subscribe] Processed message result: 40


[Subscriber] Received message: 20
[Subscriber] Doubled value: 40
[Main] Processed message result: 40
[Main] Received a message greater than 20. Exiting.


2025-01-23 08:41:24,910 - DEBUG - [async publish] Preparing to publish to topic test.
2025-01-23 08:41:24,913 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:24,915 - INFO - [async publish] Successfully published to test: 30
2025-01-23 08:41:24,948 - DEBUG - [async subscribe] Processed message result: 60


[Subscriber] Received message: 30
[Subscriber] Doubled value: 60


2025-01-23 08:41:26,924 - DEBUG - [async publish] Preparing to publish to topic test.
2025-01-23 08:41:26,927 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:26,929 - INFO - [async publish] Successfully published to test: 40
2025-01-23 08:41:26,967 - DEBUG - [async subscribe] Processed message result: 80


[Subscriber] Received message: 40
[Subscriber] Doubled value: 80
[Publisher] Published all messages.
[Main] Subscription ended.
[Main] All received messages: [0, 20, 40]


In [19]:
# NOTE: If you run the previous cell, any message published after the subscriber is stopped will remain unprocessed in the queue. As a result, when the subscriber is resumed, the stale messages will still be processed. For example, if the subscriber processes numbers and the previously processed results are [60, 80, 2, 4, 6], newly published numbers 1, 2, 3 in separate publishing events would unexpectedly append their processed results to the old results, leading to [60, 80, 2, 4, 6] instead of the expected [2, 4, 6].
logger.info(f"Queue before clearing: {list(double.result_queue._queue)}")

# To address this issue, you must clear the queue explicitly using one of the following approaches before resuming or stopping the subscriber:

# 1. Clear the queue explicitly:
await double.clear_queue()

# 2. Disconnect the subscriber and clear the queue simultaneously:
# await double.disconnect_async(clear_queue=True)

# Simply disconnecting the subscriber without clearing the queue will not remove the residual messages, and they will persist:
# await double.disconnect_async()

# To confirm the contents of the queue at any point, you can inspect it directly:
logger.info(f"Queue after clearing: {list(double.result_queue._queue)}")

2025-01-23 08:41:31,808 - INFO - Queue before clearing: [60, 80]
2025-01-23 08:41:31,809 - DEBUG - [async subscribe] Clearing the queue. Current size: 2
2025-01-23 08:41:31,809 - DEBUG - [async subscribe] Removed item from queue: 60
2025-01-23 08:41:31,810 - DEBUG - [async subscribe] Removed item from queue: 80
2025-01-23 08:41:31,810 - DEBUG - [async subscribe] Queue cleared.
2025-01-23 08:41:31,811 - INFO - Queue after clearing: []


In [None]:
# After running the cleanup method, the result queue should be empty before new subscriber is launched, and the result should be [2, 4, 6] as expected now.
# await double()
# [message async for message in double]

### Example 2.2: Asynchronous Sensor Monitoring and Control

#### Overview
This script demonstrates an asynchronous MQTT-based system for monitoring sensor values and triggering control actions based on predefined thresholds.

#### Process
- **Publish**:
  - Sends simulated sensor values to the MQTT topic **`machine/sensor`**.
  - Calculates and publishes the sum of two numbers as the sensor value.
  - Provides error (`STOP`) or no-error (`CONTINUE`) status messages based on thresholds.

- **Subscribe**:
  - Listens to the **`machine/sensor`** topic for sensor values.
  - Processes incoming messages and evaluates them against defined thresholds.
  - Publishes a **`STOP`** action if the value is out of bounds, halting further processing.
  - Publishes a **`CONTINUE`** action if the value is within bounds, allowing normal operation.

- **Main**:
  - Manages asynchronous publishing and subscribing with `asyncio`.
  - Ensures sensor values are monitored in real-time and appropriate control actions are taken.
  - Ends the subscription after an out-of-bound value triggers a **`STOP`** action.
  

In [20]:
import asyncio
import json
from pycarta.mqtt.publisher import async_publish
from pycarta.mqtt.subscriber import async_subscribe

# Define expected value and variance
mu = 12.34
sigma = 3.45
lower_bound, upper_bound = (mu - 2.0 * sigma, mu + 2.0 * sigma)

if testing_broker["label"] == "mosquitto_local":
    # Define the asynchronous publisher functions
    @async_publish(topic="machine/sensor")
    async def add(lhs, rhs):
        return lhs + rhs

    @async_publish(topic="machine/sensor")
    async def on_error():
        return "STOP"

    @async_publish(topic="machine/sensor")
    async def no_error():
        return "CONTINUE"

    # Define the asynchronous subscriber function to yield sensor value
    @async_subscribe("machine/sensor")
    async def get_sensor_value(x):
        return x
elif testing_broker["label"] == "aimpf_remote_cred":
    # Define the asynchronous publisher functions
    @async_publish(
        topic="machine/sensor",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    async def add(lhs, rhs):
        return lhs + rhs

    @async_publish(
        topic="machine/sensor",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    async def on_error():
        return "STOP"

    @async_publish(
        topic="machine/sensor",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    async def no_error():
        return "CONTINUE"

    # Define the asynchronous subscriber function to yield sensor value
    @async_subscribe(
        topic="machine/sensor",
        host=testing_broker["host"],
        port=testing_broker["port"],
        authenticator=auth)
    async def get_sensor_value(x):
        return x


# Define the main coroutine
async def main():
    print("[Main] Starting MQTT subscription to 'machine/sensor' topic...")
    
    # Initialize a list to store received messages
    received_messages = []
    
    # Start the publisher as a background task
    publisher_task = asyncio.create_task(publish_messages())
    print("[Main] Publisher task started in the background. Subscriber will process the incoming messages.")
    
    # Process incoming messages from the subscriber using async for
    async for x in get_sensor_value:
        if x is not None:
            print(f"[Main] Received sensor value: {x}")
            
            # Check if x can be converted to a number
            try:
                x_float = float(x)
                received_messages.append(x)  # Append each received message to the list
            except ValueError:
                print(f"[Main] Received non-numeric sensor value: {x}. Ignoring.")
                continue

            # Check error condition
            if not lower_bound < x < upper_bound:
                await on_error()
                print(f"[Main] Published 'STOP' action: value out of bounds [{lower_bound:.2f}, {upper_bound:.2f}]. ")
                break
            else:
                await no_error()
                print("[Main] Published 'CONTINUE' action.")
    
    # Await the publisher task to ensure all messages are published
    await publisher_task
    
    print("[Main] Subscription ended.")
    
    # Access the list of received messages after exiting the loop
    print(f"[Main] All received messages: {received_messages}")

# Define the publisher coroutine
async def publish_messages():
    # List of values to publish
    test_values = [7, 9, 11, 13]
    
    # Publish messages sequentially with a 3-second delay between publishes
    for value in test_values:
        await asyncio.sleep(3)   # Short delay to allow the subscriber to establish connection
        result = value + value  # Example: publish lhs + rhs (e.g., 5 + 5 = 10)
        await add(value, value)  # Publish the result to 'machine/sensor'
        print(f"[Publisher] Published sensor value: {result}")
        await asyncio.sleep(2)   # Short delay between publishes
    
    print("[Publisher] Published all messages.")

# Execute the main coroutine and the expected final results are: 
# >>> [Main] All received messages: [14, 18, 22]
# NOTE: After receiving the last number 22 which is actually out of range, the subscriber stops recording messages.
await main()


2025-01-23 08:41:36,155 - DEBUG - [async subscribe] Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883.
2025-01-23 08:41:36,156 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...


[Main] Starting MQTT subscription to 'machine/sensor' topic...
[Main] Publisher task started in the background. Subscriber will process the incoming messages.


2025-01-23 08:41:36,381 - DEBUG - [async subscribe] Subscribed to topic: machine/sensor
2025-01-23 08:41:37,156 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:41:38,158 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:41:39,157 - DEBUG - [async publish] Preparing to publish to topic machine/sensor.
2025-01-23 08:41:39,159 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...
2025-01-23 08:41:39,161 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:41:39,388 - DEBUG - [async publish] Connected to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:41:39,388 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:39,391 - INFO - [async publish] Successfully published to machine/sensor: 14
2025-01-23 08:41:39,433 - DEBUG - [async subscribe] Processed message result: 14
2025-01-23 08:41:39,435 - DEBUG - [async publish] Preparing to publish to topic machine/se

[Publisher] Published sensor value: 14
[Main] Received sensor value: 14


2025-01-23 08:41:39,615 - DEBUG - [async publish] Connected to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:41:39,617 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:39,622 - INFO - [async publish] Successfully published to machine/sensor: "CONTINUE"
2025-01-23 08:41:39,659 - DEBUG - [async subscribe] Processed message result: CONTINUE


[Main] Published 'CONTINUE' action.
[Main] Received sensor value: CONTINUE
[Main] Received non-numeric sensor value: CONTINUE. Ignoring.


2025-01-23 08:41:40,660 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:41:41,663 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:41:42,666 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:41:43,668 - DEBUG - [async subscribe] Polling... Timeout in 6 seconds.
2025-01-23 08:41:44,394 - DEBUG - [async publish] Preparing to publish to topic machine/sensor.
2025-01-23 08:41:44,396 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:44,399 - INFO - [async publish] Successfully published to machine/sensor: 18
2025-01-23 08:41:44,436 - DEBUG - [async subscribe] Processed message result: 18
2025-01-23 08:41:44,437 - DEBUG - [async publish] Preparing to publish to topic machine/sensor.
2025-01-23 08:41:44,437 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:44,438 - INFO - [async publish] Successfully published to machine/sensor: "CONTINUE"
2025-01-23 08:41:44,476 - DEB

[Publisher] Published sensor value: 18
[Main] Received sensor value: 18
[Main] Published 'CONTINUE' action.
[Main] Received sensor value: CONTINUE
[Main] Received non-numeric sensor value: CONTINUE. Ignoring.


2025-01-23 08:41:45,478 - DEBUG - [async subscribe] Polling... Timeout in 9 seconds.
2025-01-23 08:41:46,482 - DEBUG - [async subscribe] Polling... Timeout in 8 seconds.
2025-01-23 08:41:47,486 - DEBUG - [async subscribe] Polling... Timeout in 7 seconds.
2025-01-23 08:41:48,489 - DEBUG - [async subscribe] Polling... Timeout in 6 seconds.
2025-01-23 08:41:49,402 - DEBUG - [async publish] Preparing to publish to topic machine/sensor.
2025-01-23 08:41:49,403 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:49,404 - INFO - [async publish] Successfully published to machine/sensor: 22
2025-01-23 08:41:49,443 - DEBUG - [async subscribe] Processed message result: 22
2025-01-23 08:41:49,444 - DEBUG - [async publish] Preparing to publish to topic machine/sensor.
2025-01-23 08:41:49,444 - INFO - Connecting to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883...


[Publisher] Published sensor value: 22
[Main] Received sensor value: 22


2025-01-23 08:41:49,659 - DEBUG - [async publish] Connected to aop63bhe1nwsr-ats.iot.us-east-1.amazonaws.com:8883
2025-01-23 08:41:49,659 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:49,660 - INFO - [async publish] Successfully published to machine/sensor: "STOP"
2025-01-23 08:41:49,698 - DEBUG - [async subscribe] Processed message result: STOP


[Main] Published 'STOP' action: value out of bounds [5.44, 19.24]. 


2025-01-23 08:41:54,406 - DEBUG - [async publish] Preparing to publish to topic machine/sensor.
2025-01-23 08:41:54,408 - DEBUG - [publish] Content is not a Pydantic BaseModel.
2025-01-23 08:41:54,409 - INFO - [async publish] Successfully published to machine/sensor: 26
2025-01-23 08:41:54,456 - DEBUG - [async subscribe] Processed message result: 26


[Publisher] Published sensor value: 26
[Publisher] Published all messages.
[Main] Subscription ended.
[Main] All received messages: [14, 18, 22]
