<a href="https://colab.research.google.com/github/vandat2614/Deep-RL/blob/main/PolicyGradient_CartPole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install --quiet git+https://github.com/ntasfi/PyGame-Learning-Environment.git
!pip install --quiet git+https://github.com/simoninithomas/gym-games
!pip install --quiet numpy==1.23.5

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone


# CartPole Environment

In [2]:
import gym
import gym_pygame

In [5]:
# Gym registry id of the task; also stored in `hyperparameters["env_id"]` further down.
env_id = "CartPole-v1"
# One environment instance, shared by the training, evaluation and recording cells.
env = gym.make(env_id)

In [6]:
# Number of discrete actions and length of the observation vector.
# Both are stored in `hyperparameters` and used to size the policy network.
action_size = env.action_space.n
state_size = env.observation_space.shape[0]

print("The State Space is: ", state_size)
print("The Action Space is: ", action_size)

The State Space is:  4
The Action Space is:  2


# Policy Net

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

In [8]:
class CartPoleAgent(nn.Module):
  """Two-layer softmax policy over a discrete action space.

  The network maps a state vector to a probability distribution over actions:
  `Linear(state_size, hidden_size) -> ReLU -> Linear(hidden_size, action_size) -> softmax`.

  Args:
    state_size: Length of the observation vector.
    action_size: Number of discrete actions.
    hidden_size: Width of the single hidden layer.

  The module moves itself to CUDA when available, otherwise it stays on CPU;
  the chosen device is kept in `self.device`.
  """

  def __init__(self, state_size, action_size, hidden_size):
    super().__init__()

    self.fc1 = nn.Linear(state_size, hidden_size)
    self.fc2 = nn.Linear(hidden_size, action_size)

    self.device = "cuda" if torch.cuda.is_available() else "cpu"
    self.to(self.device)

  def forward(self, x):
    """Return action probabilities for `x` of shape `(..., state_size)`."""
    hidden = F.relu(self.fc1(x))
    # No activation on the output layer: softmax needs unconstrained logits.
    # A ReLU here would clamp logits to >= 0 and, when every logit is negative,
    # collapse the policy to uniform with zero gradient through fc2.
    logits = self.fc2(hidden)
    # Normalize over the last axis so batched and unbatched inputs both work.
    return F.softmax(logits, dim=-1)

  def act(self, state):
    """Sample an action for a single state.

    Args:
      state: One observation (NumPy array or any sequence of numbers).

    Returns:
      `(action, log_prob)` where `action` is a Python int and `log_prob` is a
      CPU tensor of shape `(1,)` that stays attached to the autograd graph.
    """
    # Add a batch axis of size 1 and move the observation to the model's device.
    state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
    probs = self.forward(state).cpu()
    m = Categorical(probs)
    action = m.sample()
    return action.item(), m.log_prob(action)

# Training

In [9]:
import numpy as np
from collections import deque

In [25]:
def train(agent, env, num_episodes, max_step, gamma, lr, print_per_episodes):
  """Train `agent` in place with a Monte-Carlo policy-gradient update.

  One gradient step is taken per episode: the episode is rolled out with
  `agent.act`, discounted returns are computed for every step, standardized,
  and the loss `-sum(log_prob_t * return_t)` is minimized with Adam.

  Args:
    agent: `nn.Module` exposing `act(state) -> (action, log_prob)`.
    env: Environment whose `reset()` returns a state and whose `step(action)`
      returns `(state, reward, done, info)`.
    num_episodes: Number of episodes to run.
    max_step: Maximum number of steps per episode.
    gamma: Discount factor.
    lr: Learning rate for the Adam optimizer.
    print_per_episodes: Print the mean score of the last `print_per_episodes`
      episodes every that many episodes.
  """
  scores = []
  optimizer = optim.Adam(agent.parameters(), lr)
  # Small constant that keeps the standardization from dividing by zero.
  eps = np.finfo(np.float32).eps.item()

  for episode in range(1, num_episodes + 1):

    state = env.reset()
    rewards, log_probs = [], []

    for step in range(max_step):
      action, log_prob = agent.act(state)
      state, reward, done, _ = env.step(action)

      rewards.append(reward)
      log_probs.append(log_prob)

      if done: break

    scores.append(sum(rewards))

    # An episode with no steps carries no learning signal; skip the update.
    if log_probs:
      # Accumulate G_t = r_t + gamma * G_{t+1} from the last step backwards.
      returns = []
      discount_return = 0.0
      for reward in reversed(rewards):
        discount_return = gamma * discount_return + reward
        returns.append(discount_return)
      returns.reverse()

      log_prob_tensor = torch.stack(log_probs).reshape(-1)
      returns_tensor = torch.tensor(
          returns, dtype=torch.float32, device=log_prob_tensor.device
      )

      # Standardize the returns. The sample std of a single value is NaN and
      # would poison every parameter, so one-step episodes keep the raw return.
      if returns_tensor.numel() > 1:
        returns_tensor = (returns_tensor - returns_tensor.mean()) / (returns_tensor.std() + eps)

      loss = -(log_prob_tensor * returns_tensor).sum()

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    if episode % print_per_episodes == 0:
      print(f"Episode: {episode} \t Average Score: {np.mean(scores[-print_per_episodes:]):.2f}")

In [48]:
# Run configuration. The same dict is passed to push_to_hub, which writes it to
# hyperparameters.json, so every value must be JSON-serializable.
hyperparameters = {
    "env_id" : env_id,
    "state_size" : state_size,
    "action_size" : action_size,
    "hidden_size" : 16,  # width of the policy network's hidden layer
    "training_episodes" : 1000,
    "evaluate_episodes" : 100,
    "max_step" : 1000,  # per-episode step cap used by train() and evaluate()
    "gamma" : 0.99,  # discount factor
    "lr" : 1e-2,  # Adam learning rate
    "print_per_episodes" : 100  # logging interval, in episodes
}

In [28]:
# Build a freshly initialized policy network sized from the hyperparameters.
agent = CartPoleAgent(
    state_size = hyperparameters["state_size"],
    action_size = hyperparameters["action_size"],
    hidden_size = hyperparameters["hidden_size"]
)

In [29]:
# Train the agent in place; the average score is printed every `print_per_episodes` episodes.
train(
    agent = agent,
    env = env,
    num_episodes = hyperparameters["training_episodes"],
    max_step = hyperparameters["max_step"],
    gamma = hyperparameters["gamma"],
    lr = hyperparameters["lr"],
    print_per_episodes = hyperparameters["print_per_episodes"]
)

Episode: 100 	 Average Score: 29.53
Episode: 200 	 Average Score: 151.70
Episode: 300 	 Average Score: 404.04
Episode: 400 	 Average Score: 491.29
Episode: 500 	 Average Score: 382.82
Episode: 600 	 Average Score: 299.17
Episode: 700 	 Average Score: 454.65
Episode: 800 	 Average Score: 475.69
Episode: 900 	 Average Score: 384.81
Episode: 1000 	 Average Score: 493.07


# Evaluate

In [30]:
def evaluate(agent, env, num_episodes, max_step):
  """Roll out `agent` for `num_episodes` episodes and summarize the scores.

  Actions are sampled through `agent.act`, exactly as during training.

  Args:
    agent: Object exposing `act(state) -> (action, log_prob)`.
    env: Environment whose `reset()` returns a state and whose `step(action)`
      returns `(state, reward, done, info)`.
    num_episodes: Number of evaluation episodes.
    max_step: Maximum number of steps per episode.

  Returns:
    `(mean_score, std_score)`: mean and standard deviation (NumPy's default,
    population std) of the per-episode reward sums.
  """
  scores = []

  # No gradients are needed for evaluation; disabling autograd avoids building
  # and discarding a graph on every `act` call.
  with torch.no_grad():
    for episode in range(1, num_episodes + 1):

      state = env.reset()
      rewards = []

      for step in range(max_step):
        action, _ = agent.act(state)

        state, reward, done, _ = env.step(action)
        rewards.append(reward)

        if done: break

      scores.append(sum(rewards))

  mean_score = np.mean(scores)
  std_score = np.std(scores)

  return mean_score, std_score

In [31]:
mean, std = evaluate(
    agent = agent,
    env = env,
    num_episodes = hyperparameters["evaluate_episodes"],
    max_step = hyperparameters["max_step"]
)

In [32]:
mean, std

(500.0, 0.0)

# Play Video

In [33]:
import os
import imageio

from IPython.display import HTML
from base64 import b64encode

In [34]:
def create_record(agent, env, filepath, fps=30):
    """Play one episode with `agent` and save the rendered frames as a video.

    Any existing file at `filepath` is deleted first. The episode runs until
    the environment reports `done`; the frame rendered right after `reset()`
    and the frame after every step are written with `imageio.mimsave`.

    Args:
        agent: Object exposing `act(state) -> (action, log_prob)`.
        env: Environment supporting `render(mode='rgb_array')`.
        filepath: Destination path of the video file.
        fps: Frame rate of the saved video.
    """
    # Drop any stale recording before writing a new one.
    if os.path.exists(filepath):
        os.remove(filepath)

    state = env.reset()
    # The first frame shows the initial state, before any action is taken.
    frames = [env.render(mode='rgb_array')]

    finished = False
    while not finished:
        action, _ = agent.act(state)
        state, _, finished, _ = env.step(action)
        frames.append(env.render(mode='rgb_array'))

    imageio.mimsave(filepath, frames, fps=fps)

In [35]:
def show_record(filepath):
    """Return an inline HTML5 video player for the file at `filepath`.

    The file's bytes are base64-encoded into a `data:` URL, so the returned
    `HTML` object embeds the whole video in the notebook output.

    Args:
        filepath: Path of the video file to embed.

    Returns:
        An `IPython.display.HTML` object containing a `<video>` element.
    """
    # Use a context manager so the file handle is closed even if the read fails.
    with open(filepath, 'rb') as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML(f"""
        <video width=480 controls>
            <source src="{data_url}" type="video/mp4">
        </video>
    """)

In [54]:
# Record one episode of the trained agent to record.mp4 (an existing file is replaced).
create_record(agent, env, 'record.mp4')



In [55]:
# Display the recording inline as the cell's output.
show_record("record.mp4")

# Push to Hub

In [43]:
from huggingface_hub import HfApi, login
from huggingface_hub.repocard import metadata_eval_result, metadata_save

import json
import datetime
from pathlib import Path

import tempfile
from google.colab import userdata

In [50]:
def push_to_hub(repo_id, agent, env, hyperparameters, video_fps=30):
  """Evaluate the agent, package it, and upload everything to the Hugging Face Hub.

  The following files are created in a temporary directory and uploaded to the
  root of the repository: `model.pt`, `hyperparameters.json`, `results.json`,
  `README.md` (model card with evaluation metadata) and `replay.mp4`.

  Args:
    repo_id: Repository id in `"<namespace>/<name>"` form.
    agent: Trained agent; saved with `torch.save` and used for evaluation and
      for the replay video.
    env: Environment used for evaluation and recording.
    hyperparameters: JSON-serializable dict. The keys `"env_id"`,
      `"evaluate_episodes"` and `"max_step"` are read here.
    video_fps: Frame rate of the replay video.
  """
  _, repo_name = repo_id.split("/")
  api = HfApi()

  # Step 1: Create the repo (no error if it already exists)
  repo_url = api.create_repo(
      repo_id=repo_id,
      exist_ok=True,
  )

  with tempfile.TemporaryDirectory() as tmpdirname:
    local_directory = Path(tmpdirname)

    # Step 2: Save the model
    # NOTE(review): this pickles the whole module object rather than a
    # state_dict, so loading it needs the agent's class definition to be
    # available and should only be done with trusted files.
    torch.save(agent, local_directory / "model.pt")

    # Step 3: Save the hyperparameters to JSON
    with open(local_directory / "hyperparameters.json", "w") as outfile:
      json.dump(hyperparameters, outfile)

    # Step 4: Evaluate the model and build JSON
    mean_reward, std_reward = evaluate(
        agent,
        env,
        num_episodes=hyperparameters["evaluate_episodes"],
        max_step=hyperparameters["max_step"]
    )

    eval_datetime = datetime.datetime.now()
    # ISO 8601 timestamp, e.g. 2025-05-21T21:35:42.123456: "T" separates the
    # date from the time, which is hour:minute:second.microsecond.
    eval_form_datetime = eval_datetime.isoformat()

    evaluate_data = {
        "env_id": hyperparameters["env_id"],
        "mean_reward": mean_reward,
        "n_evaluation_episodes": hyperparameters["evaluate_episodes"],
        "eval_datetime": eval_form_datetime,
    }

    with open(local_directory / "results.json", "w") as outfile:
      json.dump(evaluate_data, outfile)

    # Step 5: Create the model card
    env_name = hyperparameters["env_id"]

    metadata = {}
    metadata["tags"] = [
        env_name,
        "reinforce",
        "reinforcement-learning",
        "custom-implementation",
        "deep-rl-class"
    ]

    # Add metrics
    eval_metadata = metadata_eval_result(
        model_pretty_name=repo_name,
        task_pretty_name="reinforcement-learning",
        task_id="reinforcement-learning",
        metrics_pretty_name="mean_reward",
        metrics_id="mean_reward",
        metrics_value=f"{mean_reward:.2f} +/- {std_reward:.2f}",
        dataset_pretty_name=env_name,
        dataset_id=env_name,
    )

    # Merge both dictionaries into a single new dict
    metadata = {**metadata, **eval_metadata}

    # The card names the environment from `hyperparameters`, not from a
    # notebook-level global, so it always matches the uploaded results.
    model_card = f"""
  # **Reinforce** Agent playing **{env_name}**
  This is a trained model of a **Reinforce** agent playing **{env_name}** .
  To learn to use this model and train yours check Unit 4 of the Deep Reinforcement Learning Course: https://huggingface.co/deep-rl-course/unit4/introduction
  My name is VanDat =))
  """

    # The temporary directory is brand new, so there is never an existing
    # README to preserve; the model card is always written from scratch.
    readme_path = local_directory / "README.md"
    with readme_path.open("w", encoding="utf-8") as f:
      f.write(model_card)

    # Save our metrics to Readme metadata
    metadata_save(readme_path, metadata)

    # Step 6: Record a video
    video_path = local_directory / "replay.mp4"
    create_record(agent, env, video_path, video_fps)

    # Step 7. Push everything to the Hub
    api.upload_folder(
        repo_id=repo_id,
        folder_path=local_directory,
        path_in_repo=".",
    )

    print(f"Your model is pushed to the Hub. You can view your model here: {repo_url}")

In [51]:
# Authenticate with the Hub; the token is read from Colab's `userdata` store, not hard-coded.
login(token=userdata.get("HF_TOKEN"))

In [52]:
# Evaluate the trained agent, build the model card and replay video, and upload them.
push_to_hub(
    repo_id="vandat2601/Reinforce-CartPole",
    agent=agent,
    env=env,
    hyperparameters=hyperparameters,
    video_fps=30
)

See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(


model.pt:   0%|          | 0.00/3.26k [00:00<?, ?B/s]

Your model is pushed to the Hub. You can view your model here: https://huggingface.co/vandat2601/Reinforce-CartPole
