In [1]:
# mqtt_env_gymnasium.py
import gymnasium as gym
import numpy as np
import paho.mqtt.client as mqtt
import threading
import queue
import time
import json

In [2]:
class MQTTFanEnv(gym.Env):
    def __init__(self, broker_ip="10.0.0.194"):
        super().__init__()

        self.broker_ip = broker_ip

        self.observation_space = gym.spaces.Box(low=np.array([1.0, 0.0]), high=np.array([5.0, 1.0]), dtype=np.float32)
        self.action_space = gym.spaces.Discrete(3)  # 0=decrease, 1=nothing, 2=increase

        self.state_queue = queue.Queue()
        self.last_state = np.array([3.0, 0.5], dtype=np.float32)  # Fallback state

        # Setup MQTT
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker_ip)
        self.client.loop_start()

    def on_connect(self, client, userdata, flags, rc):
        print("Connected to MQTT broker with code", rc)
        client.subscribe("sensor/data")

    def on_message(self, client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode())
            aqi = float(payload.get("aqi", 3))
            fan_speed = float(payload.get("fan_speed", 0.5))
            state = np.array([aqi, fan_speed], dtype=np.float32)
            self.last_state = state
            self.state_queue.put(state)
        except Exception as e:
            print("MQTT message error:", e)

    def send_action(self, action_idx):
        action_map = ["decrease", "nothing", "increase"]
        action_payload = {"action": action_map[action_idx]}
        self.client.publish("agent/action", json.dumps(action_payload))

    def wait_for_state(self, timeout=5.0):
        try:
            return self.state_queue.get(timeout=timeout)
        except queue.Empty:
            print("Sensor timeout. Using last known state.")
            return self.last_state

    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)
        print("Waiting for initial state...")
        obs = self.wait_for_state()
        return obs, {}

    def step(self, action):
        self.send_action(action)
        time.sleep(1.0)  # Allow hardware to respond
        obs = self.wait_for_state()

        aqi, fan_speed = obs
        reward = -aqi + (1 - fan_speed)  # Encourage low AQI and low energy use

        terminated = False  # Environment never terminates naturally
        truncated = False
        return obs, reward, terminated, truncated, {}

    def close(self):
        self.client.loop_stop()
        self.client.disconnect()


In [3]:
# train_real_ppo_gymnasium.py
from stable_baselines3 import PPO
env = MQTTFanEnv()
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10000)
model.save("ppo_fan_real_gymnasium")


  gym.logger.warn(
  gym.logger.warn(
  self.client = mqtt.Client()


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Connected to MQTT broker with code 0
Waiting for initial state...
Sensor timeout. Using last known state.
Sensor timeout. Using last known state.
Sensor timeout. Using last known state.


KeyboardInterrupt: 