In [1]:
import gymnasium as gym
from pyboy import PyBoy, WindowEvent
from gym import spaces
import numpy as np
import os
class GameBoyEnv(gym.Env):
    metadata = {'render.modes': ['human', 'rgb_array']}
    gameboy_buttons = [
            WindowEvent.PRESS_BUTTON_A, WindowEvent.PRESS_BUTTON_B,
            WindowEvent.PRESS_ARROW_UP, WindowEvent.PRESS_ARROW_DOWN,
            WindowEvent.PRESS_ARROW_LEFT, WindowEvent.PRESS_ARROW_RIGHT
        ]
    class FrameStorage:
        def __init__(self):
            self.unique_frame_hashes = set()

        def __len__(self):
            # Return the number of unique frames
            return len(self.unique_frame_hashes)

        def add_frame(self, observation):
            # Convert the frame to a byte string and then hash it
            frame_hash = hash(observation.tostring())
            reward = 1 if frame_hash not in self.unique_frame_hashes else 0
            self.unique_frame_hashes.add(frame_hash)
            return reward

    def __init__(self):
        super(GameBoyEnv, self).__init__()
        self.pyboy = PyBoy("roms/Super Mario Land (JUE) (V1.1) [!].gb")  # Use headless mode to prevent GUI issues
        self.action_space = spaces.Discrete(6)  # Updated to match the number of buttons used in step
        self.observation_space = spaces.Box(low=0, high=255, shape=(160, 144, 3), dtype=np.uint8)
        self.unique_frames = self.FrameStorage()
        self.initialize_game()

    def initialize_game(self):
        self.pyboy.set_emulation_speed(0)  # 0 for max speed
        self.pyboy.send_input(WindowEvent.PRESS_BUTTON_START)
        for _ in range(100):  # Slight increase in tick count to ensure game starts
            self.pyboy.tick()
        
        if(not os.path.exists("initial_state.state")):
            self.pyboy.save_state(open("initial_state.state", "wb"))        

    def step(self, action):
        self.gameboy_buttons = [
            WindowEvent.PRESS_BUTTON_A, WindowEvent.PRESS_BUTTON_B,
            WindowEvent.PRESS_ARROW_UP, WindowEvent.PRESS_ARROW_DOWN,
            WindowEvent.PRESS_ARROW_LEFT, WindowEvent.PRESS_ARROW_RIGHT
        ]
        if action < len(self.gameboy_buttons):
            self.pyboy.send_input(self.gameboy_buttons[action])
        self.pyboy.tick()

        # Get the current frame from the emulator
        observation = self.pyboy.botsupport_manager().screen().screen_ndarray()

        # Additional rewards
        level_reward = self.get_level_reward()
        coin_reward = self.get_coin_reward()
        score_reward = self.get_score_reward()
        frame_reward = self.get_frame_reward(self.unique_frames.add_frame(observation))
        death_penalty = self.get_death_penalty_or_reward()
        # Sum up all rewards
        reward = frame_reward + level_reward + coin_reward + score_reward + death_penalty

        # Check if the game is over
        done = self.check_game_over()

        # Additional info (optional)
        info = {}

        return observation, reward, done, info
    
    def get_death_penalty_or_reward(self):
        lives = self.pyboy.get_memory_value(0xDA15)
        return (lives-3)*0.5

    def get_frame_reward(self, frame_reward):
        return frame_reward * 0.001
    
    def get_level_reward(self):
        current_world = self.pyboy.get_memory_value(0x982C)
        current_stage = self.pyboy.get_memory_value(0x982E)
        level = current_world * 4 + current_stage  # Simplistic level calculation
        return level * 0.1  # Adjust the reward scaling as needed

    def get_coin_reward(self):
        # Read the total amount of coins
        coins = self.pyboy.get_memory_value(0xFFFA)
        return coins * 0.05  # Adjust the reward scaling as needed
    
    def get_score(self):
        score_bytes = [self.pyboy.get_memory_value(addr) for addr in (0xC0A0, 0xC0A1, 0xC0A2)]
        score = bcd_to_int(score_bytes)
        return score

    def get_score_reward(self):
        return self.get_score() * 0.001  # Adjust the reward scaling as needed

    def reset(self):

        self.pyboy.load_state(open("initial_state.state", "rb"))
        # self.pyboy.reset_game()
        self.initialize_game()
        self.unique_frames = self.FrameStorage()  # Reset the frame storage
        return self.pyboy.botsupport_manager().screen().screen_ndarray()

    def check_game_over(self):
        lives_address = 0xDA15
        lives = self.pyboy.get_memory_value(lives_address)
        return lives <= 0

    def close(self):
        self.pyboy.stop()

    def render(self, mode='human'):
        if mode == 'rgb_array':
            return self.pyboy.botsupport_manager().screen().screen_ndarray()
        elif mode == 'human':
            img = self.pyboy.botsupport_manager().screen().screen_ndarray()
            plt.imshow(img)
            plt.show()
        
        
def bcd_to_int(bcd_bytes):
    """
    Convert a sequence of BCD (Binary-Coded Decimal) bytes to an integer.
    bcd_bytes: A list or tuple of bytes representing the BCD value, 
               ordered from most significant byte to least significant byte.
    """
    total_value = 0
    # Process each BCD byte starting from the most significant byte
    for byte in bcd_bytes:
        # Shift the total value by a decimal place (multiply by 10) for each nibble (4 bits) in the byte
        for nibble_shift in (4, 0):
            nibble_value = (byte >> nibble_shift) & 0xF
            total_value = total_value * 10 + nibble_value
    return total_value



In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import gym

class DQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DQN, self).__init__()
        self.input_dim = np.prod(input_shape)  # Calculate the total input dimension
        
        self.net = nn.Sequential(
            nn.Linear(self.input_dim, 128),  # Adjust the input dimension
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )

    def forward(self, x):
        x = x.float() / 255.0  # Normalize input
        x = x.view(x.size(0), -1)  # Flatten the input
        return self.net(x)

class ImprovedDQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(ImprovedDQN, self).__init__()
        self.input_dim = np.prod(input_shape)  # Calculate the total input dimension
        
        self.net = nn.Sequential(
            nn.Linear(self.input_dim, 128),
            nn.LeakyReLU(0.01),  # Use LeakyReLU for potentially faster convergence
            nn.Linear(128, 256),  # Added layer for more complex function approximation
            nn.LeakyReLU(0.01),  # Use LeakyReLU again
            nn.Linear(256, n_actions)
        )
        
        self.apply(self.init_weights)  # Apply weight initialization

    def init_weights(self, m):
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)  # Xavier initialization
            m.bias.data.fill_(0.01)  # Initialize biases to a small value

    def forward(self, x):
        x = x.float() / 255.0  # Normalize input
        x = x.view(x.size(0), -1)  # Flatten the input
        return self.net(x)

In [3]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        return np.array(state), action, reward, np.array(next_state), done
    
    def clear(self):
        self.buffer.clear()

    def __len__(self):
        return len(self.buffer)


In [4]:
import matplotlib.pyplot as plt
from IPython.display import clear_output
class Agent:
    def __init__(self, env):
        self.env = env
        self.replay_buffer = ReplayBuffer(100000)
        self.state_shape = env.observation_space.shape
        self.n_actions = env.action_space.n
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model = ImprovedDQN(self.state_shape, self.n_actions).to(self.device)
        self.target_model = ImprovedDQN(self.state_shape, self.n_actions).to(self.device)  # Use ImprovedDQN for consistency
        self.optimizer = optim.Adam(self.model.parameters())
        self.loss_fn = nn.MSELoss()

        self.gamma = 0.99
        self.batch_size = 64
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.update_target_every = 30

        # For live plotting
        self.total_rewards = []
        self.epsilons = []

    def act(self, state):
        if random.random() < self.epsilon:
            return self.env.action_space.sample()
        state_t = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        q_values = self.model(state_t)
        return q_values.max(1)[1].item()

    def plot(self):
        clear_output(wait=True)
        plt.figure(figsize=(20, 5))
        
        plt.subplot(1, 2, 1)
        plt.title("Total Reward per Episode")
        plt.plot(self.total_rewards)
        plt.xlabel("Episode")
        plt.ylabel("Total Reward")
        
        plt.subplot(1, 2, 2)
        plt.title("Epsilon Value Over Time")
        plt.plot(self.epsilons)
        plt.xlabel("Episode")
        plt.ylabel("Epsilon")

        plt.tight_layout()
        plt.show()

    def update(self):
        if len(self.replay_buffer) < self.batch_size:
            return
        state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)

        state = torch.FloatTensor(state).to(self.device)
        next_state = torch.FloatTensor(next_state).to(self.device)
        action = torch.LongTensor(action).unsqueeze(-1).to(self.device)
        reward = torch.FloatTensor(reward).to(self.device)
        done = torch.FloatTensor(done).to(self.device)

        q_values = self.model(state).gather(1, action).squeeze(-1)
        next_q_values = self.target_model(next_state).max(1)[0]
        expected_q_values = reward + self.gamma * next_q_values * (1 - done)

        loss = self.loss_fn(q_values, expected_q_values.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, num_episodes):
        for episode in range(num_episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                action = self.act(state)
                next_state, reward, done, _ = self.env.step(action)
                self.replay_buffer.push(state, action, reward, next_state, done)

                state = next_state
                total_reward += reward

                self.update()

            # if episode % self.update_target_every == 0:
            #     self.target_model.load_state_dict(self.model.state_dict())

            if episode % self.update_target_every == 0:
                self.target_model.load_state_dict(self.model.state_dict())

            self.total_rewards.append(total_reward)
            self.epsilons.append(self.epsilon)

            if episode % 10 == 0:  # Update the plot every 10 episodes
                self.plot()

            print(f"Episode: {episode}, Total reward: {total_reward}, Epsilon: {self.epsilon}")


            if episode % 50 == 0:
                torch.save(self.model.state_dict(), "checkpoints/model_"+str(episode)+".pth")
            

print( torch.cuda.is_available())
env = GameBoyEnv()
agent = Agent(env)

agent.train(1000)  # Train for 1000 episodes


True


  _torch_pytree._register_pytree_node(
  frame_hash = hash(observation.tostring())


NameError: name 'clear_output' is not defined