In [None]:
  pip install gymnasium



In [None]:
import gymnasium as gym

In [None]:
pip install swig



In [None]:
pip install "gymnasium[box2d]"



In [None]:
import os
import random
import numpy as np
import torch
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque,namedtuple


In [None]:
# Create the discrete-action Lunar Lander environment used for training.
# Wind is disabled here; wind_power and turbulence_power are still passed through
# (presumably they only matter when enable_wind=True -- confirm in the env docs).
env = gym.make(
    "LunarLander-v3",
    continuous=False,
    gravity=-10.0,
    enable_wind=False,
    wind_power=15.0,
    turbulence_power=1.5,
)

In [None]:
# Read the observation shape and the number of discrete actions from the
# environment; these sizes are later passed to the Agent constructor.
state = env.observation_space.shape
state_size = state[0]
action_size = env.action_space.n
print(state, state_size, action_size, sep="\n")

(8,)
8
4


In [None]:
 # Learning update settings (read by Agent).
 learning_rate = 0.0005             # lr handed to the Adam optimizer
 minibatch = 150                    # number of experiences sampled per learning update
 gamma = 0.99                       # discount applied to the next-state Q-value target
 replay_buffer_size = 100_000       # capacity of the replay memory
 interpolation_parameter = 0.001    # weight of the local network in each soft target update

 # Training-loop settings.
 number_episodes = 5000             # upper bound on training episodes
 max_time_steps = 1000              # upper bound on steps within one episode

 # Epsilon-greedy schedule: epsilon is multiplied by the decay value after every
 # episode and never drops below the ending value.
 # (The "eplison" spelling is kept because the training cell reads these names.)
 epsilon_starting_value = 1.0
 eplison_ending_value = 0.01
 eplison_decay_value = 0.995

 # Rolling window holding the most recent 100 episode scores.
 scores_100_episode = deque(maxlen=100)

In [None]:
import torch.nn as nn

class ANN(nn.Module):
    """Fully connected network mapping a state vector to one value per action.

    The network has two hidden layers of 64 units, each followed by a ReLU,
    and a linear output layer of width ``action_size``.

    Args:
        state_size: Number of input features.
        action_size: Number of outputs of the final layer.
        seed: Value passed to ``torch.manual_seed`` before the layers are built.
    """

    def __init__(self, state_size, action_size, seed=42):
        super().__init__()
        # Seed before creating any layer so the initial weights are reproducible;
        # the generator returned by torch.manual_seed is kept on the instance.
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, state):
        """Return the output-layer activations for ``state``."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
class ReplayMemory(object):
  """Fixed-capacity FIFO store of experiences with random minibatch sampling.

  Each stored event is indexed as ``(state, action, reward, next_state, done)``
  when a batch is assembled in ``sample``.

  Args:
    capacity: Maximum number of events kept; once exceeded, the oldest event
      is discarded.
  """

  def __init__(self, capacity):
    self.capacity = capacity
    # A bounded deque evicts the oldest entry in O(1) on append, whereas deleting
    # index 0 of a list shifts every remaining element on each push.
    self.memory = deque(maxlen=capacity)

  def push(self, event):
    """Append ``event``; the oldest event is dropped when the buffer is full."""
    self.memory.append(event)

  def sample(self, batch_size):
    """Draw ``batch_size`` distinct random events and stack them into tensors.

    Returns:
      Tuple ``(states, actions, rewards, next_states, dones)``. Each field is
      built with ``np.vstack`` (one row per event); ``actions`` is converted to
      long, every other field to float (``dones`` via uint8).

    Raises:
      ValueError: From ``random.sample`` if ``batch_size`` exceeds the number
        of stored events.
    """
    experiences = [e for e in random.sample(self.memory, batch_size) if e is not None]

    def stack(field):
      # One row per sampled experience for the given tuple position.
      return np.vstack([e[field] for e in experiences])

    states = torch.from_numpy(stack(0)).float()
    actions = torch.from_numpy(stack(1)).long()
    rewards = torch.from_numpy(stack(2)).float()
    next_states = torch.from_numpy(stack(3)).float()
    dones = torch.from_numpy(stack(4).astype(np.uint8)).float()

    return states, actions, rewards, next_states, dones

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import random
import os

class Agent():
    """Deep Q-learning agent with experience replay and a soft-updated target network.

    The class reads the notebook-level hyperparameters ``learning_rate``,
    ``replay_buffer_size``, ``minibatch``, ``gamma`` and
    ``interpolation_parameter`` and uses the ``ANN`` and ``ReplayMemory``
    classes defined in earlier cells.

    Args:
        state_size: Number of features in a state vector.
        action_size: Number of discrete actions.
        save_dir: Directory used by ``save_model`` / ``load_model``; created
            if it does not exist.
    """

    # Number of calls to ``step`` between two learning updates.
    UPDATE_EVERY = 4

    def __init__(self, state_size, action_size, save_dir="checkpoints"):
        self.state_size = state_size
        self.action_size = action_size
        self.local_qnetwork = ANN(state_size, action_size)
        self.target_qnetwork = ANN(state_size, action_size)
        self.optimizer = torch.optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate)
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0
        self.save_dir = save_dir

        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(save_dir, exist_ok=True)

    def step(self, state, action, reward, next_state, done):
        """Store one transition and learn on every ``UPDATE_EVERY``-th call.

        Learning only happens once the replay memory holds more than
        ``minibatch`` experiences.
        """
        self.memory.push((state, action, reward, next_state, done))
        self.t_step = (self.t_step + 1) % self.UPDATE_EVERY
        if self.t_step == 0 and len(self.memory.memory) > minibatch:
            experiences = self.memory.sample(minibatch)
            self.learn(experiences, gamma)

    def get_action(self, state, epsilon):
        """Choose an action epsilon-greedily.

        Args:
            state: NumPy array holding a single state.
            epsilon: Exploration threshold; a uniform random action is returned
                when a fresh ``random.random()`` draw is not greater than it.

        Returns:
            The chosen action index as a Python ``int``.
        """
        # Decide first: exploratory steps do not need a forward pass at all.
        if random.random() <= epsilon:
            return random.randrange(self.action_size)

        state = torch.from_numpy(state).float().unsqueeze(0)
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()
        return int(np.argmax(action_values.cpu().numpy()))

    def learn(self, experiences, gamma):
        """Run one gradient step on the local network, then soft-update the target.

        Args:
            experiences: Tuple ``(states, actions, rewards, next_states, dones)``
                of tensors as produced by ``ReplayMemory.sample``.
            gamma: Discount applied to the target network's best next-state value.
        """
        states, actions, rewards, next_states, dones = experiences

        # Targets are constants for the loss, so no autograd graph is needed.
        with torch.no_grad():
            next_q_targets = self.target_qnetwork(next_states).max(1)[0].unsqueeze(1)
            # (1 - dones) removes the bootstrap term for transitions flagged done.
            q_targets = rewards + (gamma * next_q_targets * (1 - dones))
        q_expected = self.local_qnetwork(states).gather(1, actions)

        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

    def soft_update(self, local_model, target_model, interpolation_parameter):
        """Blend ``local_model`` weights into ``target_model`` in place.

        Each target parameter becomes
        ``interpolation_parameter * local + (1 - interpolation_parameter) * target``.
        """
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data)

    def save_model(self, filename="dqn_trained.pth"):
        """Write the local network's state dict to ``save_dir/filename``."""
        save_path = os.path.join(self.save_dir, filename)
        torch.save(self.local_qnetwork.state_dict(), save_path)
        print(f"✅ Model saved at: {save_path}")

    def load_model(self, filename="dqn_trained.pth"):
        """Load ``save_dir/filename`` into both the local and the target network.

        Prints a warning and leaves both networks untouched when the file
        does not exist.
        """
        load_path = os.path.join(self.save_dir, filename)
        if os.path.exists(load_path):
            # Read the checkpoint once and reuse it for both networks.
            state_dict = torch.load(load_path)
            self.local_qnetwork.load_state_dict(state_dict)
            self.target_qnetwork.load_state_dict(state_dict)
            print(f"✅ Model loaded from: {load_path}")
        else:
            print(f"⚠️ No saved model found at {load_path}")


In [None]:
# Build the agent for this environment's state and action dimensions.
agent = Agent(state_size=state_size, action_size=action_size)

In [None]:
epsilon = epsilon_starting_value
# Start every training run with an empty score window. The deque is otherwise
# created in another cell, so re-running this cell would average in scores left
# over from an earlier run and could report "solved" immediately.
scores_100_episode = deque(maxlen=100)

for episode in range(number_episodes):
  state, _ = env.reset()  # fresh episode
  score = 0
  for time_step in range(max_time_steps):
    action = agent.get_action(state, epsilon)
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only real termination is stored as `done`, so the learning target keeps
    # bootstrapping from next_state when the episode is merely cut off.
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    if terminated or truncated:
      break

  scores_100_episode.append(score)  # feeds the rolling average below
  epsilon = max(eplison_ending_value, epsilon * eplison_decay_value)

  # Report every 10th episode; stop once the rolling average reaches 200.
  if episode % 10 == 0:
    average_score = np.mean(scores_100_episode)
    print('Episode{} Avg: {:.2f}'.format(episode, average_score))
    if average_score >= 200.0:
      print('Congratulations, solved in {:d} episodes \tAverage Score: {:.2f}'.format(episode, average_score))
      break

Episode0 Avg: 199.42
Episode10 Avg: 163.73
Episode20 Avg: 126.94
Episode30 Avg: 100.33
Episode40 Avg: 66.79
Episode50 Avg: 36.71
Episode60 Avg: 2.90
Episode70 Avg: -22.80
Episode80 Avg: -50.14
Episode90 Avg: -78.74
Episode100 Avg: -99.72
Episode110 Avg: -91.28
Episode120 Avg: -77.33
Episode130 Avg: -69.36
Episode140 Avg: -57.77
Episode150 Avg: -48.93
Episode160 Avg: -38.28
Episode170 Avg: -33.47
Episode180 Avg: -19.87
Episode190 Avg: -18.10
Episode200 Avg: -15.20
Episode210 Avg: -6.74
Episode220 Avg: -2.43
Episode230 Avg: 9.16
Episode240 Avg: 14.25
Episode250 Avg: 27.01
Episode260 Avg: 35.79
Episode270 Avg: 46.72
Episode280 Avg: 50.11
Episode290 Avg: 64.79
Episode300 Avg: 81.28
Episode310 Avg: 86.50
Episode320 Avg: 105.17
Episode330 Avg: 113.56
Episode340 Avg: 129.72
Episode350 Avg: 140.75
Episode360 Avg: 149.41
Episode370 Avg: 158.84
Episode380 Avg: 178.68
Episode390 Avg: 183.76
Episode400 Avg: 192.92
Episode410 Avg: 204.39
Congratulations, solved in 410 episodes 	Average Score: 204.3

In [None]:
import glob
import io
import base64
import imageio
import gymnasium as gym
from IPython.display import HTML
from IPython import display as ipythondisplay

def record_agent_video(agent, env_name, output_filename="video.mp4", fps=30):
    """Run one episode with epsilon 0 and save the rendered frames as a video.

    Args:
        agent: Object exposing ``get_action(state, epsilon)``.
        env_name: Environment id passed to ``gym.make``.
        output_filename: Path of the video file written by ``imageio.mimsave``.
        fps: Frame rate passed to ``imageio.mimsave``.
    """
    env = gym.make(env_name, render_mode="rgb_array")
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())  # capture the frame before acting
            action = agent.get_action(state, 0.0)
            # env.step expects a plain integer, not a NumPy array.
            if isinstance(action, np.ndarray):
                action = action.item()
            state, _, terminated, truncated, _ = env.step(action)
            # Stop on truncation as well: an episode that hits the time limit
            # without terminating would otherwise loop (and grow frames) forever.
            done = terminated or truncated
    finally:
        # Release the environment even if rendering or stepping fails.
        env.close()

    imageio.mimsave(output_filename, frames, fps=fps)

def display_video(filename="video.mp4"):
    """Embed a saved video file in the notebook output as an inline HTML player.

    The file is read, base64-encoded and placed in a ``<video>`` element as a
    data URI. If the file does not exist, an error message is printed instead.

    Args:
        filename: Path of the video file to show.
    """
    try:
        with open(filename, "rb") as handle:
            payload = base64.b64encode(handle.read()).decode("ascii")
        player_markup = f"""
  <video alt = "Agent playing" autoplay loop controls style="height:400px">
    <source src="data:video/mp4;base64,{payload}" type="video/mp4">
  </video>
  """
        ipythondisplay.display(HTML(player_markup))
    except FileNotFoundError:
        print("Error: Video file not found!")

# Example usage: record one episode of the trained agent, then play it back inline.
record_agent_video(agent, "LunarLander-v3", output_filename="video.mp4")
display_video(filename="video.mp4")

