## Imports

In [None]:
import os
import json
import numpy as np
import pandas as pd

from typing import Dict, List
from copy import deepcopy

## Set Parameters

In [None]:
# Root directory that holds one results folder per experiment seed.
RESULTS_PATH = os.path.join(os.curdir, "minigrid-results")
# Experiment seeds whose results are analysed.
SEEDS = list(range(100, 501, 100))
# Evaluation checkpoints run from STARTING_EVAL_STEP up to and including
# END_EVAL_STEP, spaced EVAL_EVERY steps apart.
EVAL_EVERY = 1_000
STARTING_EVAL_STEP = 2_500
END_EVAL_STEP = 100_500
# Number of eval_episode_<i>.json files read at each evaluation checkpoint.
NUM_EVAL_EPISODES = 15

## Initialise True and Empirical Transition Probs

In [None]:
# True transition probabilities of each teleporter. Positions follow the
# destination order noted beside each list, which is also the key order of the
# matching entry in ``initial_empirical_probs`` below.
TRUE_BLUE_PROBS = [1 / 3] * 3 + [0]  # to: green, middle goal, purple, wrong
TRUE_GREEN_PROBS = [0.5, 0.5, 0]  # to: top goal, purple, wrong
TRUE_LEFT_PURPLE_PROBS = [0.5, 0.5, 0]  # to: middle goal, blue, wrong
TRUE_RIGHT_PURPLE_PROBS = [0.5, 0.5, 0]  # to: middle goal, bottom goal, wrong

# Zero-filled template: teleporter -> destination -> value. Each teleporter
# gets its own inner dict, so copies of the template can be filled safely.
initial_empirical_probs = {
    teleporter: dict.fromkeys(destinations, 0.0)
    for teleporter, destinations in (
        ("blue", ("green", "middle", "purple", "wrong")),
        ("green", ("top", "purple", "wrong")),
        ("left_purple", ("middle", "blue", "wrong")),
        ("right_purple", ("middle", "bottom", "wrong")),
    )
}

running_empirical_estimates: List[Dict[str, Dict[str, float]]] = list()

## Process One Eval Step

In [None]:
def get_empirical_estimates(
    seed: int, eval_step: int
) -> Dict[str, Dict[str, float]]:
    """Average the logged transition probabilities over one eval step's episodes.

    Reads ``eval_episode_<i>.json`` for every ``i`` in
    ``range(NUM_EVAL_EPISODES)`` from the step directory of the given seed and
    accumulates the values found under ``"transition_probabilities"``. An
    episode contributes to a teleporter only when that teleporter's guard
    field (see ``field_map``) is not ``-1``.

    Args:
        seed (int): experiment seed to load data from
        eval_step (int): current eval step

    Returns:
        Dict[str, Dict[str, float]]: per-teleporter mean of each logged
            probability over the contributing episodes; an entry with no
            contributing episode is set to -1
    """

    # teleporter -> (guard field, {destination: logged field name})
    field_map = {
        "blue": (
            "blue_green_prob",
            {
                "green": "blue_green_prob",
                "middle": "blue_middle_prob",
                "purple": "blue_purple_prob",
                "wrong": "blue_wrong_prob",
            },
        ),
        "green": (
            "green_leftgoal_prob",
            {
                "top": "green_leftgoal_prob",
                "purple": "green_purple_prob",
                "wrong": "green_wrong_prob",
            },
        ),
        "left_purple": (
            "purple_left_blue_prob",
            {
                "middle": "purple_left_middle_prob",
                "blue": "purple_left_blue_prob",
                "wrong": "purple_left_wrong_prob",
            },
        ),
        "right_purple": (
            "purple_right_middle_prob",
            {
                "middle": "purple_right_middle_prob",
                "bottom": "purple_right_bottom_prob",
                "wrong": "purple_right_wrong_prob",
            },
        ),
    }

    episode_dir = os.path.join(
        RESULTS_PATH,
        f"minigrid-language-no-grad-seed-{seed}-results",
        f"step_{eval_step}",
    )
    # Both accumulators start from the zero-filled template.
    sums = deepcopy(initial_empirical_probs)
    counts = deepcopy(initial_empirical_probs)

    for episode_index in range(NUM_EVAL_EPISODES):
        episode_path = os.path.join(
            episode_dir, f"eval_episode_{episode_index}.json"
        )
        with open(episode_path, "r") as episode_file:
            logged = json.load(episode_file)["transition_probabilities"]

        for teleporter, (guard_field, destination_fields) in field_map.items():
            if logged[guard_field] == -1:
                # No usable values for this teleporter in this episode.
                continue
            for destination, field_name in destination_fields.items():
                sums[teleporter][destination] += logged[field_name]
                counts[teleporter][destination] += 1

    # Turn the sums into means; mark entries without any data with -1.
    for teleporter, destination_sums in sums.items():
        for destination in destination_sums:
            seen = counts[teleporter][destination]
            if seen > 0:
                destination_sums[destination] = destination_sums[destination] / seen
            else:
                destination_sums[destination] = -1
    return sums


# Smoke check: print the averaged estimates for seed 100 at eval step 3500.
print(get_empirical_estimates(100, 3500))

## Get probabilities over time

In [None]:
def get_empirical_estimates_for_seed(seed: int) -> List[Dict[str, Dict[str, float]]]:
    """Collect the empirical transition estimates of one seed at every eval step.

    Eval steps run from ``STARTING_EVAL_STEP`` to ``END_EVAL_STEP`` (inclusive)
    in increments of ``EVAL_EVERY``.

    Args:
        seed (int): experiment seed to load data from

    Returns:
        List[Dict[str, Dict[str, float]]]: one estimate per eval step, in
            increasing step order
    """

    eval_steps = range(STARTING_EVAL_STEP, END_EVAL_STEP + 1, EVAL_EVERY)
    return [get_empirical_estimates(seed, step) for step in eval_steps]


# Smoke check: print every per-step estimate for seed 400, then how many
# eval steps were loaded.
seed_estimates = get_empirical_estimates_for_seed(400)
print(seed_estimates, len(seed_estimates), sep="\n")
del seed_estimates

## Calculate TVD for each teleporter

In [None]:
def calculate_tvd_for_empirical_estimate(
    empirical_estimate: Dict[str, Dict[str, float]],
) -> Dict[str, float]:
    """Calculate the total variation distance for a given empirical estimate.

    For each teleporter the distance is ``0.5 * sum(|estimate - true|)`` over
    its destinations, with the true distribution taken from the matching
    ``TRUE_*_PROBS`` list. A teleporter whose estimate contains the ``-1``
    "no data" sentinel gets a TVD of ``-1`` instead.

    Args:
        empirical_estimate (Dict[str, Dict[str, float]]): empirical estimate to
            calculate TVD for, keyed by teleporter and then by destination

    Returns:
        Dict[str, float]: total variation distance for each teleporter
            ("blue", "green", "left_purple", "right_purple"), or -1 where the
            estimate is unavailable

    Raises:
        KeyError: if a teleporter or one of its destinations is missing from
            ``empirical_estimate``
    """

    # Destination order of each TRUE_*_PROBS list, spelled out explicitly so
    # the pairing never depends on the insertion order of the estimate dicts.
    reference = {
        "blue": (("green", "middle", "purple", "wrong"), TRUE_BLUE_PROBS),
        "green": (("top", "purple", "wrong"), TRUE_GREEN_PROBS),
        "left_purple": (("middle", "blue", "wrong"), TRUE_LEFT_PURPLE_PROBS),
        "right_purple": (("middle", "bottom", "wrong"), TRUE_RIGHT_PURPLE_PROBS),
    }

    tvd = {}
    for teleporter, (destinations, true_probs) in reference.items():
        teleporter_estimate = empirical_estimate[teleporter]
        observed = [teleporter_estimate[destination] for destination in destinations]
        # A probability can never be -1, so any -1 marks the whole teleporter
        # as "no data"; check every destination rather than one arbitrary key.
        if any(value == -1 for value in observed):
            tvd[teleporter] = -1
            continue
        tvd[teleporter] = 0.5 * np.sum(
            np.abs(np.array(observed) - np.array(true_probs))
        )
    return tvd


# Smoke checks: TVD of the all-zero template, then TVD of the estimates
# loaded for seed 100 at eval step 3500.
print(calculate_tvd_for_empirical_estimate(initial_empirical_probs))
print(calculate_tvd_for_empirical_estimate(get_empirical_estimates(100, 3500)))

## Calculate TVD at each Eval Epoch

In [None]:
def get_tvd_for_seed(seed: int) -> List[Dict[str, float]]:
    """Compute the per-teleporter total variation distance at every eval step.

    Args:
        seed (int): experiment seed to load data from

    Returns:
        List[Dict[str, float]]: one TVD dict per eval step, in the order the
            estimates are returned by ``get_empirical_estimates_for_seed``
    """

    return [
        calculate_tvd_for_empirical_estimate(step_estimate)
        for step_estimate in get_empirical_estimates_for_seed(seed)
    ]

# Smoke check: print the per-step TVDs for seed 100, then how many eval
# steps were processed.
seed_tvd = get_tvd_for_seed(100)
print(seed_tvd, len(seed_tvd), sep="\n")
del seed_tvd

## Compute results for each seed, add to DF and save to CSV

In [None]:
# Build one row per (seed, eval step) with the per-teleporter TVDs and their
# means, save the rows to tvd_results.csv, then save the across-seed mean and
# standard deviation per eval step to tvd_mean_results.csv.
#
# A TVD of -1 is the "no data" sentinel: it is kept as-is in tvd_results.csv
# but is excluded from every mean and standard deviation computed here.

RESULT_COLUMNS = [
    "seed",
    "eval_step",
    "blue_tvd",
    "green_tvd",
    "left_purple_tvd",
    "right_purple_tvd",
    "mean_tvd",
    "mean_tvd_excluding_left_purple",
]

# Columns that can contain the -1 sentinel.
TELEPORTER_TVD_COLUMNS = [
    "blue_tvd",
    "green_tvd",
    "left_purple_tvd",
    "right_purple_tvd",
]


def _mean_ignoring_sentinel(tvd_entries):
    """Return the mean of the entries that are not -1, or NaN if none remain."""
    kept = np.array([entry for entry in tvd_entries if entry != -1], dtype=float)
    return kept.mean() if kept.size > 0 else np.nan


# Rows are collected in a list and turned into a DataFrame once; growing a
# DataFrame with pd.concat per row is quadratic and starts from an empty
# object-dtype frame.
result_rows = []

for seed in SEEDS:

    empirical_estimates = get_empirical_estimates_for_seed(seed)

    # Derive the TVDs from the estimates just loaded. get_tvd_for_seed(seed)
    # returns the same values but would read every episode file a second time.
    tvd = [
        calculate_tvd_for_empirical_estimate(estimate)
        for estimate in empirical_estimates
    ]

    for step_index, tvd_values in enumerate(tvd):

        # Actual training step of this checkpoint (used in messages and rows).
        eval_step = STARTING_EVAL_STEP + step_index * EVAL_EVERY

        teleporter_tvds = [
            tvd_values["blue"],
            tvd_values["green"],
            tvd_values["left_purple"],
            tvd_values["right_purple"],
        ]

        # A value strictly between -1 and 0 is negative without being the
        # sentinel. Only the current checkpoint is inspected so the message
        # names the step that actually has the problem.
        if any(-1 < value < 0 for value in teleporter_tvds):
            print(f"Seed {seed} - Eval Step {eval_step} has negative values in TVD")

        # Take mean excluding -1
        mean_tvd = _mean_ignoring_sentinel(teleporter_tvds)
        if mean_tvd < 0:
            print(f"Seed {seed} - Eval Step {eval_step} has negative mean TVD")
            raise ValueError(
                f"Seed {seed} - Eval Step {eval_step} has negative mean TVD"
            )

        # Same sentinel handling as mean_tvd, just without left purple.
        mean_tvd_excluding_left_purple = _mean_ignoring_sentinel(
            [tvd_values["blue"], tvd_values["green"], tvd_values["right_purple"]]
        )

        result_rows.append(
            [
                seed,
                eval_step,
                tvd_values["blue"],
                tvd_values["green"],
                tvd_values["left_purple"],
                tvd_values["right_purple"],
                mean_tvd,
                mean_tvd_excluding_left_purple,
            ]
        )


results_df = pd.DataFrame(result_rows, columns=RESULT_COLUMNS)

results_df.to_csv("tvd_results.csv", index=False)


# Calculate mean and std across seeds for each eval step

# Work on a copy with the sentinel replaced by NaN so that seeds without data
# for a teleporter are skipped instead of pulling the statistics towards -1.
aggregation_df = results_df.copy()
aggregation_df[TELEPORTER_TVD_COLUMNS] = aggregation_df[
    TELEPORTER_TVD_COLUMNS
].replace(-1, np.nan)

mean_results_df = aggregation_df.groupby("eval_step").agg(["mean", "std"])

print(mean_results_df)

mean_results_df.to_csv("tvd_mean_results.csv", index=True)