**REINFORCE Method**

**Name:** Marcos Augusto Burgos Saavedra

**Student ID:** S4740705

In [None]:
import sys
import os

# Prepend the directory that contains the `game_models` package to the module
# search path so that `game_models.rc_entropy_v01` can be imported.
# NOTE: this is an absolute, machine-specific path; point it at the local
# checkout before running the notebook on another machine.
absolute_path_to_game_models = r'G:\Mi unidad\[00 GENERAL\04 Proyectos personales\04 Rubiks-cube - Vaz, Glassenbury, Hendriawan, Fauzan, Burgos\rubiks-cube'
sys.path.insert(0, absolute_path_to_game_models)

In [None]:
import torch
from game_models.rc_entropy_v01 import *
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt
import pandas as pd

# Constants
# Discount factor for the returns: a reward received k steps in the future is
# weighted by GAMMA**k when the discounted return of a step is computed.
GAMMA = 0.99

In [None]:
class PolicyNetwork(nn.Module):
    """Fully connected softmax policy with its own Adam optimizer.

    The network stacks ten ``nn.Linear`` layers: nine ReLU-activated layers
    (``linear1`` .. ``linear9``) followed by ``linear10``, whose output is
    turned into action probabilities with a softmax over the last dimension.

    Args:
        num_inputs: Size of the state vector fed to the first layer.
        num_actions: Number of discrete actions (size of the output layer).
        hidden_size: Width of every hidden layer.
        learning_rate: Learning rate handed to the Adam optimizer.
    """

    def __init__(self, num_inputs, num_actions, hidden_size, learning_rate=1e-4):
        super().__init__()

        # Number of elements in the action space
        self.num_actions = num_actions

        # The attribute names linear1..linear10 are kept as-is: whole-module
        # checkpoints written with torch.save(policy_net, ...) store the
        # sub-modules under these names.
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, hidden_size)
        self.linear5 = nn.Linear(hidden_size, hidden_size)
        self.linear6 = nn.Linear(hidden_size, hidden_size)
        self.linear7 = nn.Linear(hidden_size, hidden_size)
        self.linear8 = nn.Linear(hidden_size, hidden_size)
        self.linear9 = nn.Linear(hidden_size, hidden_size)
        self.linear10 = nn.Linear(hidden_size, num_actions)

        # Optimizer over every parameter registered above
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        # Re-initialise all linear layers in place (see init_weights)
        self.apply(init_weights)

    def _hidden_layers(self):
        """Return the nine ReLU-activated layers in forward order."""
        return (
            self.linear1, self.linear2, self.linear3,
            self.linear4, self.linear5, self.linear6,
            self.linear7, self.linear8, self.linear9,
        )

    def forward(self, state):
        """Return the probability of each action for ``state``.

        Args:
            state: Float tensor whose last dimension has ``num_inputs`` entries.

        Returns:
            Tensor of action probabilities (softmax over the last dimension).
        """
        x = state
        for layer in self._hidden_layers():
            x = F.relu(layer(x))
        return F.softmax(self.linear10(x), dim=-1)

    def _action_probabilities(self, state):
        """Convert ``state`` to a float32 tensor and run the forward pass."""
        # as_tensor accepts NumPy arrays (the original input type) as well as
        # tensors and plain sequences.
        state = torch.as_tensor(state, dtype=torch.float32)
        return self.forward(state)

    def _get_action(self, state):
        """Sample an action from the policy; same as ``get_action(state)``."""
        return self.get_action(state, action=None)

    def get_action(self, state, action=None):
        """Choose an action for ``state`` and return its log-probability.

        Args:
            state: Observation, as a NumPy array (tensors and sequences are
                accepted too).
            action: Optional index of an action to force. When ``None`` the
                action is sampled from the policy's probabilities with
                ``np.random.choice``. Note that ``0`` is a valid forced action.

        Returns:
            Tuple ``(action, log_prob)`` where ``log_prob`` is the differentiable
            log-probability the policy assigns to ``action``.
        """
        probs = self._action_probabilities(state)
        if action is None:
            # Stochastic policy: draw the action according to its probability.
            action = np.random.choice(
                self.num_actions, p=np.squeeze(probs.detach().numpy())
            )
        # squeeze(0) drops a leading batch dimension of size one, if present.
        log_prob = torch.log(probs.squeeze(0)[action])
        return action, log_prob


def init_weights(m):
    """Initialise one sub-module; meant to be passed to ``nn.Module.apply``.

    ``nn.Linear`` layers get Kaiming-uniform weights (ReLU gain) and a constant
    bias of 0.01. Every other module type is left untouched.
    """
    if isinstance(m, nn.Linear):
        torch.nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')
        if m.bias is not None:  # layers built with bias=False have no bias
            m.bias.data.fill_(0.01)

In [None]:
def update_policy(policy_network, rewards, log_probs, gamma=None):
    """Run one REINFORCE gradient step from a finished episode.

    For every step ``t`` the discounted return
    ``G_t = r_t + gamma * r_{t+1} + gamma**2 * r_{t+2} + ...`` is computed and
    the loss ``sum_t(-log_prob_t * G_t)`` is minimised with the network's own
    optimizer.

    Args:
        policy_network: Object exposing an ``optimizer`` whose parameters
            produced ``log_probs``.
        rewards: Per-step rewards of the episode, in chronological order.
        log_probs: Per-step log-probabilities (tensors attached to the graph)
            of the actions that were taken, aligned with ``rewards``.
        gamma: Discount factor. Defaults to the module-level ``GAMMA``.

    Returns:
        None. The network parameters are updated in place. An empty episode
        is a no-op.
    """
    if gamma is None:
        gamma = GAMMA

    # Nothing to learn from an empty episode (torch.stack would raise on it).
    if len(rewards) == 0 or len(log_probs) == 0:
        return

    # Accumulate the returns back-to-front: G_t = r_t + gamma * G_{t+1}.
    # This is linear in the episode length instead of quadratic.
    returns = []
    running_return = 0.0
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        returns.append(running_return)
    returns.reverse()

    discounted_rewards = torch.tensor(returns)
    # Return normalisation is currently disabled; to enable it use:
    #discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-9)

    # REINFORCE loss: minimise the negative return-weighted log-likelihood
    policy_gradient = torch.stack(
        [-log_prob * Gt for log_prob, Gt in zip(log_probs, discounted_rewards)]
    ).sum()

    policy_network.optimizer.zero_grad()  # Clear gradients of the previous update
    policy_gradient.backward()  # Back-propagate through the stored log-probabilities
    policy_network.optimizer.step()  # Update theta

In [None]:
def exponential_decay(initial_value, decay_rate, episode):
    """Return ``initial_value`` scaled by ``exp(-decay_rate * episode)``."""
    exponent = -decay_rate * episode
    decay_factor = np.exp(exponent)
    return initial_value * decay_factor

In [None]:
def validate_performance(policy_net, max_number_scrambles, number_moves_allowed, number_games):
    """Estimate the fraction of games the policy completes on its own.

    A fresh ``RC_entropy`` environment is created and ``number_games`` episodes
    are played with actions sampled from ``policy_net`` (no expert actions).
    Each episode stops when the environment reports ``completed`` or
    ``truncated``, or after ``number_moves_allowed * 10`` steps.

    Args:
        policy_net: Policy exposing ``get_action(state, action=None)``.
        max_number_scrambles: Passed to ``RC_entropy`` as its first argument.
        number_moves_allowed: Passed to ``RC_entropy`` as its second argument;
            also bounds the number of steps per episode.
        number_games: Number of evaluation episodes.

    Returns:
        ``completed_games / number_games`` as a float in ``[0, 1]``; ``0.0``
        when ``number_games`` is not positive.
    """
    if number_games <= 0:
        # No games played: avoid dividing by zero.
        return 0.0

    game = RC_entropy(max_number_scrambles, number_moves_allowed)
    completed_games = 0

    # Evaluation only: the log-probabilities are discarded, so there is no
    # need to record an autograd graph for every move.
    with torch.no_grad():
        for _ in range(number_games):
            state, _best_actions = game.reset()
            for _step in range(number_moves_allowed * 10):
                # The training loop converts the reset state with np.array
                # before calling get_action; do the same here (a no-op when
                # the state already is an ndarray).
                action, _log_prob = policy_net.get_action(np.asarray(state), action=None)
                state, _reward, _terminated, truncated, completed = game.step(action)
                if completed:
                    completed_games += 1
                if completed or truncated:
                    break

    return completed_games / number_games

In [None]:
# Build the environment
env = RC_entropy(max_number_scrambles=30, number_moves_allowed=100)

# Create the policy network.
# To start from scratch instead of resuming from a checkpoint, use:
#policy_net = PolicyNetwork(num_inputs=env.environment_space, num_actions=env.action_space, hidden_size=128)
# NOTE(security): torch.load unpickles the whole module object, which can run
# arbitrary code; only load checkpoints you created yourself.
# NOTE(review): recent PyTorch releases restrict torch.load to weights-only
# loading by default; loading a fully pickled module may then require
# weights_only=False. Confirm against the installed version.
policy_net = torch.load('policy_net.pth')

# Define the variables
start_episode = 583001  # Episode counter at which this (resumed) run starts
max_episode_num = 1000*(10**3)*1000  # Upper bound of the episode counter
max_steps = 1000  # Step cap per episode; the environment is expected to truncate earlier
show_and_save = 1000  # Save a checkpoint and write a status line every this many episodes
initial_expert_prob = 1.0  # Initial probability of using expert actions
decay_rate = 10**(-9)  # Decay rate for the expert probability
all_lengths = []  # Length of every episode finished in the current batch
average_lengths = []  # Running mean of the last (up to) 10 episode lengths
all_rewards = []  # Total reward of every episode finished in the current batch
completed_games = 0  # Count the number of episodes that conclude the Rubiks cube
model_alone = 0  # Count the episodes played without expert actions

print("Start of training\n")

for episode in range(start_episode, max_episode_num):
    # Start a new episode
    state, best_actions = env.reset()
    state = np.array(state)
    log_probs = []
    rewards = []

    # With probability expert_prob (decaying with the episode number) the
    # whole episode follows the expert actions returned by env.reset().
    expert_prob = exponential_decay(initial_expert_prob, decay_rate, episode)
    use_best_actions = np.random.rand() < expert_prob
    if not use_best_actions:
        model_alone += 1

    for steps in range(max_steps):
        # Follow the expert while it still has actions left; otherwise let the
        # policy sample (this also avoids an IndexError if the expert sequence
        # is shorter than the episode).
        if use_best_actions and steps < len(best_actions):
            forced_action = best_actions[steps]
        else:
            forced_action = None
        action, log_prob = policy_net.get_action(state, action=forced_action)

        new_state, reward, terminated, truncated, completed = env.step(action)
        log_probs.append(log_prob)
        rewards.append(reward)

        if completed or truncated:
            update_policy(policy_net, rewards, log_probs)
            # Store important information
            all_lengths.append(steps + 1)
            # Mean over the most recent episode lengths (previously this
            # averaged `average_lengths` itself, which only produced NaN).
            average_lengths.append(np.mean(all_lengths[-10:]))
            all_rewards.append(np.sum(rewards))
            if completed:
                completed_games += 1
            break

        state = new_state

    # Every 10 status periods, evaluate the policy on increasing scramble
    # depths. The loop stops at the first depth whose success rate is below
    # 0.8, so `scramble_val` ends up as that depth (or 50 if none fails) and
    # `performance_val` as the success rate measured at that depth.
    scramble_val = None
    performance_val = None
    if (episode + 1) % (show_and_save * 10) == 0:
        for scramble_val in range(1, 51):
            performance_val = validate_performance(policy_net, max_number_scrambles=scramble_val, number_moves_allowed=100, number_games=100)
            if performance_val < 0.8:
                break
        performance_val = performance_val * 100

    if (episode + 1) % show_and_save == 0:
        # Checkpoint the whole module so the run can be resumed
        torch.save(policy_net, 'policy_net.pth')

        print(f"\nBunch of episodes number: {(episode+1)//show_and_save}")
        # Data in a dictionary format where the keys are column names
        data = {
            'Min Reward:': [np.min(all_rewards)],
            'Mean Reward': [np.mean(all_rewards)],
            'Std Reward': [np.std(all_rewards)],
            'Max Reward:': [np.max(all_rewards)],
            'Min length': [np.min(all_lengths)],
            'Mean length': [np.mean(all_lengths)],
            'Std length': [np.std(all_lengths)],
            'Max length': [np.max(all_lengths)],
            'Completed_games': [completed_games],
            'rate': [expert_prob],
            'Model alone': [model_alone],
            'Best scramble': [scramble_val],
            '% Performance': [performance_val]
            }

        # Creating DataFrame from the dictionary
        df = pd.DataFrame(data)

        # Append the DataFrame to a text file
        with open('progress_reinforce.txt', 'a') as file:  # 'a' is for append mode
            file.write(df.to_string(index=False))
            file.write("\n")  # Add extra newline for separation between entries

        # Start a fresh batch of statistics
        all_lengths = []
        average_lengths = []
        all_rewards = []
        completed_games = 0
        model_alone = 0

print("\nEnd of training")

In [None]:
# Final evaluation of the trained policy. `policy_net` must be passed
# explicitly: it is the first required argument of validate_performance.
validate_performance(policy_net, max_number_scrambles=10, number_moves_allowed=100, number_games=1000)