In [1]:
import os
import re
import gymnasium as gym
import time
import copy
import random
import warnings
import numpy as np
import cv2
from time import sleep
sleep(0.0416)

import torch
import torchvision
import torch.optim as optim
import torch.nn as nn

from IPython import display
from skimage.color import rgb2gray
from skimage.transform import rescale
from matplotlib import pyplot as plt
from tqdm import tqdm_notebook as tqdm
from collections import deque, namedtuple

In [2]:
def preprocess_frame(frame):
    """Turn a raw RGB frame into a half-resolution grayscale image.

    Args:
        frame: An RGB image array, or a tuple whose first element is the
            image (the training loop passes the value returned by
            ``env.reset()`` straight in, so tuples are unwrapped here).

    Returns:
        The result of ``rescale(rgb2gray(image), 0.5, anti_aliasing=True)``,
        i.e. a 2-D grayscale image with each spatial dimension scaled by 0.5.
    """
    # Unwrap tuple inputs; only element 0 is treated as the image.
    image = frame[0] if isinstance(frame, tuple) else frame

    # Grayscale first, then downscale with anti-aliasing.
    return rescale(rgb2gray(image), 0.5, anti_aliasing=True)


In [3]:
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: ``(channels, height, width)`` of one input frame. Only the
            channel count is needed to build the conv stack; height and width
            are used solely when ``conv_out_features`` is ``None``.
        n_actions: Number of discrete actions, i.e. the width of the output.
        conv_out_features: Number of features the flattened conv output has,
            which is the input width of the first linear layer. The default
            of 3456 (= 64 * 9 * 6) is what the conv stack produces for e.g. a
            105x80 frame. Pass ``None`` to derive the value from
            ``input_shape`` instead of hard-coding it.
    """

    def __init__(self, input_shape, n_actions, conv_out_features=3456):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        )

        if conv_out_features is None:
            conv_out_features = self._infer_conv_out_features(input_shape)

        self.fc = nn.Sequential(
            nn.Linear(conv_out_features, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        )

    def _infer_conv_out_features(self, input_shape):
        """Return the flattened size of the conv output for one ``input_shape`` frame."""
        # A single all-zero frame is enough: only the resulting shape matters.
        with torch.no_grad():
            probe = self.conv(torch.zeros(1, *input_shape))
        return int(probe.view(1, -1).size(1))

    def forward(self, x):
        """Compute Q-values for a batch of frames.

        Args:
            x: Tensor of shape ``(batch, channels, height, width)``.

        Returns:
            Tensor of shape ``(batch, n_actions)``.
        """
        x = self.conv(x)
        # Flatten everything except the batch dimension before the linear head.
        x = x.view(x.size(0), -1)
        return self.fc(x)


In [4]:
# One step of experience: the state acted in, the action taken, the state
# that followed, and the reward received.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))


class ReplayMemory:
    """Fixed-capacity buffer of ``Transition`` records.

    Once ``capacity`` items are stored, appending a new one discards the
    oldest (bounded ``deque`` semantics).
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` transitions."""
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition; ``args`` are the ``Transition`` fields in order."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored items
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [5]:
# Hyperparameters
BATCH_SIZE = 32  # Transitions sampled per optimisation step
GAMMA = 0.99  # Discount factor applied to the next-state value
EPSILON = 0.05  # Exploration probability
TARGET_UPDATE = 10  # Episodes between target-network synchronisations
MEMORY_CAPACITY = 10000  # Maximum number of transitions kept for replay

# (channels, height, width) of one preprocessed frame. preprocess_frame() only
# halves each spatial dimension, so this is NOT the classic 84x84 DQN input:
# assuming the emulator emits 210x160 RGB frames (TODO confirm), the result is
# 105x80, which is also the size the network's 3456-wide (64 * 9 * 6) linear
# layer corresponds to.
FRAME_SHAPE = (1, 105, 80)

# Setup the Atari environment
env = gym.make('Pong-v4', render_mode="human")
n_actions = env.action_space.n
print(env.unwrapped.get_action_meanings())

# Create networks: the policy net is trained, the target net is a frozen copy
# that is refreshed from it periodically.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
policy_net = DQN(FRAME_SHAPE, n_actions).to(device)
target_net = DQN(FRAME_SHAPE, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# Replay memory
memory = ReplayMemory(MEMORY_CAPACITY)

# Optimizer (Adam with its default settings; only the policy net is trained)
optimizer = optim.Adam(policy_net.parameters())

['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']


In [6]:
import numpy as np

class PongEnv:
    """Toy placeholder environment with a ball and a single paddle.

    The state is a length-4 array: ball position followed by paddle position.
    The dynamics are deliberately trivial (see ``update_game_state``).
    """

    def __init__(self):
        """Create the environment and put it into its initial state."""
        self.reset()

    def reset(self):
        """Reset ball, paddle and the done flag; return the initial state."""
        self.ball_position = np.zeros(2)
        self.paddle_position = np.zeros(2)
        self.done = False
        return self.get_state()

    def get_state(self):
        """Return a fresh array ``[ball_0, ball_1, paddle_0, paddle_1]``."""
        return np.concatenate([self.ball_position, self.paddle_position])

    def step(self, action):
        """Advance one step and score it.

        Returns:
            ``(state, reward, done, info)`` where reward is -1 if the ball's
            first coordinate is below -1, +1 if it is above 1, and 0
            otherwise. Either non-zero case marks the episode as done.
            ``info`` is always an empty dict.
        """
        self.update_game_state(action)

        ball_x = self.ball_position[0]
        if ball_x < -1:
            # Ball passed the left boundary: the opponent scores.
            reward = -1
        elif ball_x > 1:
            # Ball passed the right boundary: the agent scores.
            reward = 1
        else:
            reward = 0

        # Any score ends the episode; otherwise the flag is left as it was.
        if reward != 0:
            self.done = True

        return self.get_state(), reward, self.done, {}

    def update_game_state(self, action):
        """Apply ``action`` to the paddle and move the ball.

        Action 0 raises the paddle's second coordinate by 1, action 1 lowers
        it by 1, anything else leaves the paddle alone. The ball always moves
        0.1 along its first coordinate (dummy dynamics).
        """
        if action == 0:
            paddle_delta = 1
        elif action == 1:
            paddle_delta = -1
        else:
            paddle_delta = None

        if paddle_delta is not None:
            self.paddle_position[1] += paddle_delta

        self.ball_position[0] += 0.1


In [7]:
def select_action(state, steps_done):
    """Pick an action epsilon-greedily using the module-level ``policy_net``.

    Args:
        state: Input tensor accepted by ``policy_net``.
        steps_done: Currently unused; the exploration rate is the constant
            ``EPSILON``.

    Returns:
        A ``(1, 1)`` ``torch.long`` tensor holding the chosen action index.
    """
    explore = random.random() <= EPSILON
    if explore:
        # Uniformly random action.
        random_action = random.randrange(n_actions)
        return torch.tensor([[random_action]], device=device, dtype=torch.long)

    # Greedy action: index of the largest Q-value along dim 1.
    with torch.no_grad():
        _, best_action = policy_net(state).max(1)
        return best_action.view(1, 1)

In [8]:
def optimize_model():
    """Run one optimisation step on ``policy_net`` from a replay-memory batch.

    Does nothing until ``memory`` holds at least ``BATCH_SIZE`` transitions.
    The target for each transition is ``reward + GAMMA * max_a Q_target(next_state, a)``
    and the loss is Smooth L1 (Huber).

    Note: every stored ``next_state`` is bootstrapped from; transitions carry
    no terminal flag, so terminal steps are not masked out.
    """
    if len(memory) < BATCH_SIZE:
        return

    # Turn a list of Transitions into one Transition of per-field tuples.
    sampled = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*sampled))

    states = torch.cat(batch.state)
    actions = torch.cat(batch.action)
    rewards = torch.cat(batch.reward)
    next_states = torch.cat(batch.next_state)

    # Q(s, a) for the actions that were actually taken.
    q_taken = policy_net(states).gather(1, actions)

    # max_a' Q_target(s', a'); no gradient flows into the target network.
    with torch.no_grad():
        next_q = target_net(next_states).max(1)[0]
    targets = rewards + GAMMA * next_q

    loss = nn.functional.smooth_l1_loss(q_taken, targets.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [9]:
def reward_function(observation, action, next_observation):
    """Example event-based reward; the first matching rule wins.

    Args:
        observation: Mapping with at least ``'score'`` (and ``'lives'`` when
            the score did not increase).
        action: The action taken; only compared against ``"shoot"``.
        next_observation: Mapping with the same keys, plus
            ``'enemy_defeated'`` which is read only when ``action == "shoot"``.

    Returns:
        1 if the score increased, otherwise -1 if a life was lost, otherwise
        5 if the action was ``"shoot"`` and an enemy was defeated, otherwise 0.
    """
    # Score went up.
    if next_observation['score'] > observation['score']:
        return 1

    # A life was lost.
    if next_observation['lives'] < observation['lives']:
        return -1

    # A shot defeated an enemy.
    if action == "shoot" and next_observation['enemy_defeated']:
        return 5

    return 0


In [10]:
num_episodes = 50
MAX_STEPS_PER_EPISODE = 10000  # Hard cap so a single episode cannot run forever

# try/finally guarantees the environment (and its window) is closed even when
# training is interrupted from the notebook (KeyboardInterrupt).
try:
    for i_episode in range(num_episodes):
        # Gymnasium's reset() returns (observation, info).
        frame, _ = env.reset()
        # Add batch and channel dimensions so the frame is a 4-D network input.
        state = torch.tensor(
            preprocess_frame(frame), device=device, dtype=torch.float32
        ).unsqueeze(0).unsqueeze(0)
        episode_return = 0.0

        # No explicit env.render(): with render_mode="human" the frame is
        # drawn as part of env.step().
        for t in range(MAX_STEPS_PER_EPISODE):
            action = select_action(state, t)

            # Gymnasium's step() returns
            # (observation, reward, terminated, truncated, info);
            # the episode is over when either flag is set.
            next_frame, reward, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated
            episode_return += float(reward)

            next_state = torch.tensor(
                preprocess_frame(next_frame), device=device, dtype=torch.float32
            ).unsqueeze(0).unsqueeze(0)
            # Explicit float32 so the TD target has the same dtype as the Q-values.
            reward_tensor = torch.tensor([reward], device=device, dtype=torch.float32)

            memory.push(state, action, next_state, reward_tensor)
            state = next_state

            optimize_model()

            if done:
                break

        # One summary line per episode instead of one line per step.
        print(f"Episode {i_episode}: return {episode_return:+.0f} in {t + 1} steps")

        # Periodically copy the trained weights into the target network.
        if i_episode % TARGET_UPDATE == 0:
            target_net.load_state_dict(policy_net.state_dict())
finally:
    env.close()

  logger.warn(


KeyboardInterrupt: 