**Imports and global declarations**


In [None]:
# z5629534, Hoang-Nam Tran

import random 
from env import StaticGridEnv
import numpy as np
import matplotlib.pyplot as plt
import json
import pandas as pd
import seaborn as sn
import utils

# Seed Python's random module so repeated runs draw the same random numbers
random.seed(42)

# Environment description: four discrete actions on a 10x10 grid
numActions = 4
validActions = list(range(numActions))
numStates = 10 * 10

**Defining methods for metrics calculation/display**


In [2]:
def plotCumulativeReward(cumulativeReward):
    """Draw the given reward series as a line chart and show it.

    cumulativeReward: sequence of reward values; the position of each value
    in the sequence is used as the x-coordinate (labelled 'Episodes').
    """
    plt.plot(cumulativeReward)

    # Describe the figure: title first, then both axes
    plt.title('Cumulative Reward vs Episodes')
    plt.ylabel('Cumulative Reward')
    plt.xlabel('Episodes')

    plt.show()

def calculateSuccessRate(numSuccess, numEpisodes):
    """Return the share of successful episodes as a percentage.

    numSuccess:  number of episodes counted as successful.
    numEpisodes: total number of episodes that were run.

    Returns (numSuccess / numEpisodes) * 100. When numEpisodes is 0 there is
    nothing to average over, so 0.0 is returned instead of raising
    ZeroDivisionError.
    """
    if numEpisodes == 0:
        return 0.0

    return (numSuccess / numEpisodes) * 100
    
    
def calculateAverageLearningSpeed(stepsPerEpisode):
    """Return the average learning speed: the reciprocal of the mean step count.

    stepsPerEpisode: sequence with the number of steps taken in each episode.
    """
    meanSteps = np.mean(stepsPerEpisode)
    return 1 / meanSteps

def calculateAverageRewardPerEpisode(totalRewardsPerEpisode):
    """Return the arithmetic mean of the per-episode total rewards.

    totalRewardsPerEpisode: sequence with the summed reward of each episode.
    """
    return np.mean(totalRewardsPerEpisode)

**Task 1: Training agent with Q-learning using epsilon-greedy method**

In [3]:
# Task 1 baseline: tabular Q-learning with an epsilon-greedy behaviour policy.
# The environment is constructed with the argument 42.
envQ = StaticGridEnv(42)

# Hyperparameters
epsilonQ = 0.1  # probability of taking a random (exploratory) action
alphaQ = 0.8    # learning rate
gammaQ = 0.95   # discount factor

# Training budget
numEpisodesQ = 1000
maxStepsPerEpisodeQ = 100

# Action-value table: one row per flattened state, one column per action
q = np.zeros((numStates, numActions))

# Per-episode bookkeeping consumed by the metric helpers
totalRewardsPerEpisodeQ = []
stepsPerEpisodeQ = []
successfulEpisodesQ = 0

for episodeQ in range(numEpisodesQ):

    # Fresh episode: the two-component state returned by the environment is
    # flattened to a single table index as first * 10 + second
    twoDimStateQ = envQ.reset()
    stateQ = twoDimStateQ[0]*10 + twoDimStateQ[1]
    doneQ = False
    totalRewardsQ = 0
    stepsQ = 0
    actionQ = 0

    for stepQ in range(maxStepsPerEpisodeQ):
        # Epsilon-greedy selection: a draw at or below epsilon explores,
        # anything above it exploits the current table
        if random.uniform(0, 1) <= epsilonQ:
            actionQ = random.choice(validActions)
        else:
            actionQ = np.argmax(q[stateQ, :])

        # Apply the action and flatten the resulting state
        nextStateTwoDimQ, rewardQ, doneQ, _ = envQ.step(actionQ)
        nextStateQ = nextStateTwoDimQ[0]*10 + nextStateTwoDimQ[1]

        # Q-learning update: bootstrap from the best action value of the next state
        bestNextValueQ = np.max(q[nextStateQ, :])
        tdTargetQ = rewardQ + gammaQ * bestNextValueQ
        q[stateQ, actionQ] += alphaQ * (tdTargetQ - q[stateQ, actionQ])

        # Episode statistics
        stepsQ += 1
        totalRewardsQ += rewardQ

        # Optional rendering:
        # envQ.render(episode=episodeQ, learning_type="Q-learning")

        # Move on to the next state
        twoDimStateQ = nextStateTwoDimQ
        stateQ = nextStateQ

        # An episode that reports done within the step budget counts as a success
        if doneQ:
            successfulEpisodesQ += 1
            break

    # Record this episode's totals
    stepsPerEpisodeQ.append(stepsQ)
    totalRewardsPerEpisodeQ.append(totalRewardsQ)

**Performance metrics and plotting cumulative reward (Q-learning)**

**Saving metrics in JSON-file**

In [None]:
# Summarise the Q-learning baseline run
successRateQ = calculateSuccessRate(successfulEpisodesQ, numEpisodesQ)
averageRewardPerEpQ = calculateAverageRewardPerEpisode(totalRewardsPerEpisodeQ)
averageLearningSpeedQ = calculateAverageLearningSpeed(stepsPerEpisodeQ)

# Collect the metrics once; the same mapping feeds the console report and the JSON file
metrics = {
    "Success Rate (%)": successRateQ,
    "Average Reward per Episode": averageRewardPerEpQ,
    "Average Learning Speed": averageLearningSpeedQ,
}

# Console report, one line per metric
print("Agent 1 – Q-Learning")
for metricLabelQ, metricValueQ in metrics.items():
    print(metricLabelQ + ": ", metricValueQ)

# Persist the metrics so the baseline comparison can load them later
with open("metricsQ.json", "w") as fQ:
    json.dump(metrics, fQ, indent=4)

# Reward per episode over the course of training
plotCumulativeReward(totalRewardsPerEpisodeQ)

**Task 2: Training agent with SARSA using epsilon-greedy method**

In [5]:
# Task 2 baseline: tabular SARSA with an epsilon-greedy behaviour policy.
# The environment is constructed with the argument 42.
envS = StaticGridEnv(42)

# Hyperparameters
epsilonS = 0.1  # probability of taking a random (exploratory) action
alphaS = 0.9    # learning rate
gammaS = 0.8    # discount factor

# Training budget
maxEpisodesS = 1000
maxStepsPerEpisodeS = 100

# Action-value table: one row per flattened state, one column per action
sarsa = np.zeros((numStates, numActions))

# Per-episode bookkeeping consumed by the metric helpers
totalRewardsPerEpisodeS = []
stepsPerEpisodeS = []
successfulEpisodesS = 0

for episodeS in range(maxEpisodesS):

    # Fresh episode: the two-component state returned by the environment is
    # flattened to a single table index as first * 10 + second
    twoDimStateS = envS.reset()
    stateS = twoDimStateS[0]*10 + twoDimStateS[1]
    doneS = False
    totalRewardsS = 0
    stepsS = 0
    actionS = 0
    nextActionS = 0

    # SARSA needs the first action before the loop starts (epsilon-greedy)
    if random.uniform(0, 1) <= epsilonS:
        actionS = random.choice(validActions)
    else:
        actionS = np.argmax(sarsa[stateS, :])

    for stepS in range(maxStepsPerEpisodeS):

        # Apply the current action and flatten the resulting state
        nextStateTwoDimS, rewardS, doneS, _ = envS.step(actionS)
        nextStateS = nextStateTwoDimS[0]*10 + nextStateTwoDimS[1]

        # Pick the follow-up action with the same epsilon-greedy rule
        if random.uniform(0, 1) <= epsilonS:
            nextActionS = random.choice(validActions)
        else:
            nextActionS = np.argmax(sarsa[nextStateS, :])

        # SARSA update: bootstrap from the action that will actually be taken next
        tdTargetS = rewardS + gammaS * sarsa[nextStateS, nextActionS]
        sarsa[stateS, actionS] += alphaS * (tdTargetS - sarsa[stateS, actionS])

        # Episode statistics
        stepsS += 1
        totalRewardsS += rewardS

        # Optional rendering (the original call passed learning_type="Q-learning"):
        # envS.render(episode=episodeS, learning_type="Q-learning")

        # Move on to the next state/action pair
        twoDimStateS = nextStateTwoDimS
        stateS = nextStateS
        actionS = nextActionS

        # An episode that reports done within the step budget counts as a success
        if doneS:
            successfulEpisodesS += 1
            break

    # Record this episode's totals
    stepsPerEpisodeS.append(stepsS)
    totalRewardsPerEpisodeS.append(totalRewardsS)

**Performance metrics and plotting cumulative reward (SARSA)**

**Saving metrics in JSON-file**


In [None]:
# Summarise the SARSA baseline run
successRateS = calculateSuccessRate(successfulEpisodesS, maxEpisodesS)
averageRewardPerEpisodeS = calculateAverageRewardPerEpisode(totalRewardsPerEpisodeS)
averageLearningSpeedS = calculateAverageLearningSpeed(stepsPerEpisodeS)

# Collect the metrics once; the same mapping feeds the console report and the JSON file
metricsS = {
    "Success Rate (%)": successRateS,
    "Average Reward per Episode": averageRewardPerEpisodeS,
    "Average Learning Speed": averageLearningSpeedS,
}

# Console report, one line per metric
print("Agent 2 – SARSA")
for metricLabelS, metricValueS in metricsS.items():
    print(metricLabelS + ": ", metricValueS)

# Persist the metrics so the baseline comparison can load them later
with open("metricsS.json", "w") as fS:
    json.dump(metricsS, fS, indent=4)

# Reward per episode over the course of training
plotCumulativeReward(totalRewardsPerEpisodeS)

**Implementing Teacher Feedback Mechanism with possible values for availability/accuracy according to given assignment**

In [7]:
def provideTeacherAdvice(task1, currentState, availability, accuracy):
    """Return the teacher's advised action for a state, or -1 for no advice.

    task1:        True to use the Q-learning table `q` as teacher,
                  False to use the SARSA table `sarsa`.
    currentState: flattened state index (row of the teacher table).
    availability: probability that the teacher gives any advice at all.
    accuracy:     probability that given advice is the teacher's greedy action.

    Returns the teacher's greedy action (correct advice), a uniformly chosen
    different action (incorrect advice), or -1, which is not a valid action
    and signals that no advice was given.

    The wrong-action candidates are only built when incorrect advice is
    actually returned, so calls that end in "no advice" or "correct advice"
    do not spend an extra random draw.
    """
    # First decide whether the teacher speaks at all
    if random.uniform(0, 1) >= availability:
        return -1

    # Teacher trained with Q-learning (Task 1) or SARSA (Task 2)
    teacherTable = q if task1 else sarsa
    actionValues = teacherTable[currentState, :]

    # Correct advice is the action with the highest teacher value
    correctAction = np.argmax(actionValues)
    if random.uniform(0, 1) < accuracy:
        return correctAction

    # Incorrect advice: any action except the correct one, chosen uniformly
    wrongActions = [action for action in range(len(actionValues)) if action != correctAction]
    return random.choice(wrongActions)
    

# Teacher settings to sweep; availability and accuracy use the same five levels
teacherLevels = (0.2, 0.4, 0.6, 0.8, 1.0)
availability = list(teacherLevels)
accuracy = list(teacherLevels)

**Task 3: Defining method to train student using the agent trained in Task 1 (Q-learning) as teacher**

In [8]:
# Naming used below: a = teacher availability, b = teacher accuracy

# Student action-value table: one row per flattened state, one column per action
studentQ = np.zeros(shape=(numStates, numActions))

# One result row per (availability, accuracy) combination
metricColumnsQ = ['Availability', 'Accuracy', 'Avg Reward', 'Success Rate (%)', 'Avg Learning Speed']
dataFrameQ = pd.DataFrame(columns=metricColumnsQ)

# Heatmap grid: rows (index) are accuracy values, columns are availability values
dataFrameHeatQ = pd.DataFrame(columns=availability, index=accuracy)


def studentQTraining(a, b):
    """Train a Q-learning student advised by the Task 1 teacher and record its metrics.

    a: teacher availability (probability that advice is offered for a step).
    b: teacher accuracy (probability that offered advice is the teacher's greedy action).

    The shared `studentQ` table is reset to zeros at the start of every call so
    that each (availability, accuracy) setting is measured on a student that
    starts from scratch; without the reset, later settings would inherit what
    was learned under earlier ones and the results would not be comparable.

    Side effects: overwrites `studentQ` in place, appends one row to
    `dataFrameQ` and writes the average reward into `dataFrameHeatQ.loc[b, a]`.
    """
    # Start this configuration with an untrained student
    studentQ.fill(0.0)

    # Cumulative metrics
    successfulEpStudentQ = 0
    totalRewPerEpStudentQ = []
    stepsPerEpStudentQ = []

    for _ in range(numEpisodesQ):

        # Reset the environment and flatten the two-component state to one index
        twoDimStateStudQ = envQ.reset()
        stateStudQ = twoDimStateStudQ[0]*10 + twoDimStateStudQ[1]
        totalRewardsStudQ = 0
        stepsStudQ = 0

        for _ in range(maxStepsPerEpisodeQ):
            # Ask the teacher first; task1=True selects the Q-learning teacher
            actionStudQ = provideTeacherAdvice(True, stateStudQ, a, b)

            # -1 means no advice: fall back to the student's own epsilon-greedy choice
            if actionStudQ == -1:
                if random.uniform(0, 1) > epsilonQ:  # Exploit
                    actionStudQ = np.argmax(studentQ[stateStudQ, :])
                else:  # Explore
                    actionStudQ = random.choice(validActions)

            # Take the action
            nextStateTwoDimStudQ, rewardStudQ, doneStudQ, _ = envQ.step(actionStudQ)
            nextStateStudQ = nextStateTwoDimStudQ[0]*10 + nextStateTwoDimStudQ[1]

            # Q-learning update of the student table
            studentQ[stateStudQ, actionStudQ] = studentQ[stateStudQ, actionStudQ] + alphaQ * \
                (rewardStudQ + gammaQ * np.max(studentQ[nextStateStudQ, :]) - studentQ[stateStudQ, actionStudQ])

            # Update metrics
            totalRewardsStudQ += rewardStudQ
            stepsStudQ += 1

            # Transition to the next state
            stateStudQ = nextStateStudQ

            # An episode that reports done within the step budget counts as a success
            if doneStudQ:
                successfulEpStudentQ += 1
                break

        # Update cumulative metrics
        totalRewPerEpStudentQ.append(totalRewardsStudQ)
        stepsPerEpStudentQ.append(stepsStudQ)

    # Calculate performance metrics
    successRateStudQ = calculateSuccessRate(successfulEpStudentQ, numEpisodesQ)
    averageRewardPerEpStudQ = calculateAverageRewardPerEpisode(totalRewPerEpStudentQ)
    averageLearningSpeedStudQ = calculateAverageLearningSpeed(stepsPerEpStudentQ)

    # Save performance metrics as a new row of the results frame
    dataFrameQ.loc[len(dataFrameQ)] = [a, b, averageRewardPerEpStudQ, successRateStudQ, averageLearningSpeedStudQ]

    # Heatmap frame is indexed by accuracy (rows) and availability (columns), hence [b, a]
    dataFrameHeatQ.loc[b, a] = averageRewardPerEpStudQ

**Conducting teacher-assisted training for all specified availability/accuracy values (Q-learning)**

**Saving metrics, displaying DataFrame, and plotting the avg reward heatmap**

In [None]:
# Train one student per (availability, accuracy) pair; availability varies slowest
for a, b in [(availLevel, accLevel) for availLevel in availability for accLevel in accuracy]:
    studentQTraining(a, b)

# The Q-learning environment is no longer needed
envQ.close()

# Persist the per-configuration metrics
dataFrameQ.to_csv('dataFrameQlearning.csv', index=False)

# The heatmap frame was filled cell by cell; convert its columns to numeric for plotting
dataFrameHeatQ = dataFrameHeatQ.apply(pd.to_numeric)

# Average reward for every availability/accuracy combination
plt.figure(figsize=(10, 8))
sn.heatmap(dataFrameHeatQ, annot=True, fmt=".3f", cmap="crest")
plt.xlabel("Availability")
plt.ylabel("Accuracy")
plt.title("Average Reward for Different Teacher Availability and Accuracy (Q-learning Teacher)")
plt.show()

# Full metrics table
display(dataFrameQ)

**Task 4: Defining method to train student using the agent trained in Task 2 (SARSA) as teacher**

In [10]:
# Naming used below: a = teacher availability, b = teacher accuracy

# Student action-value table: one row per flattened state, one column per action
studentSarsa = np.zeros(shape=(numStates, numActions))

# One result row per (availability, accuracy) combination
metricColumnsSarsa = ['Availability', 'Accuracy', 'Avg Reward', 'Success Rate (%)', 'Avg Learning Speed']
dataFrameSarsa = pd.DataFrame(columns=metricColumnsSarsa)

# Heatmap grid: rows (index) are accuracy values, columns are availability values
dataFrameHeatSarsa = pd.DataFrame(columns=availability, index=accuracy)

def studentSarsaTraining(a,b):
    """Train a SARSA student advised by the Task 2 teacher and record its metrics.

    a: teacher availability (probability that advice is offered for a step).
    b: teacher accuracy (probability that offered advice is the teacher's greedy action).

    The shared `studentSarsa` table is reset to zeros at the start of every
    call so that each (availability, accuracy) setting is measured on a student
    that starts from scratch; without the reset, later settings would inherit
    what was learned under earlier ones and the results would not be comparable.

    Side effects: overwrites `studentSarsa` in place, appends one row to
    `dataFrameSarsa` and writes the average reward into
    `dataFrameHeatSarsa.loc[b, a]`.
    """

    def _chooseAction(state):
        """Return the teacher's advice for `state`, or the student's epsilon-greedy action if none is given."""
        # task1=False selects the SARSA teacher; -1 means no advice was offered
        action = provideTeacherAdvice(False, state, a, b)
        if action == -1:
            if random.uniform(0, 1) > epsilonS:  # Exploit
                action = np.argmax(studentSarsa[state, :])
            else:  # Explore
                action = random.choice(validActions)
        return action

    # Start this configuration with an untrained student
    studentSarsa.fill(0.0)

    # Cumulative metrics
    totalRewardsPerEpStudS = []
    stepsPerEpStudS = []
    successfulEpStudS = 0

    for _ in range(maxEpisodesS):

        # Reset the environment and flatten the two-component state to one index
        twoDimStateStudS = envS.reset()
        stateStudS = twoDimStateStudS[0]*10 + twoDimStateStudS[1]
        totalRewardsStudS = 0
        stepsStudS = 0

        # SARSA needs the first action before the step loop starts
        actionStudS = _chooseAction(stateStudS)

        for _ in range(maxStepsPerEpisodeS):

            # Take the action
            nextStateTwoDimStudS, rewardStudS, doneStudS, _ = envS.step(actionStudS)
            nextStateStudS = nextStateTwoDimStudS[0]*10 + nextStateTwoDimStudS[1]

            # Choose the follow-up action (teacher advice or epsilon-greedy)
            nextActionStudS = _chooseAction(nextStateStudS)

            # SARSA update: bootstrap from the action that will actually be taken next
            studentSarsa[stateStudS, actionStudS] = studentSarsa[stateStudS, actionStudS] + \
                alphaS * (rewardStudS + gammaS * studentSarsa[nextStateStudS, nextActionStudS] - studentSarsa[stateStudS, actionStudS])

            # Update metrics
            totalRewardsStudS += rewardStudS
            stepsStudS += 1

            # Transition to the next state/action pair
            stateStudS = nextStateStudS
            actionStudS = nextActionStudS

            # An episode that reports done within the step budget counts as a success
            if doneStudS:
                successfulEpStudS += 1
                break

        # Track cumulative metrics
        totalRewardsPerEpStudS.append(totalRewardsStudS)
        stepsPerEpStudS.append(stepsStudS)

    # Calculate performance metrics
    successRateStudS = calculateSuccessRate(successfulEpStudS, maxEpisodesS)
    averageRewardPerEpStudS = calculateAverageRewardPerEpisode(totalRewardsPerEpStudS)
    averageLearningSpeedStudS = calculateAverageLearningSpeed(stepsPerEpStudS)

    # Save performance metrics as a new row of the results frame
    dataFrameSarsa.loc[len(dataFrameSarsa)] = [a, b, averageRewardPerEpStudS, successRateStudS, averageLearningSpeedStudS]

    # Heatmap frame is indexed by accuracy (rows) and availability (columns), hence [b, a]
    dataFrameHeatSarsa.loc[b, a] = averageRewardPerEpStudS

**Conducting teacher-assisted training for all specified availability/accuracy values (SARSA)**

**Saving metrics, displaying DataFrame, and plotting the avg reward heatmap**

In [None]:
# Train one student per (availability, accuracy) pair; availability varies slowest
for a, b in [(availLevel, accLevel) for availLevel in availability for accLevel in accuracy]:
    studentSarsaTraining(a, b)

# The SARSA environment is no longer needed
envS.close()

# Persist the per-configuration metrics
dataFrameSarsa.to_csv('dataFrameSarsa.csv', index=False)

# The heatmap frame was filled cell by cell; convert its columns to numeric for plotting
dataFrameHeatSarsa = dataFrameHeatSarsa.apply(pd.to_numeric)

# Average reward for every availability/accuracy combination
plt.figure(figsize=(10, 8))
sn.heatmap(dataFrameHeatSarsa, annot=True, fmt=".3f", cmap="crest")
plt.xlabel("Availability")
plt.ylabel("Accuracy")
plt.title("Average Reward for Different Teacher Availability and Accuracy (SARSA Teacher)")
plt.show()

# Full metrics table
display(dataFrameSarsa)

**Q-learning: loading results from Task 1 and 3, performance comparison of teacher mechanism with baseline**

In [None]:
# Teacher-assisted results written by the Task 3 sweep
dfLoadQ = pd.read_csv('dataFrameQlearning.csv')

# Baseline metrics written by the Task 1 run
with open('metricsQ.json', 'r') as fileQ:
    baseQ = json.load(fileQ)

# Baseline as a tuple in the order: average reward, success rate, learning speed
baseQTuple = tuple(
    baseQ[metricKey]
    for metricKey in ('Average Reward per Episode', 'Success Rate (%)', 'Average Learning Speed')
)

print("Performance Comparison with and without Teacher Advice for 100% Availability (Q-learning)")

# Plotting is delegated to plot_comparison_with_baseline from utils.py
utils.plot_comparison_with_baseline(
    availability=1.0,
    df_learning=dfLoadQ,
    baseline_learning=baseQTuple,
    algorithm='Q-learning',
)

**SARSA: loading results from Task 2 and 4, performance comparison of teacher mechanism with baseline**

In [None]:
# Teacher-assisted results written by the Task 4 sweep
dfLoadSarsa = pd.read_csv('dataFrameSarsa.csv')

# Baseline metrics written by the Task 2 run
with open('metricsS.json', 'r') as fileS:
    baseS = json.load(fileS)

# Baseline as a tuple in the order: average reward, success rate, learning speed
baseSTuple = tuple(
    baseS[metricKey]
    for metricKey in ('Average Reward per Episode', 'Success Rate (%)', 'Average Learning Speed')
)

print("Performance Comparison with and without Teacher Advice for 100% Availability (SARSA)")

# Plotting is delegated to plot_comparison_with_baseline from utils.py
utils.plot_comparison_with_baseline(
    availability=1.0,
    df_learning=dfLoadSarsa,
    baseline_learning=baseSTuple,
    algorithm='SARSA',
)