In [None]:
!pip install gym==0.26.0
!pip install numpy==1.24.3
!pip install tensorflow==2.12.0
!pip install matplotlib==3.7.1
!pip install opencv-python==4.7.0.72
!pip install pyvirtualdisplay
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

In [None]:
# Self-Driving Car Simulation using Deep Q-Learning
# Environment: OpenAI Gym's CarRacing-v2
# Optimized for Google Colab

import gym
import numpy as np
import random
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from collections import deque
import cv2
import time
import os
import base64
from IPython import display as ipythondisplay
from IPython.display import HTML
from pyvirtualdisplay import Display
from gym.wrappers import RecordVideo
import glob

# Start a headless X display so the environment can render frames on a
# machine without a physical screen (e.g. a Colab VM)
virtual_display = Display(size=(1400, 900), visible=0)
virtual_display.start()

# Seed the Python, NumPy and TensorFlow RNGs with the same fixed value so
# runs are repeatable
random.seed(123)
np.random.seed(123)
tf.random.set_seed(123)

# Required packages - the commands below are kept in a no-op string for reference; run them in their own Colab cell
"""
!pip install gym==0.26.0
!pip install numpy==1.24.3
!pip install tensorflow==2.12.0
!pip install matplotlib==3.7.1
!pip install opencv-python==4.7.0.72
!pip install pyvirtualdisplay
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
"""

# Function to display animation in Colab
def show_video(video_path):
    """
    Show a recorded gym video in a Colab notebook

    The file is read fully into memory, base64-encoded and embedded in an
    HTML <video> element as a data URI.

    Args:
        video_path: Path of the MP4 file to display.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Use a context manager so the file handle is closed even if reading fails
    with open(video_path, 'rb') as video_file:
        mp4 = video_file.read()

    encoded = base64.b64encode(mp4).decode('ascii')
    display_html = HTML(f"""
    <video width=600 controls>
        <source src="data:video/mp4;base64,{encoded}" type="video/mp4">
    </video>
    """)
    ipythondisplay.display(display_html)
    # wait=True defers the clear until the next output arrives, so the video
    # stays visible until something else is printed or displayed
    ipythondisplay.clear_output(wait=True)

# Function to show the latest recorded video
def show_latest_video():
    """
    Display the most recently modified MP4 found anywhere under ./videos

    The recorders in this file are pointed at sub-folders of ./videos
    (e.g. videos/episode_20, videos/test), so the search is recursive;
    a flat './videos/*.mp4' pattern would never see those files.
    Prints "No videos found." when there is nothing to show.
    """
    # '**' with recursive=True matches ./videos itself and every nested folder
    video_files = glob.glob('./videos/**/*.mp4', recursive=True)
    if not video_files:
        print("No videos found.")
        return

    show_video(max(video_files, key=os.path.getmtime))

class DQNAgent:
    """
    Deep Q-Network agent with an experience-replay buffer and a target network

    The online network (`model`) is trained on minibatches sampled from
    `memory`; the target network (`target_model`) supplies the bootstrap
    Q-values and is refreshed from the online network periodically.
    """

    def __init__(self, state_size, action_size):
        """
        Args:
            state_size: Input shape of a single state, passed to the first
                convolution layer as `input_shape`.
            action_size: Number of discrete actions (width of the output layer).
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)  # oldest transitions are dropped first
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.9995  # multiplicative decay applied per replay() call
        self.learning_rate = 0.001
        self.update_target_frequency = 1000  # in units of the `step` passed to replay()
        self.model = self._build_model()
        self.target_model = self._build_model()
        # Start with both networks holding identical weights
        self.update_target_model()

    def _build_model(self):
        """Build and compile the convolutional Q-network (MSE loss, Adam optimizer)."""
        model = Sequential()
        model.add(Conv2D(32, (8, 8), strides=(4, 4), activation='relu', input_shape=self.state_size))
        model.add(Conv2D(64, (4, 4), strides=(2, 2), activation='relu'))
        model.add(Conv2D(64, (3, 3), activation='relu'))
        model.add(Flatten())
        model.add(Dense(512, activation='relu'))
        # One linear output per action: the estimated Q-value
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """
        Append one transition to the replay buffer

        `state` and `next_state` are expected to carry a leading batch
        dimension of 1, because replay() concatenates them along axis 0.
        """
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state, training=True):
        """
        Choose an action index for `state`

        Args:
            state: A single state with a leading batch dimension of 1.
            training: When True, act epsilon-greedily; when False, always
                take the greedy action.

        Returns:
            int: Index of the selected action.
        """
        if training and np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        # Cast so both branches return a plain Python int
        return int(np.argmax(act_values[0]))

    def replay(self, batch_size, step):
        """
        Train the online network on one minibatch sampled from memory

        Does nothing until the buffer holds at least `batch_size` transitions.
        After the fit, epsilon is decayed (never below `epsilon_min`) and the
        target network is refreshed whenever `step` is a multiple of
        `update_target_frequency`.

        Args:
            batch_size: Number of transitions to sample.
            step: Caller-supplied step counter used only for the target-update
                schedule; pass a counter that keeps increasing across episodes
                if updates should happen every `update_target_frequency` steps.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Each stored state already has a batch axis of 1, so concatenating
        # yields one (batch_size, ...) array per side
        state_batch = np.concatenate(states, axis=0)
        next_state_batch = np.concatenate(next_states, axis=0)
        action_batch = np.asarray(actions, dtype=np.intp)
        reward_batch = np.asarray(rewards, dtype=np.float32)
        done_batch = np.asarray(dones, dtype=bool)

        # Two batched forward passes instead of up to two per transition
        targets = np.array(self.model.predict(state_batch, verbose=0))
        # Use target network for more stable Q-value estimation
        next_q = self.target_model.predict(next_state_batch, verbose=0)

        bootstrap = self.gamma * np.max(next_q, axis=1)
        bootstrap[done_batch] = 0.0  # terminal transitions: target is the reward alone
        targets[np.arange(batch_size), action_batch] = reward_batch + bootstrap

        self.model.fit(state_batch, targets, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            # Clamp so the decay cannot undershoot the configured floor
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        # Update target network periodically
        if step % self.update_target_frequency == 0:
            self.update_target_model()
            print("Target model updated!")

    def load(self, name):
        """Load online-network weights from the file `name`."""
        self.model.load_weights(name)

    def save(self, name):
        """Save online-network weights to the file `name`."""
        self.model.save_weights(name)


def preprocess_state(state):
    """
    Convert an RGB frame into the network's input format

    The frame is converted to grayscale, resized to 84x84, divided by 255
    and given leading batch and trailing channel axes.

    Args:
        state: RGB image array (e.g. the 96x96x3 frame mentioned for this
            environment; presumably uint8 in 0-255 - confirm against the env).

    Returns:
        np.ndarray: float32 array of shape (1, 84, 84, 1).
    """
    grayscale = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
    resized = cv2.resize(grayscale, (84, 84), interpolation=cv2.INTER_AREA)
    # Store frames as float32: half the replay-buffer footprint of float64.
    # Dividing in float64 first and then casting keeps the rounding a single
    # cast away from the original float64 values.
    normalized = (resized / 255.0).astype(np.float32)
    return normalized.reshape(1, 84, 84, 1)

def create_discrete_actions():
    """
    Build the lookup table of discrete actions

    Each row is a (steering, gas, brake) triple:
        0: Steer left + gas
        1: Straight + gas
        2: Steer right + gas
        3: Steer left + no gas
        4: Straight + no gas
        5: Steer right + no gas
        6: Brake only

    Returns:
        np.ndarray: Array of shape (7, 3), one row per action index.
    """
    steering_options = (-1.0, 0.0, 1.0)  # left, straight, right

    # Indices 0-2: every steering option with half throttle
    rows = [[steer, 0.5, 0.0] for steer in steering_options]
    # Indices 3-5: the same steering options while coasting
    rows.extend([steer, 0.0, 0.0] for steer in steering_options)
    # Index 6: brake with the wheel centred
    rows.append([0.0, 0.0, 0.8])

    return np.array(rows)

def visualize_agent_state(state, action_idx, discrete_actions, step_reward, total_reward, action_meanings):
    """
    Show the agent's current frame beside a text summary of its action

    Args:
        state: Preprocessed frame; reshaped to (84, 84) for display.
        action_idx: Index of the chosen action.
        discrete_actions: Table of (steer, gas, brake) rows indexed by action_idx.
        step_reward: Reward received for this step.
        total_reward: Reward accumulated so far in the episode.
        action_meanings: Human-readable label for each action index.
    """
    chosen = discrete_actions[action_idx]
    summary_lines = [
        f"Action: {action_meanings[action_idx]}",
        f"Action values: Steer={chosen[0]:.1f}, Gas={chosen[1]:.1f}, Brake={chosen[2]:.1f}",
        f"Step Reward: {step_reward:.2f}",
        f"Total Reward: {total_reward:.2f}",
    ]

    # Left panel: what the network sees; right panel: action and reward text
    _, (frame_ax, info_ax) = plt.subplots(1, 2, figsize=(10, 6))

    frame_ax.imshow(state.reshape(84, 84), cmap='gray')
    frame_ax.set_title('Agent View (Grayscale)')
    frame_ax.axis('off')

    info_ax.text(0.1, 0.5, "\n".join(summary_lines), fontsize=12)
    info_ax.axis('off')

    plt.tight_layout()
    plt.show()

def _plot_training_progress(scores, output_path):
    """Plot the per-episode scores, save the figure to `output_path`, then show it."""
    plt.figure(figsize=(10, 5))
    plt.plot(scores)
    plt.title('Training Progress')
    plt.xlabel('Episode')
    plt.ylabel('Score')
    plt.savefig(output_path)
    plt.show()


def train_model(episodes=100, batch_size=32, save_every=10, visualize_every=10, video_every=20):
    """
    Train the DQN agent

    Args:
        episodes: Number of training episodes.
        batch_size: Minibatch size passed to the agent's replay step.
        save_every: Save weights and a progress plot every this many episodes.
        visualize_every: In every this-many-th episode, plot the agent's view
            every 20 steps.
        video_every: Record and display a video every this many episodes.

    Returns:
        tuple: (scores, agent) - the list of per-episode total (shaped)
        rewards and the trained DQNAgent.
    """
    # Create output folders (no error if they already exist)
    os.makedirs('models', exist_ok=True)
    os.makedirs('videos', exist_ok=True)

    # Define discrete actions
    discrete_actions = create_discrete_actions()
    num_actions = len(discrete_actions)

    # Define action meanings for visualization
    action_meanings = [
        "Left + Gas",
        "Straight + Gas",
        "Right + Gas",
        "Left",
        "Straight",
        "Right",
        "Brake"
    ]

    # State dimensions (after preprocessing)
    state_size = (84, 84, 1)

    # Create agent
    agent = DQNAgent(state_size, num_actions)

    # Keep track of scores
    scores = []

    # Counts environment steps across ALL episodes. The agent's target-network
    # schedule is keyed on the step value it receives; a counter that resets
    # every episode would never advance past one episode's length.
    global_step = 0

    for e in range(episodes):
        episode_number = e + 1
        record_video = episode_number % video_every == 0
        visualize = episode_number % visualize_every == 0

        env = gym.make('CarRacing-v2', render_mode="rgb_array")
        if record_video:
            env = RecordVideo(env, f'videos/episode_{episode_number}',
                              episode_trigger=lambda x: True,
                              video_length=1000)

        try:
            state = env.reset()[0]  # Get initial state
            state = preprocess_state(state)

            total_reward = 0
            done = False
            step = 0

            while not done:
                # Select action
                action_idx = agent.act(state)
                action = discrete_actions[action_idx]

                # Take action
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated

                # Preprocess next state
                next_state = preprocess_state(next_state)

                # Modify reward to encourage staying on track
                if reward < 0:
                    reward *= 5  # Penalize going off track more heavily

                total_reward += reward

                # Store only true termination as `done`: a time-limit
                # truncation is not a terminal state, so the Q-target for it
                # should still bootstrap from next_state.
                agent.remember(state, action_idx, reward, next_state, terminated)

                # Visualize state and action periodically
                if visualize and step % 20 == 0:
                    visualize_agent_state(state, action_idx, discrete_actions, reward, total_reward, action_meanings)

                # Train agent
                agent.replay(batch_size, global_step)

                # Update state
                state = next_state
                step += 1
                global_step += 1

                # Limit maximum steps per episode
                if step > 1000:
                    break
        finally:
            # Always release the environment (and finalize any recording),
            # even if the episode raised
            env.close()

        # Save score
        scores.append(total_reward)

        # Print progress
        print(f"Episode: {episode_number}/{episodes}, Score: {total_reward:.2f}, Epsilon: {agent.epsilon:.2f}")

        # Save model and progress plot periodically
        if episode_number % save_every == 0:
            agent.save(f"models/car_racing_dqn_{episode_number}.h5")
            _plot_training_progress(scores, f'training_progress_{episode_number}.png')

        # Display the recorded video
        if record_video:
            print("Displaying the latest recorded episode:")
            show_latest_video()

    # Final save
    agent.save("models/car_racing_dqn_final.h5")

    # Plot final scores
    _plot_training_progress(scores, 'training_progress_final.png')

    print("Training complete!")
    return scores, agent


def test_agent(model_path, num_episodes=3):
    """
    Run a trained agent greedily, record every episode, and display a video

    Args:
        model_path: Path of the saved weights file to load into the agent.
        num_episodes: Number of evaluation episodes to run.
    """
    # Create environment; every episode is recorded under videos/test
    env = gym.make('CarRacing-v2', render_mode="rgb_array")
    env = RecordVideo(env, 'videos/test', episode_trigger=lambda x: True)

    try:
        # Define discrete actions
        discrete_actions = create_discrete_actions()
        num_actions = len(discrete_actions)

        # State dimensions
        state_size = (84, 84, 1)

        # Create agent and load trained weights
        agent = DQNAgent(state_size, num_actions)
        agent.load(model_path)

        for episode in range(num_episodes):
            state = env.reset()[0]
            state = preprocess_state(state)

            total_reward = 0
            done = False
            step = 0

            while not done:
                # training=False makes act() skip the epsilon branch, so the
                # policy is purely greedy and epsilon needs no adjustment
                action_idx = agent.act(state, training=False)
                action = discrete_actions[action_idx]

                # Take action
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated

                # Preprocess next state
                next_state = preprocess_state(next_state)

                total_reward += reward
                state = next_state
                step += 1

                # Limit steps
                if step > 1000:
                    break

            print(f"Test Episode: {episode+1}/{num_episodes}, Score: {total_reward:.2f}")
    finally:
        # Close the environment (and its recorder) even if an episode or the
        # weight loading fails
        env.close()

    # Show the recorded test video
    print("Displaying the test video:")
    show_latest_video()


if __name__ == "__main__":
    # Phase 1: train, keeping the score history and the trained agent around
    # for later inspection
    print("Starting training...")
    scores, agent = train_model(
        episodes=100,
        batch_size=32,
        save_every=10,
        visualize_every=5,
        video_every=20,
    )

    # Phase 2: evaluate the weights written at the end of training
    print("Testing the trained agent...")
    test_agent("models/car_racing_dqn_final.h5")

# Example additional cell for Colab notebook to evaluate model performance
"""
# Plot learning curve
plt.figure(figsize=(10, 5))
plt.plot(scores)
plt.title('Training Progress')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.grid(True)
plt.show()

# Show a recorded video of the agent's performance
show_latest_video()
"""