In [1]:
import os
import time
import json
import subprocess
import pyautogui
import socket
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
from ultralytics import YOLO

# Choose the compute backend in priority order: Apple MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Device: {device}")

Device: mps


### Loading the pretrained gameplay model

In [5]:
# Instantiate the gameplay network from the project-local model module.
# NOTE(review): this cell only calls the constructor; no load_state_dict /
# checkpoint load is visible here, so unless PokemonModelLSTM() loads weights
# internally the "successfully loaded" message below is misleading — confirm.
from models.PokemonModelLSTM import PokemonModelLSTM
model = PokemonModelLSTM()

print("Gameplay model successfully loaded!")
# NOTE(review): the disabled line below refers to `gameplay_model`, but the
# variable created in this cell is `model`.
# gameplay_model.eval()

<All keys matched successfully>

In [6]:

# Load the YOLO model that is later passed to the training loop as
# `annotation_model`.
# The weights path is relative, so it resolves against the kernel's current
# working directory.
annotation_model_pth = "runs/detect/firstRun/weights/best.pt"
annotation_model = YOLO(annotation_model_pth)
print("Annotation model successfully loaded!")

Annotation model successfully loaded


In [7]:
# Data paths
# NOTE(review): all three are empty-string placeholders and none of them is
# referenced by any other cell visible in this notebook — fill in or remove.
states_dir = ""
actions_dir = ""
annotations_dir = ""

### Setting model hyperparameters

In [8]:
# --- Reinforcement-learning hyperparameters ---------------------------------
gamma = 0.99            # weight applied to the estimated value of the next state
epsilon = 1.0           # starting exploration rate
epsilon_decay = 0.995   # multiplicative decay applied to epsilon each episode
min_epsilon = 0.01      # floor below which epsilon is never decayed
learning_rate = 1e-3    # optimizer step size
num_episodes = 1_000    # how many episodes to run

# --- Emulator key bindings ---------------------------------------------------
# Maps a game button name to the keyboard key sent to the emulator.
# Insertion order matters: the training loop indexes into the key list.
ACTION_MAP = dict(
    A='x',
    B='z',
    X='s',
    Y='a',
    Up='up',
    Down='down',
    Left='left',
    Right='right',
)

In [4]:
# Importing modular scripts
# NOTE(review): PokemonModelLSTM is already imported in the model-loading cell
# above, so the first import here is a duplicate.
# NOTE(review): `perform_action` imported here is redefined by a `def
# perform_action` in a later cell of this notebook; whichever cell ran last
# determines which implementation is in effect.
from models.PokemonModelLSTM import PokemonModelLSTM
from modular_scripts.rlhf_utils import open_emulator, get_feedback, get_state_from_emulator, perform_action, is_at_route_203
# Presumably launches the DeSmuME emulator (the cell output shows its banner
# and "Loaded state 4") — confirm against modular_scripts.rlhf_utils.
open_emulator()


Microphone successfully inited.
DeSmuME 0.9.13 ARM64 NEON-A64
Loaded state 4


#### Examining how epsilon (exploration rate) changes through episodes (training)

### Connecting the notebook to the emulator (DeSmuME) via a Lua script

In [None]:
# Functions to interact with the emulator
def send_command_to_lua(command, host="localhost", port=12345, timeout=None):
    """Send one text command over TCP and return the decoded reply.

    This helper was previously commented out even though `perform_action` and
    `get_game_state` in this cell still call it, which made both raise
    NameError. Presumably the peer is the Lua script running inside the
    emulator — confirm it listens on the default host/port.

    Args:
        command: Text command to send; it is UTF-8 encoded before sending.
        host: Host to connect to. Defaults to "localhost".
        port: TCP port to connect to. Defaults to 12345.
        timeout: Optional socket timeout in seconds. ``None`` (the default)
            keeps the socket fully blocking.

    Returns:
        The reply as a string, decoded from at most 1024 received bytes.

    Raises:
        OSError: If the connection cannot be established or the socket
            operation fails (including a timeout when `timeout` is set).
    """
    # The context manager closes the socket even when connect/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.settimeout(timeout)
        client.connect((host, port))
        client.sendall(command.encode())
        return client.recv(1024).decode()

def perform_action(action):
    """Forward a game button press to the emulator and print the reply.

    Button names that are not keys of ACTION_MAP are ignored silently.

    NOTE(review): this definition shadows the `perform_action` imported from
    modular_scripts.rlhf_utils earlier in the notebook.

    Args:
        action: Button name, expected to be a key of ACTION_MAP.
    """
    if action not in ACTION_MAP:
        return
    emulator_key = ACTION_MAP[action]
    reply = send_command_to_lua(f"PERFORM_ACTION {emulator_key}")
    print(reply)

def get_game_state(output_dir='game_state_screenshots', frame_num=0):
    """Query the emulator, save a screenshot of the screen, and return its path.

    Sends "GET_STATE" through `send_command_to_lua`, prints the reply, then
    captures the screen with pyautogui and writes it to
    ``<output_dir>/frame_<frame_num>.png``.

    Previously the function returned None, so callers had no handle on the
    captured frame. Callers that ignore the return value are unaffected.

    NOTE(review): the training loop passes this function's result straight to
    ``torch.tensor``; a file path is not tensor-convertible, so that caller
    still needs to load the image — confirm the intended state format.

    Args:
        output_dir: Directory for saved frames; created if missing.
        frame_num: Number used in the file name. Calls that reuse the same
            number overwrite the same file.

    Returns:
        The path of the saved screenshot.
    """
    response = send_command_to_lua("GET_STATE")
    print(response)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)
    screenshot = pyautogui.screenshot()
    frame_path = os.path.join(output_dir, f'frame_{frame_num}.png')
    screenshot.save(frame_path)
    print(f"Saved game state to {frame_path}")
    return frame_path

### Defining Rewards

In [None]:
def calculate_reward(state, action, next_state):
    """Score a transition using membership tests on the two states.

    Args:
        state: Container (or string) describing the state before the action.
        action: Action taken; not used by the current reward logic.
        next_state: Container (or string) describing the state afterwards.

    Returns:
        10 when 'battle' is in `state` but not in `next_state`,
        otherwise 5 when 'city' is in `next_state`,
        otherwise -1.
    """
    # Example logic only: a battle that is no longer present counts as won.
    left_battle = 'battle' in state and not ('battle' in next_state)
    if left_battle:
        return 10
    # Entering a city earns a smaller reward; everything else is penalised.
    return 5 if 'city' in next_state else -1

# Optional variant of the reward that also looks at detector annotations.
def calculate_reward_with_annotations(state, action, next_state, annotations):
    """Return the base reward plus a bonus when a desired object was annotated.

    Args:
        state: State before the action, forwarded to `calculate_reward`.
        action: Action taken, forwarded to `calculate_reward`.
        next_state: State after the action, forwarded to `calculate_reward`.
        annotations: Container tested for membership of 'desired_object'.

    Returns:
        `calculate_reward(...)`, increased by 2 when 'desired_object' is in
        `annotations`.
    """
    base_reward = calculate_reward(state, action, next_state)
    detection_bonus = 2 if 'desired_object' in annotations else 0
    return base_reward + detection_bonus

### Collecting Human Feedback

In [None]:
# Loss and optimizer for the training loop defined below.
criterion = nn.MSELoss()
# Use the `learning_rate` hyperparameter instead of repeating its value, so
# tuning it in the hyperparameter cell actually affects training.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

def train_rlhf_model(epochs, model, criterion, optimizer, annotation_model):
    """Run `epochs` single-step temporal-difference updates against the emulator.

    Each iteration reads the current state, greedily picks the action with the
    highest model output, performs it, reads the next state, and regresses the
    chosen action's value toward ``reward + gamma * max(model(next_state))``.

    Previously the loss compared the model's *entire* output vector against
    the scalar target (via broadcasting), which pulled the value of every
    action toward the target rather than only the action actually taken.

    NOTE(review): ``torch.tensor(state)`` requires `get_game_state()` to
    return numeric array-like data — confirm that it does.
    NOTE(review): the index chosen by argmax is used to index ACTION_MAP's
    keys, so the model is assumed to emit one value per ACTION_MAP entry.

    Args:
        epochs: Number of environment steps / updates to perform.
        model: Network mapping a batched state tensor to per-action values.
        criterion: Loss applied to (predicted value, target value).
        optimizer: Optimizer over `model`'s parameters.
        annotation_model: Object with a ``predict`` method, called on every
            state tensor.
    """
    # Loop-invariant: ACTION_MAP's key order defines the action indices.
    action_keys = list(ACTION_MAP.keys())

    for epoch in range(epochs):
        state = get_game_state()
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

        # Annotate the state using the annotation model.
        # The result is not consumed by the update below yet.
        annotations = annotation_model.predict(state_tensor)

        # Decide on action (greedy with respect to the model output)
        q_values = model(state_tensor)
        action_idx = q_values.argmax().item()
        action_key = action_keys[action_idx]

        # Execute action
        perform_action(action_key)

        # Get next state
        next_state = get_game_state()
        next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)

        # Calculate reward (the reward function receives the raw model output
        # as its `action` argument, as before).
        reward = calculate_reward(state, q_values, next_state)

        # Bootstrapped target; no_grad keeps it out of the autograd graph.
        with torch.no_grad():
            next_q_max = model(next_state_tensor).max()
        target = reward + gamma * next_q_max

        # Update model: only the value of the action that was taken is trained.
        # argmax() indexes the flattened output, so flatten before indexing.
        predicted = q_values.reshape(-1)[action_idx]
        loss = criterion(predicted, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}")

# Example usage
# Reuses the `annotation_model` (YOLO) loaded earlier in the notebook. The
# previous `load_annotation_model()` call referenced a function that is neither
# defined nor imported anywhere in this notebook and raised NameError.
train_rlhf_model(epochs=100, model=model, criterion=criterion, optimizer=optimizer, annotation_model=annotation_model)

In [9]:
def _simulate_epsilon_decay(start, decay, floor, episodes):
    """Return the exploration rate after each of `episodes` decay steps.

    Args:
        start: Exploration rate before the first episode.
        decay: Multiplicative factor applied once per episode.
        floor: Lower bound the rate is clamped to.
        episodes: Number of decay steps to simulate.

    Returns:
        A list of length `episodes`; element i is the rate after i + 1 decays.
    """
    schedule = []
    current = start
    for _ in range(episodes):
        current = max(floor, current * decay)
        schedule.append(current)
    return schedule


# Simulate on a local copy. The previous loop reassigned the global `epsilon`,
# which left the hyperparameter at its floor for any later cell and made this
# cell plot a flat line when re-run.
epsilon_values = _simulate_epsilon_decay(epsilon, epsilon_decay, min_epsilon, num_episodes)

# Plotting the epsilon decay
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(epsilon_values, label='Epsilon')
ax.set_xlabel('Episode')
ax.set_ylabel('Epsilon Value')
ax.set_title('Epsilon Decay Over Episodes')
ax.legend()
ax.grid(True)
plt.show()


KeyboardInterrupt: 