In [1]:
!pip install gymnasium[atari] gymnasium[accept-rom-license] opencv-python > /dev/null 2>&1


In [2]:
import os
import random
from collections import deque

import cv2
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from gymnasium.wrappers import AtariPreprocessing, FrameStack


In [None]:
def _render_rgb_frame(env):
    """Return the current screen of `env` as an (H, W, 3) uint8 RGB array.

    Uses `env.render()` when the environment was created with
    `render_mode="rgb_array"`. Otherwise falls back to the ALE screen buffer,
    which is available on Atari environments even without a render mode.

    Raises:
        RuntimeError: if neither source of frames is available.
    """
    if getattr(env, "render_mode", None) == "rgb_array":
        frame = env.render()
    else:
        ale = getattr(getattr(env, "unwrapped", env), "ale", None)
        if ale is None:
            raise RuntimeError(
                "Cannot grab frames: create the environment with "
                "render_mode='rgb_array' or pass an ALE-backed Atari environment."
            )
        frame = ale.getScreenRGB()
    return np.ascontiguousarray(frame, dtype=np.uint8)


def record_video(env, agent, output_path='recorded_video.avi', fps=20.0):
    """Play one greedy episode with `agent` and save it as an XVID video.

    Args:
        env: Gymnasium environment using the 5-tuple `step` API.
        agent: object exposing `select_action(state)`; called with its default
            epsilon, i.e. greedy in this notebook's `agent` class.
        output_path: destination video file.
        fps: playback frame rate written into the container.

    The video writer is released and the environment is closed even if the
    episode raises part-way through.
    """
    # reset() returns (observation, info); the observation drives the first action.
    state, _ = env.reset()
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    video_writer = None
    try:
        done = False
        while not done:
            action = agent.select_action(np.array(state))
            state, _, terminated, truncated, _ = env.step(action)
            # An episode is over on either a real terminal state or a time-limit cut.
            done = terminated or truncated
            frame = _render_rgb_frame(env)
            if video_writer is None:
                # Size the writer from the first real frame; OpenCV wants (width, height).
                height, width = frame.shape[:2]
                video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            # Environments render RGB, while OpenCV writers expect BGR channel order.
            video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    finally:
        if video_writer is not None:
            video_writer.release()
        env.close()
    

In [3]:
# Record whether a CUDA device is visible to PyTorch. The classes in the visible
# cells below query torch.cuda.is_available() themselves rather than reading this flag.
cuda = torch.cuda.is_available()

In [4]:
class network(nn.Module):
    """Convolutional Q-network: two conv layers followed by a two-layer MLP head.

    The hard-coded flatten size of 32*9*9 corresponds to an 84x84 input
    (84 -> 20 after the 8x8/stride-4 conv, 20 -> 9 after the 4x4/stride-2 conv).

    Args:
        input_dim: number of input channels.
        output_dim: number of outputs produced per input.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        feature_extractor = [
            nn.Conv2d(input_dim, 16, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=4, stride=2),
            nn.ReLU(),
        ]
        value_head = [
            nn.Flatten(),
            nn.Linear(32 * 9 * 9, 256),
            nn.ReLU(),
            nn.Linear(256, output_dim),
        ]
        # One flat Sequential so module indices (and checkpoint keys) stay layers.0 .. layers.7.
        self.layers = nn.Sequential(*feature_extractor, *value_head)

    def forward(self, x):
        """Cast `x` to float32 and run it through the layer stack."""
        return self.layers(x.float())

In [5]:
class game():
    """Builds the `<game_name>NoFrameskip-v4` environment with preprocessing and a 4-frame stack."""

    def __init__(self, game_name):
        raw_env = gym.make(f"{game_name}NoFrameskip-v4")
        preprocessed_env = AtariPreprocessing(
            raw_env,
            grayscale_obs=True,
            scale_obs=False,
            terminal_on_life_loss=True,
        )
        # Observations become a stack of the 4 most recent preprocessed frames.
        self.env = FrameStack(preprocessed_env, num_stack=4)

    def environment(self):
        """Return the fully wrapped environment."""
        return self.env

In [6]:
class experience_replay():
    """Fixed-capacity FIFO store of transitions with uniform random sampling.

    Args:
        buffer_size: maximum number of transitions kept; the oldest is evicted first.
        batch_size: number of transitions returned by `sample()`.
    """

    def __init__(self, buffer_size, batch_size):
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        # deque(maxlen=...) drops the oldest entry in O(1) on append, instead of
        # copying the whole list on every insert once the buffer is full.
        self.buffer = deque(maxlen=buffer_size)

    def update(self, current_experience):
        """Append one `[state, action, reward, next_state, done]` transition."""
        self.buffer.append(current_experience)

    def sample(self):
        """Draw `batch_size` distinct transitions uniformly at random.

        Returns:
            `(states, actions, rewards, next_states, dones)` as batched tensors:
            float32 everywhere except `actions`, which is int64. Tensors are
            placed on the CUDA device when one is available.

        Raises:
            ValueError: (from `random.sample`) if fewer than `batch_size`
                transitions are stored.
        """
        batch = random.sample(self.buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        def to_tensor(values, dtype):
            # Stack through NumPy first: building a tensor from a tuple of arrays is slow.
            return torch.as_tensor(np.asarray(values), dtype=dtype, device=device)

        return (
            to_tensor(states, torch.float32),
            to_tensor(actions, torch.int64),
            to_tensor(rewards, torch.float32),
            to_tensor(next_states, torch.float32),
            to_tensor(dones, torch.float32),
        )

In [7]:
class agent():
    """DQN agent: online Q-network, softly updated target network, replay buffer.

    Args:
        input_dim: number of input channels passed to `network`.
        output_dim: number of discrete actions.
        buffer_size: replay buffer capacity.
        batch_size: minibatch size used by `learn()`.
        learning_rate: Adam step size for the online network.
        gamma: discount factor applied to the bootstrapped next-state value.
        tau: interpolation factor for the soft target-network update.
    """

    def __init__(self, input_dim, output_dim, buffer_size=10000, batch_size=32, learning_rate=0.001,
                 gamma=0.99, tau=0.005):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.n_actions = output_dim
        self.q_network = network(input_dim, output_dim).to(self.device)
        self.target_network = network(input_dim, output_dim).to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.experience_replay = experience_replay(buffer_size, batch_size)
        self.gamma = gamma
        self.tau = tau
        # Start the target as an exact copy of the online network. A single soft
        # update here would leave it ~(1 - tau) unrelated random weights.
        self.target_network.load_state_dict(self.q_network.state_dict())

    def save_weights(self, path = "/kaggle/working/"):
        """Save the online network's state dict as `weights.pt` inside directory `path`."""
        # os.path.join works whether or not `path` ends with a separator.
        torch.save(self.q_network.state_dict(), os.path.join(path, "weights.pt"))

    def update_target_network(self):
        """Soft update: target <- tau * online + (1 - tau) * target."""
        with torch.no_grad():
            for target_param, local_param in zip(self.target_network.parameters(), self.q_network.parameters()):
                target_param.copy_(self.tau * local_param + (1.0 - self.tau) * target_param)

    def select_action(self, state, epsilon = 0):
        """Epsilon-greedy action selection.

        Args:
            state: a single observation, without a batch dimension.
            epsilon: probability of picking a uniformly random action.

        Returns:
            The chosen action index as a Python int.
        """
        # Strict '<' so that epsilon == 0 is always greedy.
        if random.random() < epsilon:
            return random.randint(0, self.n_actions - 1)
        with torch.no_grad():
            batch = torch.as_tensor(np.asarray(state), dtype=torch.float32, device=self.device).unsqueeze(0)
            return self.q_network(batch).argmax().item()

    def learn(self):
        """Run one gradient step on a sampled minibatch, then soft-update the target.

        Does nothing until the replay buffer holds at least one full batch.
        """
        if len(self.experience_replay.buffer) < self.experience_replay.batch_size:
            return
        states, actions, rewards, next_states, dones = self.experience_replay.sample()

        # Q(s, a) for the actions that were actually taken.
        q_taken = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # The TD target is a constant w.r.t. the optimisation: computing it under
        # no_grad keeps gradients from accumulating on the target network.
        with torch.no_grad():
            next_best = self.target_network(next_states).max(dim=1).values
            target = rewards + (1.0 - dones) * self.gamma * next_best

        # Averaged over the batch only (not over batch x actions).
        loss = F.mse_loss(q_taken, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.update_target_network()
    

In [8]:
class Trainer():
    """Runs the epsilon-greedy DQN training loop for one Atari game.

    Args:
        game_name: game identifier passed to `game` (e.g. 'Pong').
        update_frequency: call `agent.learn()` once every this many frames.
        max_frames: total number of environment steps to train for.
        epsilon_start: initial exploration rate.
        epsilon_end: floor for the exploration rate.
        epsilon_decay: multiplicative decay applied to epsilon after each episode.
        log_interval: print a progress line at the first episode end after each
            multiple of this many frames; a falsy value disables logging.
    """

    def __init__(self, game_name, update_frequency = 4, max_frames = 1000000, epsilon_start=1.0, epsilon_end=0.01,
                 epsilon_decay=0.995, log_interval=10000):
        self.env = game(game_name).environment()
        self.actions = self.env.action_space.n
        # The leading observation axis is the frame-stack depth, i.e. the network's input channels.
        self.agent = agent(self.env.observation_space.shape[0], self.actions)
        self.max_frames = max_frames
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.update_frequency = update_frequency
        self.log_interval = log_interval

    def get_instance(self):
        """Return the agent being trained."""
        return self.agent

    def get_env(self):
        """Return the wrapped environment used for training."""
        return self.env

    def train(self):
        """Train until `max_frames` environment steps have been taken, then close the env."""
        epsilon = self.epsilon_start
        frames = 0
        episodes = 0
        next_log_frame = self.log_interval
        while(frames < self.max_frames):
            total_reward = 0
            episodes += 1
            state, _ = self.env.reset()
            state = np.array(state)
            while(True):
                frames += 1
                action = self.agent.select_action(state, epsilon)
                next_state, reward, terminated, truncated, info = self.env.step(action)
                next_state = np.array(next_state)
                # Store only true termination as `done`: a time-limit truncation is
                # not a terminal state, so the target should still bootstrap from it.
                self.agent.experience_replay.update([state, action, reward, next_state, terminated])
                if(frames % self.update_frequency == 0):
                    self.agent.learn()
                state = next_state
                total_reward += reward
                # Truncation also ends the episode: the env must be reset before stepping again.
                if terminated or truncated or frames >= self.max_frames:
                    break
            epsilon = max(self.epsilon_end, epsilon * self.epsilon_decay)
            # Episodes rarely end exactly on a multiple of log_interval, so log when a
            # threshold has been crossed rather than testing `frames % interval == 0`.
            if self.log_interval and frames >= next_log_frame:
                print(f"Episode {episodes}, Total Reward: {total_reward}, Epsilon: {epsilon:.4f}")
                next_log_frame = (frames // self.log_interval + 1) * self.log_interval

        self.env.close()

In [None]:
# Train a DQN agent on Pong. `game` appends "NoFrameskip-v4" to this name, and
# Trainer's default max_frames runs the loop for up to 1,000,000 environment steps.
game_name = 'Pong'
trainer = Trainer(game_name)
trainer.train()

A.L.E: Arcade Learning Environment (version 0.8.1+53f58b7)
[Powered by Stella]


In [None]:
# Persist the online Q-network's weights; save_weights() defaults to the
# "/kaggle/working/" directory, so pass `path=` when running elsewhere.
trainer.get_instance().save_weights()

In [None]:
agent = trainer.get_instance()

In [None]:
record_video(trainer.get_env(), agent)