In [None]:
import numpy as np
import tensorflow as tf
from time import sleep
import random

# Define the environment class
class RoomEnvironment:
    """Toy room simulation controlled through a fan and an air conditioner.

    The observable state is the 6-tuple returned by ``get_state()``.
    ``take_action()`` applies one of four discrete actions, advances the room
    temperature by one step and returns a scalar reward.
    """

    # Actuator limits. They match the fan-speed (0-5) and AC-temperature
    # (15-35) ranges enumerated for ``state_space`` in this notebook, so the
    # simulation can no longer wander to negative fan speeds or unbounded
    # AC setpoints.
    MIN_FAN_SPEED = 0
    MAX_FAN_SPEED = 5
    MIN_AC_TEMPERATURE = 15
    MAX_AC_TEMPERATURE = 35

    # Room temperature is kept inside this band after every update.
    MIN_ROOM_TEMPERATURE = 18
    MAX_ROOM_TEMPERATURE = 28

    # Reward shaping: +5 within TARGET_TOLERANCE of TARGET_TEMPERATURE, else -1.
    TARGET_TEMPERATURE = 22
    TARGET_TOLERANCE = 1

    def __init__(self):
        self.light_intensity = 0  # Initial light intensity
        self.outside_temperature = 25  # Initial outside temperature
        self.room_temperature = 25  # Initial room temperature
        self.fan_speed = 0  # Initial fan speed
        self.ac_temperature = 25  # Initial air conditioner temperature
        self.is_person_present = False  # Whether someone is in the room or not

    def update_state(self, light_intensity, outside_temperature, room_temperature, fan_speed, ac_temperature, is_person_present):
        """Overwrite every state variable with the supplied values (no validation)."""
        self.light_intensity = light_intensity
        self.outside_temperature = outside_temperature
        self.room_temperature = room_temperature
        self.fan_speed = fan_speed
        self.ac_temperature = ac_temperature
        self.is_person_present = is_person_present

    def get_state(self):
        """Return the state as ``(light_intensity, outside_temperature,
        room_temperature, fan_speed, ac_temperature, is_person_present)``."""
        return (self.light_intensity, self.outside_temperature, self.room_temperature, self.fan_speed, self.ac_temperature, self.is_person_present)

    def take_action(self, action):
        """Apply ``action``, advance the room temperature and return the reward.

        Actions: 0 = decrease AC temperature, 1 = increase AC temperature,
        2 = decrease fan speed, 3 = increase fan speed. Any other value leaves
        the actuators untouched. An action that would push an actuator past
        its limit is a no-op for that actuator.

        Returns:
            5 if the room temperature is within ``TARGET_TOLERANCE`` degrees of
            ``TARGET_TEMPERATURE`` after the step, otherwise -1.
        """
        # Move one actuator by a single unit, but never across its limit.
        if action == 0 and self.ac_temperature > self.MIN_AC_TEMPERATURE:
            self.ac_temperature -= 1
        elif action == 1 and self.ac_temperature < self.MAX_AC_TEMPERATURE:
            self.ac_temperature += 1
        elif action == 2 and self.fan_speed > self.MIN_FAN_SPEED:
            self.fan_speed -= 1
        elif action == 3 and self.fan_speed < self.MAX_FAN_SPEED:
            self.fan_speed += 1

        # Only update room temperature if no person is present
        if not self.is_person_present:
            self.room_temperature += (self.fan_speed - 1) * 0.5 + (self.ac_temperature - 25) * 0.1
            # Keep the room temperature inside the allowed band.
            self.room_temperature = max(
                self.MIN_ROOM_TEMPERATURE,
                min(self.MAX_ROOM_TEMPERATURE, self.room_temperature),
            )

        if abs(self.room_temperature - self.TARGET_TEMPERATURE) <= self.TARGET_TOLERANCE:
            return 5
        return -1

# Define the ReplayBuffer class
class ReplayBuffer:
    """Bounded FIFO store of experiences with uniform random sampling."""

    def __init__(self, buffer_size):
        # Maximum number of experiences retained at any time.
        self.buffer_size = buffer_size
        # Experiences in insertion order; index 0 is the oldest.
        self.buffer = []

    def add_experience(self, experience):
        """Append ``experience``, evicting the oldest entry when at capacity."""
        stored = self.buffer
        if not len(stored) < self.buffer_size:
            del stored[0]  # Drop the oldest experience to make room
        stored.append(experience)

    def sample_batch(self, batch_size):
        """Return up to ``batch_size`` distinct experiences chosen at random."""
        how_many = min(batch_size, len(self.buffer))
        return random.sample(self.buffer, how_many)

DQN agent class and agent setup (Agent 1 is trained in the next cell)

In [None]:
# Define the DQN agent class
class DQNAgent:
    """Deep Q-learning agent with an epsilon-greedy policy and a replay buffer.

    Actions are assumed to be the integers ``0 .. len(action_space) - 1``: the
    value returned by ``get_action`` is used directly as a column index into
    the Q-network output.
    """

    def __init__(self, state_space, action_space, learning_rate, discount_factor, exploration_rate, min_exploration_rate, exploration_decay, replay_buffer_size, batch_size):
        """Store the hyper-parameters and build the Q-network.

        Args:
            state_space: Sequence of example state vectors; the length of the
                first entry determines the network input width.
            action_space: Sequence of available actions.
            learning_rate: Learning rate of the Adam optimizer.
            discount_factor: Weight of the bootstrapped next-state value.
            exploration_rate: Initial probability of choosing a random action.
            min_exploration_rate: Lower bound applied by ``decay_exploration``.
            exploration_decay: Multiplicative decay applied by
                ``decay_exploration``.
            replay_buffer_size: Capacity of the replay buffer.
            batch_size: Number of experiences sampled per training update.
        """
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.min_exploration_rate = min_exploration_rate
        self.exploration_decay = exploration_decay
        self.state_space = state_space
        self.action_space = action_space
        self.replay_buffer = ReplayBuffer(replay_buffer_size)
        self.batch_size = batch_size

        # Number of enumerated states (NOT the length of one state vector).
        self.state_size = len(state_space)

        # Width of a single state vector, derived from the state space instead
        # of being hard-coded; falls back to the previous value of 6 when the
        # state space is empty or its entries have no length.
        try:
            self.state_dim = len(state_space[0])
        except (IndexError, TypeError):
            self.state_dim = 6

        # Create the Q-network: state vector in, one Q-value per action out.
        self.q_network = tf.keras.Sequential([
                tf.keras.layers.Dense(450, input_shape=(self.state_dim,), activation='relu'),
                tf.keras.layers.Dense(256, activation='relu'),
                tf.keras.layers.Dense(128, activation='relu'),
                tf.keras.layers.Dense(len(action_space), activation='linear')
                ])
        self.q_network.compile(optimizer=tf.optimizers.Adam(learning_rate=learning_rate), loss='mse')

    def get_action(self, state):
        """Pick an action epsilon-greedily for ``state``.

        With probability ``exploration_rate`` a random element of
        ``action_space`` is returned; otherwise the index of the largest
        predicted Q-value.
        """
        if np.random.rand() < self.exploration_rate:
            return np.random.choice(self.action_space)

        state_input = np.array(state).reshape(1, -1)
        # verbose=0: this runs once per environment step, so the default
        # progress bar would flood the notebook output.
        q_values = self.q_network.predict(state_input, verbose=0)[0]
        return int(np.argmax(q_values))

    def update_q_network(self, state, action, reward, next_state):
        """Record one transition and, once enough are stored, fit on a batch.

        The transition is always added to the replay buffer. Training only
        happens when the buffer holds at least ``batch_size`` experiences.
        """
        experience = (state, action, reward, next_state)
        self.replay_buffer.add_experience(experience)

        if len(self.replay_buffer.buffer) < self.batch_size:
            return

        batch = self.replay_buffer.sample_batch(self.batch_size)
        states, actions, rewards, next_states = zip(*batch)
        states = np.array(states)
        next_states = np.array(next_states)
        # zip() yields tuples; convert explicitly rather than relying on
        # NumPy's reflected operators to broadcast a tuple.
        rewards = np.asarray(rewards, dtype=float)
        action_indices = np.asarray(actions, dtype=int)

        # Bootstrapped target: r + gamma * max_a' Q(s', a')
        next_q_values = self.q_network.predict(next_states, verbose=0)
        targets = rewards + self.discount_factor * np.max(next_q_values, axis=1)

        # Only the Q-value of the action actually taken is moved towards the
        # target; the other outputs keep their current predictions.
        q_values = self.q_network.predict(states, verbose=0)
        q_values[np.arange(len(action_indices)), action_indices] = targets

        self.q_network.fit(states, q_values, verbose=0)

    def decay_exploration(self):
        """Multiply ``exploration_rate`` by ``exploration_decay``, floored at ``min_exploration_rate``."""
        self.exploration_rate = max(self.min_exploration_rate, self.exploration_rate * self.exploration_decay)

# Enumerate every discrete state tuple, list the actions, then build the shared
# environment and the three agents that are compared below.
state_space = [
    (light, outside, room, fan, ac, occupied)
    for light in range(11)
    for outside in range(15, 36)
    for room in range(15, 36)
    for fan in range(6)
    for ac in range(15, 36)
    for occupied in [False, True]
]

# 0: decrease AC temperature, 1: increase AC temperature,
# 2: decrease fan speed,      3: increase fan speed
action_space = [0, 1, 2, 3]
environment = RoomEnvironment()

# NOTE(review): agent2 and agent3 start with exploration_rate=1.5; as long as
# the rate stays at or above 1.0 every action is chosen at random - confirm
# that this is intended.
agent1 = DQNAgent(
    state_space,
    action_space,
    learning_rate=0.001,
    discount_factor=0.9,
    exploration_rate=1.0,
    min_exploration_rate=0.1,
    exploration_decay=0.995,
    replay_buffer_size=1000,
    batch_size=32,
)
agent2 = DQNAgent(
    state_space,
    action_space,
    learning_rate=0.01,
    discount_factor=0.9,
    exploration_rate=1.5,
    min_exploration_rate=0.1,
    exploration_decay=0.995,
    replay_buffer_size=1000,
    batch_size=64,
)
agent3 = DQNAgent(
    state_space,
    action_space,
    learning_rate=0.01,
    discount_factor=0.9,
    exploration_rate=1.5,
    min_exploration_rate=0.1,
    exploration_decay=0.995,
    replay_buffer_size=2000,
    batch_size=64,
)

In [None]:
rewards_per_episode1 = []  # Total reward collected in each episode
mae_loss_per_episode1 = []  # Mean absolute TD error per step, for each episode

# Training loop for agent1
num_episodes = 10
steps_per_episode = 100
for episode in range(num_episodes):
    # The environment is not reset here: each episode continues from the
    # state the previous one ended in.
    state = environment.get_state()
    total_reward = 0
    total_mae_loss = 0

    for step in range(steps_per_episode):
        action = agent1.get_action(state)
        reward = environment.take_action(action)
        next_state = environment.get_state()
        agent1.update_q_network(state, action, reward, next_state)

        # Absolute TD error of the transition just taken:
        # |r + gamma * max_a' Q(s', a') - Q(s, a)|.
        # verbose=0 keeps Keras from printing a progress bar on every step.
        state_input = np.array(state).reshape(1, -1)
        next_state_input = np.array(next_state).reshape(1, -1)
        next_q_values = agent1.q_network.predict(next_state_input, verbose=0)
        target = reward + agent1.discount_factor * np.max(next_q_values)

        # Compare against the Q-value of the chosen action only; subtracting
        # the whole prediction row would average over untaken actions too.
        current_q_values = agent1.q_network.predict(state_input, verbose=0)
        mae_loss = np.abs(target - current_q_values[0][action])
        total_mae_loss += mae_loss

        state = next_state
        agent1.decay_exploration()
        total_reward += reward
        sleep(0.0001)
        print('Episode:', episode, 'Step:', step, 'Action:', action, 'Reward:', reward, 'Exploration rate:', agent1.exploration_rate, end='\r')

    rewards_per_episode1.append(total_reward)
    mae_loss_per_episode1.append(total_mae_loss / steps_per_episode)
    print("Total reward:", total_reward, "MAE Loss:", total_mae_loss, "after training loop", "Episode", episode, end='\r')
    

DQN Agent2

In [None]:
rewards_per_episode2 = []  # Total reward collected in each episode
mae_loss_per_episode2 = []  # Mean absolute TD error per step, for each episode

# Training loop for agent2 (num_episodes is defined in the agent1 training cell)
steps_per_episode = 100
for episode in range(num_episodes):
    # The environment is shared and not reset: training continues from
    # whatever state the previous cell/episode left it in.
    state = environment.get_state()
    total_reward = 0
    total_mae_loss = 0

    for step in range(steps_per_episode):
        action = agent2.get_action(state)
        reward = environment.take_action(action)
        next_state = environment.get_state()
        agent2.update_q_network(state, action, reward, next_state)

        # Absolute TD error of the transition just taken, evaluated with
        # agent2's OWN network and discount factor:
        # |r + gamma * max_a' Q(s', a') - Q(s, a)|.
        # verbose=0 keeps Keras from printing a progress bar on every step.
        state_input = np.array(state).reshape(1, -1)
        next_state_input = np.array(next_state).reshape(1, -1)
        next_q_values = agent2.q_network.predict(next_state_input, verbose=0)
        target = reward + agent2.discount_factor * np.max(next_q_values)

        # Compare against the Q-value of the chosen action only.
        current_q_values = agent2.q_network.predict(state_input, verbose=0)
        mae_loss = np.abs(target - current_q_values[0][action])
        total_mae_loss += mae_loss

        state = next_state
        agent2.decay_exploration()
        total_reward += reward
        sleep(0.0001)
        print('Episode:', episode, 'Step:', step, 'Action:', action, 'Reward:', reward, 'Exploration rate:', agent2.exploration_rate, end='\r')

    rewards_per_episode2.append(total_reward)
    # Store the per-step mean so the curve is on the same scale as agent1's.
    mae_loss_per_episode2.append(total_mae_loss / steps_per_episode)
    print("Total reward:", total_reward, "MAE Loss:", total_mae_loss, "after training loop", "Episode", episode, end='\r')

DQN Agent3

In [None]:
rewards_per_episode3 = []  # Total reward collected in each episode
mae_loss_per_episode3 = []  # Mean absolute TD error per step, for each episode

# Training loop for agent3 (num_episodes is defined in the agent1 training cell)
steps_per_episode = 100
for episode in range(num_episodes):
    # The environment is shared and not reset: training continues from
    # whatever state the previous cell/episode left it in.
    state = environment.get_state()
    total_reward = 0
    total_mae_loss = 0

    for step in range(steps_per_episode):
        action = agent3.get_action(state)
        reward = environment.take_action(action)
        next_state = environment.get_state()
        agent3.update_q_network(state, action, reward, next_state)

        # Absolute TD error of the transition just taken, evaluated with
        # agent3's OWN network and discount factor:
        # |r + gamma * max_a' Q(s', a') - Q(s, a)|.
        # verbose=0 keeps Keras from printing a progress bar on every step.
        state_input = np.array(state).reshape(1, -1)
        next_state_input = np.array(next_state).reshape(1, -1)
        next_q_values = agent3.q_network.predict(next_state_input, verbose=0)
        target = reward + agent3.discount_factor * np.max(next_q_values)

        # Compare against the Q-value of the chosen action only.
        current_q_values = agent3.q_network.predict(state_input, verbose=0)
        mae_loss = np.abs(target - current_q_values[0][action])
        total_mae_loss += mae_loss

        state = next_state
        agent3.decay_exploration()
        total_reward += reward
        sleep(0.0001)
        print('Episode:', episode, 'Step:', step, 'Action:', action, 'Reward:', reward, 'Exploration rate:', agent3.exploration_rate, end='\r')

    rewards_per_episode3.append(total_reward)
    # Store the per-step mean so the curve is on the same scale as agent1's.
    mae_loss_per_episode3.append(total_mae_loss / steps_per_episode)
    print("Total reward:", total_reward, "MAE Loss:", total_mae_loss, "after training loop", "Episode", episode, end='\r')

In [None]:
# Compare the total reward each of the three agents collected per episode
import matplotlib.pyplot as plt

reward_curves = (
    (rewards_per_episode1, 'agent1', 'red'),
    (rewards_per_episode2, 'agent2', 'blue'),
    (rewards_per_episode3, 'agent3', 'green'),
)
# One line per agent, drawn in the same order as before.
for episode_rewards, agent_label, line_color in reward_curves:
    plt.plot(episode_rewards, label=agent_label, color=line_color)

plt.xlabel('Episodes')
plt.ylabel('Total Reward')
plt.title('DQN - Total Reward per Episode')
plt.legend()
plt.show()

In [None]:
# Compare the per-episode MAE loss recorded for each of the three agents
import matplotlib.pyplot as plt

mae_curves = (
    (mae_loss_per_episode1, 'agent1', 'red'),
    (mae_loss_per_episode2, 'agent2', 'blue'),
    (mae_loss_per_episode3, 'agent3', 'green'),
)
# One line per agent, drawn in the same order as before.
for episode_mae, agent_label, line_color in mae_curves:
    plt.plot(episode_mae, label=agent_label, color=line_color)

plt.xlabel('Episodes')
plt.ylabel('MAE Loss')
plt.title('DQN - MAE Loss per Episode')
plt.legend()
plt.show()