<a href="https://colab.research.google.com/github/karanidenis/DQN/blob/main/q_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gymnasium as gym
import keras
from keras import layers
from gymnasium.wrappers import AtariPreprocessing, FrameStack
import numpy as np
import tensorflow as tf

In [None]:
# Install third-party packages into the notebook environment via shell escapes.
# NOTE(review): no versions are pinned, so a re-run may resolve different releases.
# NOTE(review): stable-baselines3 is installed but not imported by any cell visible here.
# !pip install stable-baselines3[extra] gym

!pip install tensorflow
!pip install keras
!pip install stable-baselines3[extra]





In [None]:
import gym
from gym import spaces
import numpy as np

class ACEnv(gym.Env):
    """Toy air-conditioner environment with a one-element temperature observation.

    The agent either cools (action 0) or warms (action 1) the room by a random
    amount drawn uniformly from [0.5, 1.5). The reward is the negative absolute
    distance between the resulting temperature and ``desired_temp``, and an
    episode terminates when the temperature reaches ``min_temp`` or ``max_temp``.

    The class uses the classic 4-tuple API relied on by the rest of this
    notebook: ``reset()`` returns only the observation and ``step()`` returns
    ``(observation, reward, done, info)``.
    """

    def __init__(self):
        super().__init__()
        self.min_temp = 0.0
        self.max_temp = 50.0
        self.desired_temp = 20.0  # Desired temperature
        self.initial_temp = 25.0  # Temperature every episode starts from
        self.current_temp = self.initial_temp  # Most recent temperature, kept in sync with `state`
        # Bounds are created as float32 so they match the declared dtype exactly.
        self.observation_space = spaces.Box(
            low=np.array([self.min_temp], dtype=np.float32),
            high=np.array([self.max_temp], dtype=np.float32),
            dtype=np.float32,
        )
        self.action_space = spaces.Discrete(2)  # 0: cool down, 1: warm up
        self.state = self._make_state(self.current_temp)  # Initial state

    def _make_state(self, temp):
        """Wrap a scalar temperature in a float32 array matching ``observation_space``."""
        return np.array([temp], dtype=np.float32)

    def reset(self):
        """Restore the initial temperature and return the initial observation."""
        self.current_temp = self.initial_temp
        self.state = self._make_state(self.current_temp)
        return self.state

    def step(self, action):
        """Apply one action and return ``(state, reward, done, info)``.

        Args:
            action: 0 to cool down, 1 to warm up. Any other value leaves the
                temperature unchanged.

        Returns:
            A tuple of the new one-element float32 state array, the reward
            (negative distance to ``desired_temp``), a bool that is True when
            the temperature sits on either bound, and an empty info dict.
        """
        # Read from `state` (not `current_temp`) so callers that overwrite
        # `env.state` directly are respected.
        temp = float(self.state[0])
        if action == 0:  # Cool down
            temp -= np.random.uniform(0.5, 1.5)
        elif action == 1:  # Warm up
            temp += np.random.uniform(0.5, 1.5)

        # Bound the temperature to the range [min_temp, max_temp]
        temp = np.clip(temp, self.min_temp, self.max_temp)
        self.state = self._make_state(temp)
        # Use the stored (float32-rounded) value so reward and termination
        # agree exactly with the observation handed to the agent.
        temp = float(self.state[0])
        self.current_temp = temp

        # Calculate reward based on distance to desired temperature
        distance_to_desired = abs(temp - self.desired_temp)
        reward = -distance_to_desired  # Higher penalty for larger deviations

        done = bool(temp <= self.min_temp or temp >= self.max_temp)
        return self.state, reward, done, {}

    def render(self, mode='human'):
        """Print the current temperature to stdout."""
        print(f"Current Temperature: {self.state[0]}")

    def close(self):
        """No resources to release."""
        pass

# Instantiate the environment
env = ACEnv()


In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers
import numpy as np
import tensorflow as tf

# Configuration parameters for the whole setup
seed = 42
# Apply the seed so runs are reproducible: this seeds Python's `random`,
# NumPy's global RNG (used for exploration, replay sampling and the
# environment's temperature noise) and the Keras backend RNG (weight init).
keras.utils.set_random_seed(seed)
gamma = 0.99  # Discount factor applied to future rewards
epsilon = 1.0  # Epsilon greedy parameter (current probability of a random action)
epsilon_min = 0.1  # Minimum epsilon greedy parameter
epsilon_max = 1.0  # Maximum epsilon greedy parameter
epsilon_interval = (
    epsilon_max - epsilon_min
)  # Total amount epsilon is annealed by over the decay schedule
batch_size = 32  # Size of batch taken from replay buffer
max_steps_per_episode = 50
max_episodes = 100  # Limit training episodes, will run until solved if smaller than 1

num_actions = 2  # Must match the environment's Discrete(2) action space

def create_q_model():
    """Build the Q-network: a small MLP mapping one temperature to per-action Q-values.

    Returns:
        A ``keras.Sequential`` model taking inputs of shape ``(batch, 1)`` and
        producing ``(batch, num_actions)`` linear outputs.
    """
    return keras.Sequential(
        [
            # Declare the input explicitly instead of passing `input_shape=`
            # to the first Dense layer, which Keras 3 flags as deprecated.
            keras.Input(shape=(1,)),
            layers.Dense(24, activation="relu"),
            layers.Dense(24, activation="relu"),
            layers.Dense(num_actions, activation="linear"),
        ]
    )

# Online network: its Q-values decide which action is taken.
model = create_q_model()
# Target network: its Q-values are used when estimating future rewards.
model_target = create_q_model()

# Loss and optimiser used for the online network's gradient steps.
loss_function = keras.losses.Huber()
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)

# Experience replay storage, one parallel list per transition field.
state_history, state_next_history = [], []
action_history, rewards_history, done_history = [], [], []
max_memory_length = 100  # Oldest transition is dropped once the buffer exceeds this

# Progress bookkeeping.
episode_reward_history = []
running_reward, episode_count, frame_count = 0, 0, 0

# Exploration schedule.
epsilon_random_frames = 200  # Frames during which every action is random
epsilon_greedy_frames = 500.0  # Epsilon shrinks by epsilon_interval / this value per frame

# Update cadence, measured in frames.
update_after_actions = 4  # Train the online network every this many frames
update_target_network = 100  # Copy online weights into the target network this often


In [None]:
# Train the online Q-network with epsilon-greedy exploration, experience
# replay and a periodically synchronised target network.
while True:
    state = env.reset()
    episode_reward = 0

    # Run up to `max_steps_per_episode` steps (the previous `range(1, n)`
    # silently ran one step fewer than configured).
    for _ in range(max_steps_per_episode):
        frame_count += 1

        # Explore: always random during the warm-up frames, afterwards with
        # probability `epsilon`; otherwise act greedily on the online network.
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            action = np.random.choice(num_actions)
        else:
            state_tensor = tf.convert_to_tensor(state)
            state_tensor = tf.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            action = tf.argmax(action_probs[0]).numpy()

        # Linearly anneal epsilon down to its floor.
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        state_next, reward, done, _ = env.step(action)
        state_next = np.array(state_next)

        episode_reward += reward

        # Store the transition in the replay buffers.
        action_history.append(action)
        state_history.append(state)
        state_next_history.append(state_next)
        done_history.append(done)
        rewards_history.append(reward)
        state = state_next

        if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
            # Sample a minibatch uniformly (with replacement) from the buffer.
            indices = np.random.choice(len(done_history), size=batch_size)
            state_sample = np.array([state_history[i] for i in indices], dtype=np.float32)
            state_next_sample = np.array(
                [state_next_history[i] for i in indices], dtype=np.float32
            )
            rewards_sample = tf.convert_to_tensor(
                np.array([rewards_history[i] for i in indices], dtype=np.float32)
            )
            action_sample = [action_history[i] for i in indices]
            done_sample = tf.convert_to_tensor([float(done_history[i]) for i in indices])

            # Call the target network directly: for a single small batch this
            # avoids the per-call overhead and progress-bar output of predict().
            future_rewards = model_target(state_next_sample, training=False)

            # Bellman target: r + gamma * max_a' Q_target(s', a') for
            # non-terminal transitions and just r for terminal ones. The
            # previous code replaced terminal targets with a constant -1, which
            # in this environment (rewards are negative distances) made hitting
            # a temperature bound look better than nearly every other state.
            updated_q_values = rewards_sample + gamma * tf.reduce_max(
                future_rewards, axis=1
            ) * (1.0 - done_sample)

            # One-hot mask so the loss only touches the Q-value of the action taken.
            masks = tf.one_hot(action_sample, num_actions)

            with tf.GradientTape() as tape:
                q_values = model(state_sample)
                q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
                loss = loss_function(updated_q_values, q_action)

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if frame_count % update_target_network == 0:
            # Sync the target network with the online network and log progress.
            model_target.set_weights(model.get_weights())
            template = "running reward: {:.2f} at episode {}, frame count {}"
            print(template.format(running_reward, episode_count, frame_count))

        # Keep the replay buffers bounded by dropping the oldest transition.
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break

    # Running reward is the mean over (at most) the last 100 episodes.
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1

    # NOTE(review): the environment's reward is -|temp - desired_temp|, so the
    # running reward can never be positive and this check cannot fire as
    # written; training currently always ends via `max_episodes`. Pick a
    # negative threshold if early stopping is wanted.
    if running_reward > 40:
        print("Solved at episode {}!".format(episode_count))
        break

    if max_episodes > 0 and episode_count >= max_episodes:
        print("Stopped at episode {}!".format(episode_count))
        break


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22

In [None]:
def interact_with_ac(env, model):
    """Step the environment with the model's greedy actions, letting the user override the temperature.

    After every step the current temperature, the chosen action and the reward
    are printed, and the user is asked for a new temperature. Typing ``exit``
    (or ``quit`` / ``q``) ends the session. A value that is not a number, or
    that lies outside the environment's temperature bounds, is rejected with a
    message and the episode simply continues from the current state.

    Args:
        env: Environment exposing ``reset()``, ``step(action)`` returning
            ``(state, reward, done, info)``, ``render()`` and a writable
            ``state`` attribute. ``min_temp`` / ``max_temp`` are used as bounds
            when present.
        model: Callable mapping a ``(1, n)`` float32 array to per-action
            Q-values of shape ``(1, num_actions)``; called with
            ``training=False``.
    """
    # Fall back to unbounded limits for environments that do not declare them.
    lowest = getattr(env, "min_temp", -np.inf)
    highest = getattr(env, "max_temp", np.inf)

    state = env.reset()
    done = False
    while not done:
        # Greedy action from the model; plain NumPy is enough for one sample.
        observation = np.asarray(state, dtype=np.float32).reshape(1, -1)
        action_probs = model(observation, training=False)
        action = int(np.argmax(np.asarray(action_probs)[0]))

        state, reward, done, _ = env.step(action)
        env.render()
        print(f"Action: {'Cool' if action == 0 else 'Warm'}, Reward: {reward}")

        # The episode is over: prompting now would discard whatever is typed.
        if done:
            break

        user_input = input("Enter new temperature (or 'exit' to quit): ")
        if user_input.strip().lower() in ("exit", "quit", "q"):
            break

        try:
            new_temp = float(user_input)
        except ValueError:
            print("Invalid input. Please enter a numeric value.")
            continue

        # A chained comparison is False for NaN, so this also rejects "nan".
        if not (lowest <= new_temp <= highest) or not np.isfinite(new_temp):
            print(f"Temperature must be between {lowest} and {highest}.")
            continue

        # Preserve whatever dtype the environment uses for its state.
        env.state = np.array([new_temp], dtype=np.asarray(env.state).dtype)
        state = env.state


In [None]:
# Start the interactive session with the online Q-network `model`; this blocks
# on input() after each step until the user types 'exit' or the episode ends.
interact_with_ac(env, model)


Current Temperature: 24.26923926546335
Action: Cool, Reward: -4.2692392654633515
Enter new temperature (or 'exit' to quit): 18
Current Temperature: 17.288813218440186
Action: Cool, Reward: -2.711186781559814
Enter new temperature (or 'exit' to quit): quit
Invalid input. Please enter a numeric value.
Current Temperature: 15.879483339076563
Action: Cool, Reward: -4.120516660923437
Enter new temperature (or 'exit' to quit): exit
