In [None]:
import torch.nn as nn
import gymnasium as gym


In [None]:
import gymnasium as gym
import numpy as np
import cv2
from gymnasium.spaces import Box

def preprocess(img):
    """Convert a raw RGB frame into an 84 x 84 grayscale image scaled by 1/255.

    Args:
        img: RGB frame array; the first two axes are sliced as rows and
            columns, and the result is handed to ``cv2.cvtColor`` as RGB.

    Returns:
        The cropped, resized, grayscale frame divided by 255.0.
    """
    # CarRacing-v2-specific cropping: keep rows 0-83 and columns 6-89.
    cropped = img[:84, 6:90]
    # Resize to the 84 x 84 target (or you can simply use rescaling).
    resized = cv2.resize(cropped, dsize=(84, 84))
    gray = cv2.cvtColor(resized, cv2.COLOR_RGB2GRAY)
    return gray / 255.0

class WrapperEnv(gym.Wrapper):
    """Frame-skipping, frame-stacking wrapper that emits grayscale observations.

    Every observation is a stack of the ``stack_frames`` most recent frames,
    each passed through ``preprocess`` (84 x 84 grayscale divided by 255.0),
    giving an array of shape ``(stack_frames, 84, 84)``.

    Args:
        env: The environment to wrap.
        skip_frames: Number of consecutive environment steps on which each
            action passed to ``step`` is repeated. Must be at least 1.
        stack_frames: Number of frames kept in the stacked observation.
            Must be at least 1.
        initial_no_op: Number of steps with action ``0`` taken right after
            every reset, before the first observation is returned.
        **kwargs: Forwarded unchanged to ``gym.Wrapper.__init__``.

    Raises:
        ValueError: If ``skip_frames`` or ``stack_frames`` is smaller than 1.
    """

    def __init__(
        self,
        env,
        skip_frames=4,
        stack_frames=4,
        initial_no_op=50,
        **kwargs
    ):
        # With skip_frames < 1 the loop in `step` would never run and no frame
        # would be available to preprocess, so reject it up front.
        if skip_frames < 1:
            raise ValueError("skip_frames must be at least 1")
        if stack_frames < 1:
            raise ValueError("stack_frames must be at least 1")

        super(WrapperEnv, self).__init__(env, **kwargs)
        self.initial_no_op = initial_no_op
        self.skip_frames = skip_frames
        self.stack_frames = stack_frames

        # `preprocess` divides uint8 pixels by 255.0, so observations are
        # float64 values in [0, 1]; the channel count is the stack depth.
        self.observation_space = Box(
            low=0.0,
            high=1.0,
            shape=(stack_frames, 84, 84),
            dtype=np.float64,
        )

    def reset(self, *, seed=None, options=None):
        """Reset the wrapped environment and build the initial frame stack.

        Args:
            seed: Optional seed forwarded to the wrapped environment's reset.
            options: Optional options forwarded to the wrapped environment's
                reset.

        Returns:
            A ``(stacked_state, info)`` tuple. ``info`` comes from the last
            no-op step, or from the reset itself when ``initial_no_op`` is 0.
        """
        # Reset the original environment.
        s, info = self.env.reset(seed=seed, options=options)

        # Do nothing (action 0) for the next `self.initial_no_op` steps.
        for _ in range(self.initial_no_op):
            s, _, _, _, info = self.env.step(0)

        # Convert a frame to 84 X 84 gray scale one
        s = preprocess(s)

        # The initial observation is simply a copy of the frame `s`
        self.stacked_state = np.tile(s, (self.stack_frames, 1, 1))  # [stack_frames, 84, 84]
        return self.stacked_state, info

    def step(self, action):
        """Repeat ``action`` for up to ``skip_frames`` steps and update the stack.

        The loop stops early as soon as the wrapped environment reports
        ``terminated`` or ``truncated``.

        Args:
            action: Action passed to the wrapped environment on every
                repeated step.

        Returns:
            ``(stacked_state, reward, terminated, truncated, info)`` where
            ``reward`` is the sum of the rewards of the executed steps and the
            remaining values come from the last executed step.
        """
        # We take an action for self.skip_frames steps
        reward = 0
        for _ in range(self.skip_frames):
            s, r, terminated, truncated, info = self.env.step(action)
            reward += r
            if terminated or truncated:
                break

        # Convert a frame to 84 X 84 gray scale one
        s = preprocess(s)

        # Drop the oldest frame and push the current frame `s` at the end.
        self.stacked_state = np.concatenate((self.stacked_state[1:], s[np.newaxis]), axis=0)

        return self.stacked_state, reward, terminated, truncated, info


In [None]:
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
class Actor(nn.Module):
    """Convolutional policy network producing a categorical action distribution.

    Args:
        state_dim: Number of input channels (stacked frames).
        action_dim: Number of discrete actions, i.e. the number of logits.
        activation: Non-linearity applied after both convolutions and the
            first fully connected layer. Defaults to ``F.relu``.
    """

    def __init__(self, state_dim, action_dim, activation=F.relu):
        super(Actor, self).__init__()
        self.conv1 = nn.Conv2d(state_dim, 16, kernel_size=8, stride=4)  # [N, 4, 84, 84] -> [N, 16, 20, 20]
        self.conv2 = nn.Conv2d(16, 32, kernel_size=4, stride=2)  # [N, 16, 20, 20] -> [N, 32, 9, 9]
        # Flattened size of the conv2 output for 84 x 84 inputs.
        self.in_features = 32 * 9 * 9
        self.fc1 = nn.Linear(self.in_features, 256)
        self.fc2 = nn.Linear(256, action_dim)
        self.activation = activation

    def forward(self, x):
        """Map a batch of stacked frames to a ``Categorical`` over actions.

        Args:
            x: Tensor whose trailing dimensions are ``[state_dim, 84, 84]``.

        Returns:
            A ``torch.distributions.Categorical`` built from the output logits.
        """
        x = self.activation(self.conv1(x))
        x = self.activation(self.conv2(x))
        x = x.view((-1, self.in_features))
        x = self.activation(self.fc1(x))
        # The output layer stays linear: clamping logits with ReLU would zero
        # the gradient of every negative pre-activation.
        logits = self.fc2(x)
        return Categorical(logits=logits)
        

class Critic(nn.Module):
    """Convolutional value network producing one scalar per input state.

    Args:
        state_dim: Number of input channels (stacked frames).
        activation: Non-linearity applied after both convolutions and the
            first fully connected layer. Defaults to ``F.relu``.
    """

    def __init__(self, state_dim, activation=F.relu):
        super(Critic, self).__init__()
        self.conv1 = nn.Conv2d(state_dim, 16, kernel_size=8, stride=4)  # [N, 4, 84, 84] -> [N, 16, 20, 20]
        self.conv2 = nn.Conv2d(16, 32, kernel_size=4, stride=2)  # [N, 16, 20, 20] -> [N, 32, 9, 9]
        # Flattened size of the conv2 output for 84 x 84 inputs.
        self.in_features = 32 * 9 * 9
        self.fc1 = nn.Linear(self.in_features, 256)
        self.fc2 = nn.Linear(256, 1)
        self.activation = activation

    def forward(self, x):
        """Estimate the value of each state in the batch.

        Args:
            x: Tensor whose trailing dimensions are ``[state_dim, 84, 84]``.

        Returns:
            Tensor of shape ``[N, 1]`` with unbounded value estimates.
        """
        x = self.activation(self.conv1(x))
        x = self.activation(self.conv2(x))
        x = x.view((-1, self.in_features))
        x = self.activation(self.fc1(x))
        # No activation on the output: a ReLU here would make it impossible
        # to predict negative values.
        return self.fc2(x)


In [None]:
import torch
from ppo import PPO
# Build the discrete-action environment and wrap it for frame skipping/stacking.
env = gym.make("CarRacing-v2", continuous=False)
env = WrapperEnv(env)
obs_dim = env.observation_space.shape[0]  # leading axis = number of stacked frames
act_dim = env.action_space.n

# Resolve the device once so both networks are guaranteed to share it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Size the networks from the environment instead of hard-coded literals.
actor = Actor(obs_dim, act_dim).to(device)
critic = Critic(obs_dim).to(device)

agent = PPO(env, actor, critic)

n_iterations = 1000
n_eps_per_iter = 10


In [None]:
# Use the iteration count configured in the setup cell rather than repeating the literal.
agent.train(n_iters=n_iterations)


In [None]:
import matplotlib.pyplot as plt
# Draw the values stored in agent.score_history on an explicit figure/axes pair.
fig, ax = plt.subplots()
ax.plot(agent.score_history)
plt.show()