# B9AI105 REINFORCEMENT LEARNING - CA_ONE_(60%)

Name: Naeem ul Hassan

Student Number: 20054701

Email: 20054701@mydbs.ie

Course: B9AI105 REINFORCEMENT LEARNING (B9AI105_2425_TMD3)


Assessment Type: CA_ONE_(60%)

Assignment: Deep Reinforcement Learning Concepts Project

Due: Monday, 14 July 2025, 11:59 PM

Submission Date: 12 July 2025

# Custom HVAC Control with Double DQN and Policy‑Gradient + Baseline  
A hands‑on companion notebook for the research report “**Comparative Study of Double Deep Q‑Network and Policy‑Gradient‑with‑Baseline Controllers for Building HVAC Management**”.


## 1  Environment Setup 

In [16]:
# Environment Setup
# We import all required libraries _once_ here so the rest of the notebook
# remains tidy.  Everything below should run on Python 3.9+ with
# PyTorch 2.x and Gymnasium 0.29+.

import math
import random
from collections import deque
from dataclasses import dataclass
from typing import Tuple, List

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import gymnasium as gym
from gymnasium import spaces

import matplotlib.pyplot as plt
from IPython.display import display, clear_output

# Set a random seed for reproducibility
# This ensures that the random numbers generated are the same each time
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
from __future__ import annotations   # enables `X | None` in 3.8/3.9

## 2 HVAC Environment Definition 

Instead of using a complex EnergyPlus simulation (which can be slow and difficult to set up), we created a simplified thermal model that captures the essential physics of building heating and cooling.
Our model represents a two-zone office building where:
Physical Setup:

Each zone (like an office room) stores heat like a thermal battery with capacity C
Heat flows in and out through the walls and windows based on conductance U
An air handling unit (AHU) supplies conditioned air to maintain comfort

**Control Actions:**

Discrete action: Set the supply air temperature (16°C to 22°C in 1°C steps)
Continuous action: Adjust damper position (0-100%) to control airflow rate

**Reward System:**
The system tries to minimize two competing objectives:

Energy cost: Electricity used by chillers and fans
Comfort penalty: Applied when occupants feel too hot or cold (measured by PMV - Predicted Mean Vote)

The goal is to keep people comfortable while using as little energy as possible.

In [17]:
class HVACEnv(gym.Env):
    """A minimal two‑zone HVAC environment.

    State (np.array, dtype=float32, shape=(10,)):
        [0] outdoor_temp               (°C)
        [1] outdoor_rh                 (%)
        [2] zone0_temp                 (°C)
        [3] zone0_rh                   (%)
        [4] zone1_temp                 (°C)
        [5] zone1_rh                   (%)
        [6] occupancy_flag             {0,1}
        [7] supply_air_temp_setpoint   (°C)
        [8] damper_position            (0‑1)
        [9] time_of_day                (sin encoded)

    Actions (Tuple):
        * Discrete(7): supply‑air temp ∈ {16,…,22} °C
        * Box(low=0, high=1, shape=(1,)): damper (0=closed, 1=open)
    """

    metadata = {"render_modes": ["human"]}

    def __init__(self, ep_minutes: int = 24 * 60, step_minutes: int = 15):
        super().__init__()
        self.ep_steps = ep_minutes // step_minutes
        self.dt = step_minutes * 60  # seconds
        
        # --- action & observation spaces
        self.action_space = spaces.Tuple(
            (spaces.Discrete(7), spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32))
        )
        low_state = np.array([-40, 0, 0, 0, 0, 0, 0, 16, 0, -1], dtype=np.float32)
        high_state = np.array([50, 100, 50, 100, 50, 100, 1, 22, 1, 1], dtype=np.float32)
        self.observation_space = spaces.Box(low_state, high_state, dtype=np.float32)

        # --- thermal parameters (realistic values)
        self.C_zone = [1.5e6, 1.2e6]  # J/K for zones 0 and 1
        self.U_facade = [220.0, 180.0]  # W/K for zones 0 and 1
        self.zone_areas = [100.0, 80.0]  # m² for zones 0 and 1
        self.internal_gains = [500.0, 300.0]  # W for zones 0 and 1
        
        self.C_p_air = 1012.0   # J/(kg·K)
        self.m_flow_nom = 0.8   # kg/s @ damper=1
        self.fan_power_coef = 0.8  # kW/(kg/s)^3 (cubic fan law)
        self.chiller_COP = 3.0

        self.outdoor_temp_profile = self._gen_outdoor_temp_profile()

        self.state: np.ndarray | None = None
        self._step_count = 0

    def reset(self, *, seed: int = None, options: dict | None = None):
        super().reset(seed=seed)
        self._step_count = 0
        
        # Initialize zones near setpoint with small random variation
        z0 = 22.0 + np.random.randn() * 0.5
        z1 = 22.0 + np.random.randn() * 0.5
        rh0 = rh1 = 50.0
        occ = 1.0
        sat = 18.0
        damper = 0.5
        tod = 0.0
        
        self.state = np.array(
            [self.outdoor_temp_profile[0], 60.0, z0, rh0, z1, rh1, occ, sat, damper, tod], 
            dtype=np.float32
        )
        return self.state.copy(), {}

    def step(self, action: Tuple[int, np.ndarray]):
        sat_choice, damper = action
        damper = float(damper[0])
        sat_temp = float(sat_choice + 16)  # Convert 0-6 to 16-22°C

        # Unpack state
        (outdoor_temp, outdoor_rh, z0_temp, z0_rh, z1_temp, z1_rh, 
         occ, _, _, tod) = self.state

        # Simulate thermal response for each zone
        m_dot = self.m_flow_nom * damper
        delta_z0 = self._thermal_delta(z0_temp, outdoor_temp, sat_temp, m_dot, zone_id=0)
        delta_z1 = self._thermal_delta(z1_temp, outdoor_temp, sat_temp, m_dot, zone_id=1)

        z0_temp += delta_z0
        z1_temp += delta_z1
        
        # Apply realistic temperature bounds
        z0_temp = np.clip(z0_temp, 10.0, 40.0)
        z1_temp = np.clip(z1_temp, 10.0, 40.0)

        # Corrected energy calculations
        avg_zone_temp = (z0_temp + z1_temp) / 2
        temp_diff = max(0.0, avg_zone_temp - sat_temp)  # Cooling load
        chiller_load_kw = m_dot * self.C_p_air * temp_diff / 1e3
        chiller_energy_kwh = chiller_load_kw / self.chiller_COP * (self.dt / 3600)
        fan_energy_kwh = (self.fan_power_coef * m_dot**3) * (self.dt / 3600)
        total_energy = chiller_energy_kwh + fan_energy_kwh

        # Improved comfort calculation
        comfort_penalty = 0.0
        for zone_temp in (z0_temp, z1_temp):
            comfort_penalty += self._calculate_pmv_penalty(zone_temp)

        # Balanced reward function
        energy_penalty = total_energy * 100  # Scale energy appropriately
        comfort_penalty_scaled = comfort_penalty * 10
        reward = -(energy_penalty + comfort_penalty_scaled)

        # Advance time
        self._step_count += 1
        done = (self._step_count >= self.ep_steps or 
                comfort_penalty > 10 or 
                any(temp < 15 or temp > 30 for temp in (z0_temp, z1_temp)))

        # Update state
        tod_angle = 2 * math.pi * (self._step_count / self.ep_steps)
        outdoor_idx = self._step_count % len(self.outdoor_temp_profile)
        
        self.state = np.array([
            self.outdoor_temp_profile[outdoor_idx],
            outdoor_rh,
            z0_temp,
            z0_rh,
            z1_temp,
            z1_rh,
            occ,
            sat_temp,
            damper,
            math.sin(tod_angle),
        ], dtype=np.float32)

        info = {
            "energy_kwh": total_energy,
            "comfort_penalty": comfort_penalty,
            "chiller_energy": chiller_energy_kwh,
            "fan_energy": fan_energy_kwh,
            "zone_temps": [z0_temp, z1_temp],
            "outdoor_temp": self.outdoor_temp_profile[outdoor_idx]
        }

        return self.state.copy(), reward, done, False, info

    def _thermal_delta(self, zone_t, outdoor_t, supply_t, m_dot, zone_id=0):
        """Calculate temperature change for a zone"""
        # Zone-specific parameters
        zone_area = self.zone_areas[zone_id]
        internal_gains = self.internal_gains[zone_id] if self.state[6] > 0 else 0
        
        # Heat transfer calculations
        q_env = self.U_facade[zone_id] * (outdoor_t - zone_t)
        q_supply = m_dot * self.C_p_air * (supply_t - zone_t) * 0.5  # Split airflow
        q_internal = internal_gains
        
        # Temperature change
        delta_T = (q_env + q_supply + q_internal) / self.C_zone[zone_id] * self.dt
        return delta_T

    def _calculate_pmv_penalty(self, zone_temp):
        """Calculate comfort penalty based on zone temperature"""
        if 20 <= zone_temp <= 24:
            return 0.0
        elif zone_temp < 18 or zone_temp > 28:
            return 10.0  # Severe discomfort
        else:
            return abs(zone_temp - 22) * 2.0  # Moderate discomfort

    @staticmethod
    def _gen_outdoor_temp_profile():
        """Generate realistic daily temperature profile"""
        # 24-hour temperature profile: cooler at night, warmer during day
        hours = np.linspace(0, 24, 96, endpoint=False)
        base_temp = 20
        daily_variation = 8 * np.sin(2 * np.pi * (hours - 6) / 24)  # Peak at 2 PM
        return base_temp + daily_variation

    def render(self):
        if self.state is not None:
            print(f"Step {self._step_count}: "
                  f"Zone0={self.state[2]:.1f}°C, Zone1={self.state[4]:.1f}°C, "
                  f"Outdoor={self.state[0]:.1f}°C, SAT={self.state[7]:.1f}°C")

## 3  Utility Classes & Helpers
Below we implement a generic **ReplayBuffer** and some tiny neural‑network
builder helpers to keep the agent classes compact.

In [18]:
@dataclass
class Transition:
    state: np.ndarray
    action: np.ndarray
    reward: float
    next_state: np.ndarray
    done: bool


class ReplayBuffer:
    """Fixed-size cyclic buffer for off-policy algorithms (Double DQN)."""
    
    def __init__(self, capacity: int = 50_000):
        self.capacity = capacity
        self.buffer: deque = deque(maxlen=capacity)
    
    def push(self, state: np.ndarray, action: np.ndarray, reward: float, 
             next_state: np.ndarray, done: bool):
        """Add a transition to the buffer."""
        self.buffer.append(Transition(state, action, reward, next_state, done))
    
    def sample(self, batch_size: int = 64) -> Tuple[torch.Tensor, ...]:
        """Sample a batch of transitions."""
        if len(self.buffer) < batch_size:
            raise ValueError(f"Buffer size {len(self.buffer)} < batch_size {batch_size}")
        
        batch = random.sample(list(self.buffer), batch_size)
        states = torch.tensor(np.stack([t.state for t in batch]), dtype=torch.float32)
        actions = torch.tensor(np.stack([t.action for t in batch]), dtype=torch.int64)
        rewards = torch.tensor([t.reward for t in batch], dtype=torch.float32)
        next_states = torch.tensor(np.stack([t.next_state for t in batch]), dtype=torch.float32)
        dones = torch.tensor([t.done for t in batch], dtype=torch.float32)
        return states, actions, rewards, next_states, dones
    
    def __len__(self) -> int:
        return len(self.buffer)


def mlp(input_dim: int, output_dim: int, hidden: int = 128) -> nn.Sequential:
    """Return a 3-layer MLP with ReLU activations."""
    return nn.Sequential(
        nn.Linear(input_dim, hidden),
        nn.ReLU(),
        nn.Linear(hidden, hidden),
        nn.ReLU(),
        nn.Linear(hidden, output_dim),
    )

## 4  Double DQN Agent 

In [19]:
class DoubleDQNAgent:
    """Double Deep Q‑Network implementation."""

    def __init__(
        self,
        env: gym.Env,
        gamma: float = 0.995,
        lr: float = 1e-3,
        tau: float = 0.005,
        epsilon_start: float = 1.0,
        epsilon_end: float = 0.05,
        epsilon_decay: int = 20_000,
        buffer_capacity: int = 50_000,
        batch_size: int = 64,
        device: str | torch.device = "cpu",
    ):
        self.env = env
        self.gamma = gamma
        self.tau = tau
        self.batch_size = batch_size
        self.device = torch.device(device)

        obs_dim = env.observation_space.shape[0]
        # Discrete branch only (supply‑temp); continuous action sampled separately
        act_dim = env.action_space[0].n

        self.online_net = mlp(obs_dim, act_dim).to(self.device)
        self.target_net = mlp(obs_dim, act_dim).to(self.device)
        self.target_net.load_state_dict(self.online_net.state_dict())

        self.optimizer = optim.Adam(self.online_net.parameters(), lr=lr)
        self.replay = ReplayBuffer(buffer_capacity)

        # Epsilon‑greedy parameters
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.steps_done = 0

    # ---------------------- interaction & learning --------------------- #
    def select_action(self, state: np.ndarray, damper_cont: float):
        """Return (discrete_supply_temp, damper_position)."""
        if random.random() < self.epsilon:
            temp_action = self.env.action_space[0].sample()
        else:
            state_t = torch.tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
            with torch.no_grad():
                q_values = self.online_net(state_t)
            temp_action = int(torch.argmax(q_values).item())
        return temp_action, np.array([damper_cont], dtype=np.float32)

    def update_epsilon(self):
        self.epsilon = max(
            self.epsilon_end, self.epsilon - (1.0 - self.epsilon_end) / self.epsilon_decay
        )

    def soft_update(self):
        for tgt, src in zip(self.target_net.parameters(), self.online_net.parameters()):
            tgt.data.copy_(tgt.data * (1.0 - self.tau) + src.data * self.tau)

    def learn(self):
        if len(self.replay) < self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.replay.sample(self.batch_size)
        states = states.to(self.device)
        next_states = next_states.to(self.device)
        actions = actions[:, 0].unsqueeze(1).to(self.device)  # discrete branch
        rewards = rewards.to(self.device)
        dones = dones.to(self.device)

        # Compute Double DQN targets
        next_q_online = self.online_net(next_states)
        next_actions = torch.argmax(next_q_online, dim=1, keepdim=True)
        next_q_target = self.target_net(next_states).gather(1, next_actions).squeeze(1)
        targets = rewards + self.gamma * next_q_target * (1.0 - dones)

        q_vals = self.online_net(states).gather(1, actions).squeeze(1)
        loss = F.mse_loss(q_vals, targets.detach())

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.soft_update()

    # ----------------------------- training ---------------------------- #
    def train(self, episodes: int = 50):
        reward_history = []
        for ep in range(episodes):
            state, _ = self.env.reset()
            ep_reward = 0.0
            done = False
            while not done:
                # Damper continuous action sampled from simple heuristic
                damper = 0.7  # could also be random or learned separately
                action = self.select_action(state, damper)
                next_state, reward, done, _, info = self.env.step(action)
                self.replay.push(state, np.array(action), reward, next_state, done)
                state = next_state
                ep_reward += reward

                self.learn()
                self.update_epsilon()
                self.steps_done += 1

            reward_history.append(ep_reward)
            if ep % 10 == 0:
                print(f"Episode {ep:03d} | Reward: {ep_reward:8.2f} | ε={self.epsilon:.3f}")

        return reward_history

## 5  Policy‑Gradient + Baseline Agent

In [20]:
class PolicyGradientBaselineAgent:
    """REINFORCE with value baseline (a tiny actor‑critic)."""

    def __init__(
        self,
        env: gym.Env,
        gamma: float = 0.99,
        lr: float = 3e-4,
        entropy_coef: float = 0.01,
        device: str | torch.device = "cpu",
    ):
        self.env = env
        self.gamma = gamma
        self.entropy_coef = entropy_coef
        self.device = torch.device(device)

        obs_dim = env.observation_space.shape[0]
        act_dim = env.action_space[0].n  # discrete temp setpoints

        self.actor = mlp(obs_dim, act_dim).to(self.device)
        self.critic = mlp(obs_dim, 1).to(self.device)

        self.actor_opt = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_opt = optim.Adam(self.critic.parameters(), lr=lr)

    # -------------------------- utils -------------------------- #
    def _compute_returns(self, rewards: List[float]) -> torch.Tensor:
        R = 0.0
        returns = []
        for r in reversed(rewards):
            R = r + self.gamma * R
            returns.insert(0, R)
        return torch.tensor(returns, dtype=torch.float32, device=self.device)

    # ----------------------- training loop --------------------- #
    def train(self, episodes: int = 50, batch_size: int = 5):
        ep_rewards = []
        all_lengths = []
        batch_states, batch_actions, batch_log_probs, batch_rewards = [], [], [], []

        for ep in range(episodes):
            state, _ = self.env.reset()
            done = False
            ep_reward = 0.0
            while not done:
                # Damper continuous action follows stochastic Beta(2,2) for exploration
                damper = np.random.beta(2, 2)
                state_t = torch.tensor(state, dtype=torch.float32, device=self.device)
                logits = self.actor(state_t)
                dist = torch.distributions.Categorical(logits=logits)
                temp_action = dist.sample()
                log_prob = dist.log_prob(temp_action)

                action = (int(temp_action.item()) + 16, np.array([damper], dtype=np.float32))
                next_state, reward, done, _, info = self.env.step(action)

                batch_states.append(state_t)
                batch_actions.append(temp_action)
                batch_log_probs.append(log_prob)
                batch_rewards.append(reward)

                state = next_state
                ep_reward += reward

            ep_rewards.append(ep_reward)
            all_lengths.append(len(batch_rewards))

            # Update after every `batch_size` episodes
            if (ep + 1) % batch_size == 0:
                self._update_policy(batch_states, batch_actions, batch_log_probs, batch_rewards)
                batch_states, batch_actions, batch_log_probs, batch_rewards = [], [], [], []

            if ep % 10 == 0:
                print(f"Episode {ep:03d} | Reward: {ep_reward:8.2f}")

        return ep_rewards

    # ----------------------- policy update --------------------- #
    def _update_policy(self, states, actions, log_probs, rewards):
        returns = self._compute_returns(rewards)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # Convert lists to tensors
        log_probs = torch.stack(log_probs).to(self.device)
        states_tensor = torch.stack(states).to(self.device)
        actions_tensor = torch.stack(actions).to(self.device)

        # Critic loss
        values = self.critic(states_tensor).squeeze(1)
        critic_loss = F.mse_loss(values, returns)

        # Actor loss (with baseline)
        advantages = returns - values.detach()
        actor_loss = -(log_probs * advantages).mean() - self.entropy_coef * torch.distributions.Categorical(logits=self.actor(states_tensor)).entropy().mean()

        # backward
        self.actor_opt.zero_grad()
        actor_loss.backward()
        nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
        self.actor_opt.step()

        self.critic_opt.zero_grad()
        critic_loss.backward()
        self.critic_opt.step()


## 6  Training of our models
We will train each agent for **50 episodes** (≈ 50 days) which is enough
to illustrate learning curves yet short enough to run on CPU in under
5 minutes.


In [21]:
env_dqn = HVACEnv()
env_pg = HVACEnv()

dqn_agent = DoubleDQNAgent(env_dqn, device="cpu")
pg_agent = PolicyGradientBaselineAgent(env_pg, device="cpu")

print("\n🎮 Training Double DQN ...")
dqn_rewards = dqn_agent.train(episodes=50)

print("\n🎮 Training Policy‑Gradient + Baseline ...")
pg_rewards = pg_agent.train(episodes=50)


🎮 Training Double DQN ...


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (2,) + inhomogeneous part.