<a href="https://colab.research.google.com/github/kunalr33/SOC_RlForAgents/blob/main/Skiing_AtariGame_usingRL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!apt-get install -y python3-opengl

In [None]:
#!pip install gym[atari]

In [None]:
#!pip install --upgrade gym

In [None]:
!pip install autorom[accept-rom-license]



In [None]:
!pip install atari_py



In [None]:
!pip install tensorflow



In [None]:
import gym
import numpy as np
from tensorflow.keras import layers
import tensorflow as tf
from collections import deque
import random
import cv2
import matplotlib.pyplot as plt
from IPython.display import clear_output
import torchvision.transforms as T

**Define the DQN Model**

In [None]:
def create_dqn_model(input_shape, num_actions):
    """Build and compile a convolutional Q-network with dueling heads.

    The network stacks three ReLU convolutions, flattens, applies a
    512-unit dense layer, then splits into a scalar value head and a
    per-action advantage head. The output is
    ``value + (advantage - mean(advantage))``.

    Args:
        input_shape: Shape of a single observation, given to ``layers.Input``.
        num_actions: Width of the advantage head, i.e. the number of
            Q-values produced per observation.

    Returns:
        A ``tf.keras.Model`` compiled with the ``'adam'`` optimizer and
        ``'mse'`` loss.
    """
    observation = layers.Input(shape=input_shape)

    # Convolutional trunk, described as (filters, kernel size, stride).
    conv_specs = ((32, 8, 4), (64, 4, 2), (64, 3, 1))
    features = observation
    for filters, kernel, stride in conv_specs:
        features = layers.Conv2D(
            filters,
            (kernel, kernel),
            strides=(stride, stride),
            activation='relu',
        )(features)

    # Shared fully connected representation feeding both heads.
    flat = layers.Flatten()(features)
    hidden = layers.Dense(512, activation='relu')(flat)

    # Dueling heads: one scalar state value, one advantage per action.
    state_value = layers.Dense(1)(hidden)
    action_advantage = layers.Dense(num_actions)(hidden)

    # Average advantage over the action axis, kept as a column so it
    # broadcasts against the per-action advantages.
    advantage_mean = layers.Lambda(
        lambda x: tf.reduce_mean(x, axis=1, keepdims=True)
    )(action_advantage)

    q_out = state_value + (action_advantage - advantage_mean)

    network = tf.keras.Model(observation, q_out)
    network.compile(optimizer='adam', loss='mse')
    return network


**Define the Replay Buffer**

In [None]:
class ReplayBuffer:
    """Bounded FIFO store of experiences with uniform random sampling.

    Experiences are kept in ``self.buffer``, a ``deque`` created with
    ``maxlen=max_size``, so once the buffer is full each new entry pushes
    out the oldest one.
    """

    def __init__(self, max_size):
        """Create an empty buffer that holds at most ``max_size`` entries."""
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        """Append one experience to the newest end of the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return a list of ``batch_size`` distinct entries chosen at random.

        Sampling is delegated to ``random.sample``, which raises
        ``ValueError`` when ``batch_size`` exceeds the number of stored
        entries.
        """
        picked = random.sample(self.buffer, batch_size)
        return picked

    def size(self):
        """Return how many experiences are currently stored."""
        stored = len(self.buffer)
        return stored

**Define the DQN Agent**

In [None]:
class DQNAgent:
    """Epsilon-greedy DQN agent with an online network and a target network.

    The agent owns two models built by ``create_dqn_model`` (the online
    model that is trained, and a target model used for bootstrapped
    targets), a ``ReplayBuffer`` of experiences, and the exploration rate
    ``epsilon`` that is decayed after every training step.
    """

    def __init__(self, state_shape, num_actions, replay_buffer_size=10000, batch_size=32, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.999, target_update_freq=10000):
        """Build both networks and initialise the training state.

        Args:
            state_shape: Observation shape passed to ``create_dqn_model``.
            num_actions: Number of discrete actions (Q-values per state).
            replay_buffer_size: Capacity of the replay buffer.
            batch_size: Number of experiences sampled per ``learn`` call.
            gamma: Discount factor applied to the bootstrapped next value.
            epsilon: Initial probability of taking a random action.
            epsilon_min: Floor for ``epsilon``; decay never goes below it.
            epsilon_decay: Multiplicative decay applied per ``learn`` call.
            target_update_freq: Number of ``learn`` calls between copies of
                the online weights into the target model.
        """
        self.model = create_dqn_model(state_shape, num_actions)
        self.target_model = create_dqn_model(state_shape, num_actions)
        self.target_model.set_weights(self.model.get_weights())
        self.replay_buffer = ReplayBuffer(replay_buffer_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.num_actions = num_actions
        self.target_update_freq = target_update_freq
        self.steps = 0  # number of completed training updates

    def update_target_model(self):
        """Copy the online model's weights into the target model."""
        self.target_model.set_weights(self.model.get_weights())

    def choose_action(self, state):
        """Pick an action for ``state`` using an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action index is
        returned; otherwise the index of the largest Q-value predicted by
        the online model.

        Args:
            state: A single observation; a batch axis is added before it
                is passed to the model.

        Returns:
            The chosen action index as a plain ``int``.
        """
        if np.random.rand() < self.epsilon:
            return int(np.random.randint(self.num_actions))
        q_values = self.model.predict(np.expand_dims(state, axis=0), verbose=0)
        return int(np.argmax(q_values[0]))

    def act(self, state):
        """Alias for ``choose_action``."""
        return self.choose_action(state)

    def _build_targets(self, current_q, next_q, actions, rewards, dones):
        """Return training targets for one minibatch, computed vectorised.

        The result is a copy of ``current_q`` in which, for every row, only
        the entry of the action actually taken is replaced by
        ``reward`` (terminal transition) or
        ``reward + gamma * max(next_q[row])`` (non-terminal transition).

        Args:
            current_q: Online-model Q-values, one row per experience.
            next_q: Target-model Q-values of the next states, same layout.
            actions: Action index taken in each experience.
            rewards: Reward received in each experience.
            dones: Truthy where the experience ended the episode.

        Returns:
            A new array with the same shape and dtype as ``current_q``.
        """
        targets = np.array(current_q, copy=True)
        actions = np.asarray(actions, dtype=np.intp)
        rewards = np.asarray(rewards, dtype=np.float64)
        terminal = np.asarray(dones, dtype=bool)
        best_next = np.max(np.asarray(next_q, dtype=np.float64), axis=1)
        # np.where (rather than multiplying by a 0/1 mask) keeps terminal
        # rows equal to the bare reward even if next_q holds inf/nan there.
        td_target = np.where(terminal, rewards, rewards + self.gamma * best_next)
        targets[np.arange(len(actions)), actions] = td_target
        return targets

    def _decay_epsilon(self):
        """Apply one multiplicative decay step, never dropping below the floor."""
        if self.epsilon > self.epsilon_min:
            # Clamp: a plain multiply could land slightly under epsilon_min.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def learn(self):
        """Run one training update on a minibatch sampled from the buffer.

        Does nothing when the buffer holds fewer than ``batch_size``
        experiences. Otherwise it trains the online model on one batch,
        decays ``epsilon``, increments ``steps`` and, every
        ``target_update_freq`` updates, refreshes the target model.

        Returns:
            The value returned by ``train_on_batch``, or ``None`` when no
            update was performed.
        """
        if self.replay_buffer.size() < self.batch_size:
            return None

        minibatch = self.replay_buffer.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)
        states = np.array(states)
        next_states = np.array(next_states)

        current_q = self.model.predict(states, verbose=0)
        next_q = self.target_model.predict(next_states, verbose=0)
        targets = self._build_targets(current_q, next_q, actions, rewards, dones)

        loss = self.model.train_on_batch(states, targets)

        self._decay_epsilon()

        self.steps += 1
        if self.steps % self.target_update_freq == 0:
            self.update_target_model()

        return loss

**Frame Preprocessing**

In [None]:
def preprocess_frame(frame, crop_rows=(35, 195), size=(84, 84)):
    """Turn a raw RGB frame into a normalised single-channel network input.

    The frame is cropped vertically, resized, converted to grayscale,
    scaled to the range [0, 1] and given a trailing channel axis.

    Args:
        frame: RGB image array as returned by the environment.
        crop_rows: ``(top, bottom)`` row indices kept from the frame
            (``frame[top:bottom]``). The default keeps rows 35..194.
            NOTE(review): confirm these rows suit the frame height of the
            environment being used.
        size: Target size handed to ``cv2.resize`` as its ``dsize``
            argument, i.e. ``(width, height)``.

    Returns:
        A float32 array of shape ``(height, width, 1)`` with values in
        [0, 1]. float32 is used so frames stored in the replay buffer take
        half the memory of float64.
    """
    top, bottom = crop_rows
    frame = frame[top:bottom]  # Crop to play area
    frame = cv2.resize(frame, size)
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    frame = frame.astype(np.float32) / 255.0  # Normalize pixel values
    return np.expand_dims(frame, axis=-1)  # Add channel dimension

**Train the DQN Agent**

In [None]:
def train_dqn(env_name, episodes=1000, max_steps_per_episode=200):
    """Train a ``DQNAgent`` on a Gym environment and record training curves.

    Each environment step is preprocessed with ``preprocess_frame``,
    stored in the agent's replay buffer and followed by one ``learn``
    call. The environment is always closed, even if training raises or is
    interrupted.

    Args:
        env_name: Environment id passed to ``gym.make``.
        episodes: Number of episodes to run.
        max_steps_per_episode: Cap on steps per episode; the episode loop
            stops at this count even if the environment has not ended.

    Returns:
        A tuple ``(agent, episode_rewards, epsilon_values, losses)`` where
        ``episode_rewards`` and ``epsilon_values`` have one entry per
        episode and ``losses`` has one entry per performed training update.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    episode_rewards = []
    epsilon_values = []
    losses = []

    try:
        state_shape = (84, 84, 1)
        num_actions = env.action_space.n
        agent = DQNAgent(state_shape, num_actions)

        for episode in range(episodes):
            state = env.reset()
            # reset() may return either the observation or (observation, info).
            if isinstance(state, tuple):
                state = state[0]
            state = preprocess_frame(state)
            total_reward = 0

            for step in range(max_steps_per_episode):
                action = agent.choose_action(state)
                next_frame, reward, done, truncated, info = env.step(action)
                next_state = preprocess_frame(next_frame)
                # Only true termination is stored as `done`, so truncated
                # episodes still bootstrap from the next state.
                agent.replay_buffer.add((state, action, reward, next_state, done))
                state = next_state
                total_reward += reward

                # learn() returns None until the buffer holds a full batch.
                loss = agent.learn()
                if loss is not None:
                    losses.append(loss)

                if done or truncated:
                    break

            episode_rewards.append(total_reward)
            epsilon_values.append(agent.epsilon)

            print(f"Episode {episode+1}/{episodes}, Reward: {total_reward}, Epsilon: {agent.epsilon}")
    finally:
        # Release the emulator even when training fails or is interrupted.
        env.close()

    return agent, episode_rewards, epsilon_values, losses


**Rewards**

In [None]:
def plot_rewards(episode_rewards):
    """Show a line chart of the total reward obtained in each episode.

    Args:
        episode_rewards: Sequence of per-episode reward totals, plotted in
            order against the episode index.
    """
    plt.figure(figsize=(12, 6))
    plt.plot(episode_rewards, label='Episode Reward', color='blue')

    # Axis labels and title, applied in a fixed order.
    annotations = (
        (plt.xlabel, 'Episode'),
        (plt.ylabel, 'Total Reward'),
        (plt.title, 'Agent Rewards Over Episodes'),
    )
    for apply_text, text in annotations:
        apply_text(text)

    plt.legend()
    plt.grid(True)
    plt.show()

**Metrics**

In [None]:
def plot_metrics(episode_rewards, epsilon_values, losses=None):
    """Show stacked training curves: rewards, epsilon and (optionally) loss.

    The figure is laid out as three rows. The third row is only drawn
    when ``losses`` is truthy.

    Args:
        episode_rewards: Per-episode reward totals.
        epsilon_values: Epsilon value recorded for each episode.
        losses: Optional sequence of loss values, one per update step.
    """

    def draw_panel(row, series, legend_label, line_color, x_text, y_text, heading):
        # Draw one row of the 3x1 grid with its labels, legend and grid.
        plt.subplot(3, 1, row)
        plt.plot(series, label=legend_label, color=line_color)
        plt.xlabel(x_text)
        plt.ylabel(y_text)
        plt.title(heading)
        plt.legend()
        plt.grid(True)

    plt.figure(figsize=(12, 8))

    draw_panel(1, episode_rewards, 'Episode Reward', 'blue',
               'Episode', 'Total Reward', 'Training Convergence - Rewards')
    draw_panel(2, epsilon_values, 'Epsilon', 'orange',
               'Episode', 'Epsilon', 'Epsilon Decay')

    if losses:
        draw_panel(3, losses, 'Loss', 'red',
                   'Update Steps', 'Loss', 'Training Loss')

    plt.tight_layout()
    plt.show()

**Training and Plot Results**

In [None]:
# Train a DQN agent on ALE/Skiing-v5 for 1000 episodes. train_dqn returns the
# agent plus the per-episode rewards, per-episode epsilon values and the
# per-update losses collected during training.
trained_agent, rewards, epsilon_values, losses = train_dqn('ALE/Skiing-v5', episodes=1000)

# Plot the total reward of each episode on its own figure
plot_rewards(rewards)

# Plot rewards, epsilon decay and training loss as stacked panels
plot_metrics(rewards, epsilon_values, losses)

  if not isinstance(terminated, (bool, np.bool8)):


Episode 1/1000, Reward: -1331.0, Epsilon: 0.8444374977929298
Episode 2/1000, Reward: -1331.0, Epsilon: 0.6912977691360495
Episode 3/1000, Reward: -1331.0, Epsilon: 0.5659301095244186
Episode 4/1000, Reward: -1331.0, Epsilon: 0.46329802172888124


**Frame Display Function**

In [None]:
def show_frame(frame):
    """Render one image in the notebook, then queue the output for clearing.

    Args:
        frame: Image data accepted by ``plt.imshow``.
    """
    canvas_size = (10, 7)
    plt.figure(figsize=canvas_size)
    plt.imshow(frame)
    plt.axis('off')  # hide ticks and axis frame around the image
    plt.show()
    # wait=True defers the clear until new output is ready to replace it.
    clear_output(wait=True)

**Image Preprocessing Function**

In [None]:
def preprocess_image(image):
    """Convert an image to a grayscale 84x84 tensor with a leading batch axis.

    Args:
        image: Input accepted by ``T.ToPILImage``.

    Returns:
        The transformed tensor with an extra dimension inserted at index 0.
    """
    # PIL conversion -> grayscale -> 84x84 resize -> tensor.
    steps = [
        T.ToPILImage(),
        T.Grayscale(),
        T.Resize((84, 84)),
        T.ToTensor(),
    ]
    pipeline = T.Compose(steps)
    tensor = pipeline(image)
    return tensor.unsqueeze(0)  # Add batch dimension

**Visualization Function**

In [None]:
import atari_py


In [None]:
def visualize_agent(env_name, agent, num_frames=100):
    """Display an agent acting in an environment, frame by frame.

    For each of ``num_frames`` steps the current rendered frame is shown
    with ``show_frame``, the agent picks an action and the environment is
    stepped. When an episode ends the environment is reset and the loop
    continues. The environment is closed when the function exits, even on
    error.

    Args:
        env_name: Environment id passed to ``gym.make``.
        agent: Object exposing ``act(state)`` that returns an action.
        num_frames: Number of frames to display.
    """
    # Create the environment with a render mode
    env = gym.make(env_name, render_mode='rgb_array')
    try:
        observation, _info = env.reset()
        # The agent's network is built for preprocessed frames, so raw
        # observations must go through the same preprocessing as training.
        state = preprocess_frame(observation)

        for _ in range(num_frames):
            show_frame(env.render())  # Capture and display the current frame

            # Get action from agent and step through environment
            action = agent.act(state)
            observation, _reward, terminated, truncated, _info = env.step(action)

            if terminated or truncated:
                observation, _info = env.reset()

            state = preprocess_frame(observation)
    finally:
        env.close()


**Visualize Agent**

In [None]:
# Observation shape and action count used to build the agent below
state_shape = (84, 84, 1)
num_actions = 3  # Update this based on your environment

# Instantiate agent
# NOTE(review): this creates a brand-new DQNAgent with default settings rather
# than reusing `trained_agent` from the training cell. With the default
# epsilon=1.0, choose_action always takes the random-action branch, so the
# network is never consulted. Confirm this is the intended demo.
agent = DQNAgent(state_shape=state_shape, num_actions=num_actions)

# Visualize the agent for 100 frames on ALE/Skiing-v5
visualize_agent('ALE/Skiing-v5', agent, num_frames=100)