In [None]:
!pip install swig
!pip install gymnasium[box2d] 

In [15]:
import os
import random
from collections import deque
from typing import Deque, Tuple, List
import cv2
import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow import keras

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
tf.get_logger().setLevel('ERROR')

MODEL_PATH = 'lunar_lander_dqn.weights.h5'

class DQNAgent:
    def __init__(
        self,
        state_size: int,
        action_size: int,
        learning_rate: float = 0.001,
        gamma: float = 0.99,
        epsilon: float = 1.0,
        epsilon_decay: float = 0.995,
        epsilon_min: float = 0.01,
        memory_size: int = 20000
    ):
        self.state_size = state_size
        self.action_size = action_size
        self.memory: Deque[Tuple] = deque(maxlen=memory_size)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
        self.loss_fn = keras.losses.MeanSquaredError()
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

    def _build_model(self) -> keras.Model:
        return keras.Sequential([
            keras.layers.Input(shape=(self.state_size,)),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(self.action_size, activation='linear', dtype='float32')
        ])

    def update_target_model(self) -> None:
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done) -> None:
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state: tf.Tensor) -> int:
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        q_values = self.model(state, training=False)
        return tf.argmax(q_values[0]).numpy()

    def replay(self, batch_size: int) -> None:
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        
        states = tf.convert_to_tensor(np.array([t[0] for t in minibatch]), dtype=tf.float32)
        actions = tf.convert_to_tensor([t[1] for t in minibatch], dtype=tf.int32)
        rewards = tf.convert_to_tensor([t[2] for t in minibatch], dtype=tf.float32)
        next_states = tf.convert_to_tensor(np.array([t[3] for t in minibatch]), dtype=tf.float32)
        dones = tf.convert_to_tensor([t[4] for t in minibatch], dtype=tf.float32)

        self._train_step(states, actions, rewards, next_states, dones)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    @tf.function
    def _train_step(self, states, actions, rewards, next_states, dones):
        q_next = self.target_model(next_states, training=False)
        targets = rewards + self.gamma * tf.reduce_max(q_next, axis=1) * (1 - dones)

        with tf.GradientTape() as tape:
            q_values = self.model(states, training=True)
            action_indices = tf.stack([tf.range(tf.shape(actions)[0]), actions], axis=1)
            action_q_values = tf.gather_nd(q_values, action_indices)
            loss = self.loss_fn(targets, action_q_values)
        
        gradients = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

    def load(self, name: str) -> None:
        self.model.load_weights(name)
        self.update_target_model()

    def save(self, name: str) -> None:
        self.model.save_weights(name)

def train_agent(
    agent: DQNAgent,
    env: gym.Env,
    episodes: int = 1000,
    batch_size: int = 64,
    target_update_freq: int = 100
) -> DQNAgent:
    scores: Deque[float] = deque(maxlen=100)
    total_steps = 0

    for e in range(episodes):
        state, _ = env.reset()
        state = np.reshape(state, [1, agent.state_size])
        total_reward = 0.0
        done = False
        
        while not done:
            action = agent.act(tf.convert_to_tensor(state, dtype=tf.float32))
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            reward = -10.0 if terminated else reward
            total_reward += reward

            next_state_reshaped = np.reshape(next_state, [1, agent.state_size])
            agent.remember(state[0], action, reward, next_state_reshaped[0], done)
            state = next_state_reshaped
            
            agent.replay(batch_size)
            total_steps += 1

            if total_steps % target_update_freq == 0:
                agent.update_target_model()

        scores.append(total_reward)
        avg_score = np.mean(scores) if scores else 0.0

        print(
            f"Episode: {e+1}/{episodes}, Score: {total_reward:.2f}, "
            f"Avg Score: {avg_score:.2f}, Epsilon: {agent.epsilon:.2f}"
        )

        if avg_score >= 200.0 and len(scores) >= 100:
            print(f"Environment solved in {e+1} episodes!")
            break

    return agent

def save_video(frames: List[np.ndarray], filename: str, fps: int = 30) -> None:
    if not frames:
        return
    height, width, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(filename, fourcc, fps, (width, height))
    for frame in frames:
        out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    out.release()
    print(f"Video saved: {filename}")

def test_agent_with_video(
    agent: DQNAgent,
    env: gym.Env,
    model_path: str,
    test_episodes: int = 5
) -> None:
    agent.load(model_path)
    agent.epsilon = 0.0
    
    all_frames = []
    episode_scores = []
    
    for episode in range(test_episodes):
        state, _ = env.reset()
        state = np.reshape(state, [1, agent.state_size])
        total_reward = 0.0
        done = False
        episode_frames = []

        print(f"Testing episode {episode + 1}/{test_episodes}")

        while not done:
            frame = env.render()
            if frame is None:
                continue
            episode_frames.append(frame)
            action = agent.act(tf.convert_to_tensor(state, dtype=tf.float32))
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = np.reshape(next_state, [1, agent.state_size])
            total_reward += reward
        
        episode_scores.append(total_reward)
        print(f"Episode {episode + 1}: Score = {total_reward:.2f}")

        all_frames.extend(episode_frames)
        
        if episode < test_episodes - 1 and episode_frames:
            transition_frame = np.zeros_like(episode_frames[0])
            cv2.putText(transition_frame, f'Episode {episode + 1} Complete', (50, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
            cv2.putText(transition_frame, f'Score: {total_reward:.1f}', (50, 250), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
            for _ in range(60):
                all_frames.append(transition_frame)

    print("\nTest Summary:")
    print(f"Average Score: {np.mean(episode_scores):.2f}")
    print(f"Min Score: {np.min(episode_scores):.2f}")
    print(f"Max Score: {np.max(episode_scores):.2f}")

    save_video(all_frames, 'lunar_lander_test.mp4')

def run_training():
    print("Starting training...")
    env = gym.make('LunarLander-v2')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)
    
    trained_agent = train_agent(agent, env, episodes=1000, batch_size=64)
    trained_agent.save(MODEL_PATH)
    env.close()

def run_testing():
    print("\nStarting testing and video rendering...")
    if not os.path.exists(MODEL_PATH):
        print(f"Model file not found at {MODEL_PATH}. Skipping testing.")
        return

    env = gym.make('LunarLander-v2', render_mode='rgb_array')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)
    
    test_agent_with_video(agent, env, MODEL_PATH, test_episodes=5)
    env.close()

if __name__ == "__main__":
    run_training()
    run_testing()

Starting training...
Episode: 1/1000, Score: -101.83, Avg Score: -101.83, Epsilon: 0.79
Episode: 2/1000, Score: -345.81, Avg Score: -223.82, Epsilon: 0.46
Episode: 3/1000, Score: 17.81, Avg Score: -143.27, Epsilon: 0.23
Episode: 4/1000, Score: -174.70, Avg Score: -151.13, Epsilon: 0.09
Episode: 5/1000, Score: -286.86, Avg Score: -178.28, Epsilon: 0.06
Episode: 6/1000, Score: -53.16, Avg Score: -157.42, Epsilon: 0.01
Episode: 7/1000, Score: -135.62, Avg Score: -154.31, Epsilon: 0.01
Episode: 8/1000, Score: -77.14, Avg Score: -144.66, Epsilon: 0.01
Episode: 9/1000, Score: -16.96, Avg Score: -130.47, Epsilon: 0.01
Episode: 10/1000, Score: 2.88, Avg Score: -117.14, Epsilon: 0.01
Episode: 11/1000, Score: -74.36, Avg Score: -113.25, Epsilon: 0.01
Episode: 12/1000, Score: 57.05, Avg Score: -99.06, Epsilon: 0.01
Episode: 13/1000, Score: -80.46, Avg Score: -97.63, Epsilon: 0.01
Episode: 14/1000, Score: -7.34, Avg Score: -91.18, Epsilon: 0.01
Episode: 15/1000, Score: -11.79, Avg Score: -85.89, E