In [None]:
import os
import pandas as pd
import json
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import ttest_ind

In [None]:
def get_infractions(record):
    """Count the infraction events of one route record, per infraction type.

    Args:
        record: Mapping with an "infractions" entry that maps each infraction
            type to a sized collection of events.

    Returns:
        dict: Event count for each of the nine tracked infraction types, in a
        fixed key order. Types that the record does not mention stay at 0.

    Raises:
        KeyError: If the record has no "infractions" entry, or if it reports
            an infraction type that is not tracked here.
    """
    tracked_types = (
        'collisions_layout',
        'collisions_pedestrian',
        'collisions_vehicle',
        'outside_route_lanes',
        'red_light',
        'route_dev',
        'route_timeout',
        'stop_infraction',
        'vehicle_blocked',
    )
    totals = dict.fromkeys(tracked_types, 0)
    for kind, events in record["infractions"].items():
        # Plain indexing on purpose: an untracked type must fail loudly.
        totals[kind] = totals[kind] + len(events)
    return totals

In [None]:
def get_route_df(exp_path, n_routes=10):
    """Collect per-route scores and infraction counts for one experiment.

    Walks ``<exp_path>/<run>/<model>/results.json``, where every ``<model>``
    directory is named ``<model_name>_mn_<model_number>``, and emits one row
    per evaluated route.

    Args:
        exp_path: Directory that contains one sub-directory per run.
        n_routes: Number of leading route records to read from each results
            file (default 10). Pass ``None`` to read every record present.

    Returns:
        pandas.DataFrame: One row per (model, run, route) with the columns
        ``model_name``, ``model_number``, ``run``, ``route``, the three score
        columns and the nine infraction-count columns. ``model_number`` is
        kept as the string taken from the directory name.

    Raises:
        IndexError: If a model directory name lacks the ``_mn_`` separator, or
            a results file holds fewer than ``n_routes`` records.
        KeyError: If a record lacks one of the expected score keys.
    """
    score_columns = ['score_composed', 'score_penalty', 'score_route']
    infraction_columns = [
        'collisions_layout', 'collisions_pedestrian', 'collisions_vehicle', 'outside_route_lanes',
        'red_light', 'route_dev', 'route_timeout', 'stop_infraction', 'vehicle_blocked'
    ]
    columns = ['model_name', 'model_number', 'run', 'route'] + score_columns + infraction_columns

    # Rows are accumulated in a list and turned into a DataFrame once at the
    # end; appending with df.loc[len(df)] re-allocates on every insertion.
    rows = []

    # Sorted listings make the row order independent of the filesystem.
    for run in sorted(os.listdir(exp_path)):
        run_path = f"{exp_path}/{run}"
        if not os.path.isdir(run_path):
            # Stray files next to the run directories are not runs.
            continue
        for model in sorted(os.listdir(run_path)):
            if not os.path.isdir(f"{run_path}/{model}"):
                continue
            name_parts = model.split("_mn_")
            model_name, model_number = name_parts[0], name_parts[1]
            results_path = f"{run_path}/{model}/results.json"
            # Read json file
            with open(results_path) as f:
                results = json.load(f)
            if len(results["values"]) == 0:
                print(f"No results in {results_path}")
                continue

            records = results['_checkpoint']['records']
            route_count = len(records) if n_routes is None else n_routes
            for idx in range(route_count):
                record = records[idx]
                scores = record["scores"]
                infraction_counts = get_infractions(record)
                # Look values up by column name so a different key order in
                # the JSON can never shift a value into the wrong column.
                rows.append(
                    [model_name, model_number, run, idx]
                    + [scores[name] for name in score_columns]
                    + [infraction_counts[name] for name in infraction_columns]
                )
    return pd.DataFrame(rows, columns=columns)

In [None]:
def plot_dist(stop_inf_no_pl, stop_inf_pl):
    """Plot overlaid histograms and side-by-side box plots of two samples.

    Args:
        stop_inf_no_pl: Sequence of values for the "No Pl" group.
        stop_inf_pl: Sequence of values for the "Pl" group.

    The two samples may differ in length.
    """
    # Wrapping each sample in its own Series pads the shorter one with NaN;
    # building the frame from raw lists raises ValueError on unequal lengths.
    data = pd.DataFrame({
        'No Pl': pd.Series(list(stop_inf_no_pl)),
        'Pl': pd.Series(list(stop_inf_pl))
    })

    fig, (ax_hist, ax_box) = plt.subplots(1, 2, figsize=(14, 6))

    # Plotting the histograms
    sns.histplot(stop_inf_no_pl, kde=True, color='blue', label='No Pl', bins=10, ax=ax_hist)
    sns.histplot(stop_inf_pl, kde=True, color='red', label='Pl', bins=10, ax=ax_hist)
    ax_hist.legend()
    ax_hist.set_title('Histogram of No Pl and Pl')

    # Plotting the box plots
    sns.boxplot(data=data, ax=ax_box)
    ax_box.set_title('Box Plot of No Pl and Pl')

    fig.tight_layout()
    plt.show()

In [None]:
def ttest(stop_inf_no_pl, stop_inf_pl, print_results=False):
    """Run an independent two-sample t-test and report the relative change.

    Args:
        stop_inf_no_pl: Sequence of values for the "No Pl" (baseline) group.
        stop_inf_pl: Sequence of values for the "Pl" group.
        print_results: When True, print the statistic, the p-value, both
            means and the improvement.

    Returns:
        tuple: ``(t_statistic, p_value, improvement)``. ``improvement`` is
        ``(mean_no_pl - mean_pl) / mean_no_pl * 100``, so it is positive when
        the "Pl" mean is lower than the "No Pl" mean.

    The two samples may differ in length.
    """
    t_statistic, p_value = ttest_ind(stop_inf_no_pl, stop_inf_pl)

    # One Series per sample pads the shorter one with NaN, which mean() skips.
    # A frame built from raw lists raises ValueError on unequal lengths even
    # though ttest_ind itself accepts them.
    data = pd.DataFrame({
        'No Pl': pd.Series(list(stop_inf_no_pl)),
        'Pl': pd.Series(list(stop_inf_pl))
    })
    means = data.mean()
    mean_no_pl = means['No Pl']
    mean_pl = means['Pl']
    improvement = (mean_no_pl - mean_pl) / mean_no_pl * 100

    if print_results:
        print(f"t-statistic: {t_statistic}")
        print(f"P-value: {p_value}")
        if p_value < 0.05:
            print("There is a significant difference!")
        print("---")
        print(f"No Pl: {mean_no_pl}")
        print(f"Pl: {mean_pl}")
        print(f"Improvement: {improvement:.2f}%")

    return t_statistic, p_value, improvement

# Table

In [None]:
# Metrics reported in the table: the composed score plus every infraction counter.
attributes = ["score_composed", "collisions_layout", "collisions_pedestrian", "collisions_vehicle", "outside_route_lanes", "red_light", "route_dev", "route_timeout", "stop_infraction", "vehicle_blocked"]
# Running sums of the per-study means for each approach; they are divided by
# the number of studies after the loop to produce the "Avg" rows.
avgs = {
    "pl": {},
    "nopl": {}
}
for att in attributes:
    avgs["pl"][att] = 0
    avgs["nopl"][att] = 0

# Every cell holds a pre-formatted LaTeX string. "\\approach", "\\cmark" and
# "\\xmark" are presumably macros defined in the target LaTeX document -- TODO confirm.
results_df = pd.DataFrame(columns=["Epochs", "\\approach", *attributes])
studies = ["5e5_5e", "5e5_10e", "5e5_15e"]
base_path = "./results_summary/"
for study in studies:
    # "5e5_10e" -> 10: the number between the underscore and the trailing "e".
    n_epochs = int(study.split("_")[1].split("e")[0])
    model_df = {}
    for model_type in ["pl", "nopl"]:
        study_path = base_path + f"final_{model_type}_5e5_{n_epochs}e"
        route_df = get_route_df(study_path)
        # Average every remaining column within each (model_name, model_number, run) group.
        model_df[model_type] = route_df.groupby(['model_name', 'model_number', 'run']).agg(['mean'])
    # row1 collects the "pl" cells and row2 the "nopl" cells for this study.
    row1 = [n_epochs, "\\cmark"]
    row2 = [n_epochs, "\\xmark"]
    for att in attributes:
        pl_data = model_df["pl"][att]["mean"].tolist()
        nopl_data = model_df["nopl"][att]["mean"].tolist()
        # ttest() computes improvement relative to its first argument, so it is
        # positive when the "pl" mean is lower than the "nopl" mean.
        statistics, p_value, improvement = ttest(nopl_data, pl_data)
        # Cell text: mean and standard deviation of the per-group means.
        s1 = f"{model_df['pl'][att]['mean'].mean():.2f} $\\pm$ {model_df['pl'][att]['mean'].std():.2f}"
        avgs["pl"][att] += model_df['pl'][att]['mean'].mean()
        s2 = f"{model_df['nopl'][att]['mean'].mean():.2f} $\\pm$ {model_df['nopl'][att]['mean'].std():.2f}"
        avgs["nopl"][att] += model_df['nopl'][att]['mean'].mean()
        # For the score a higher "pl" mean counts as the improvement, so flip the sign.
        if att == "score_composed":
            improvement = improvement * -1
        # Bold the "pl" cell when improvement is positive; in every other case
        # (including a zero or NaN improvement) the "nopl" cell is bolded.
        if improvement > 0:
            s1 = "\\textbf{" + s1 + "}"
        else:
            s2 = "\\textbf{" + s2 + "}"                
        # Additionally underline the bolded cell when the t-test gives p < 0.05.
        if p_value < 0.05:
            if improvement > 0:
                s1 = "\\underline{" + s1 + "}"
            else:
                s2 = "\\underline{" + s2 + "}"
        row1.append(s1)
        row2.append(s2)
    # Append both rows at the next free integer label.
    results_df.loc[len(results_df)] = row1
    results_df.loc[len(results_df)] = row2
# Turn the accumulated sums into averages over the studies.
for att in attributes:
    avgs["pl"][att] /= len(studies)
    avgs["nopl"][att] /= len(studies)
# Summary rows: one "Avg" row per approach, each value wrapped in a centred \multicolumn cell.
s1 = [f"{avgs['pl'][att]:.2f}" for att in attributes]
row1 = ["Avg", "\\cmark"] + ["\\multicolumn{1}{c}{" + s1[idx] + "}" for idx in range(len(attributes))]
results_df.loc[len(results_df)] = row1
s2 = [f"{avgs['nopl'][att]:.2f}" for att in attributes]
row2 = ["Avg", "\\xmark"] + ["\\multicolumn{1}{c}{" + s2[idx] + "}" for idx in range(len(attributes))]
results_df.loc[len(results_df)] = row2

In [None]:
# Display the table as it currently stands (one column per metric, LaTeX-formatted cells).
results_df

In [None]:
# Reshape the table for the paper in one chain:
#   1. keep the columns that have at least one cell different from the literal string '0.00',
#   2. keep only the listed columns, in this order,
#   3. swap the internal metric names for human-readable headers.
results_df = (
    results_df
    .loc[:, (results_df != '0.00').any(axis=0)]
    .reindex(columns=[
        "Epochs",
        "\\approach",
        "stop_infraction",
        "score_composed",
        "collisions_vehicle",
        "red_light",
        "vehicle_blocked",
        "route_timeout",
    ])
    .rename(columns={
        'score_composed': 'Avg. Driving Score',
        'stop_infraction': 'Stop sign infractions',
        'collisions_vehicle': 'Collisions Vehicles',
        'red_light': 'Red Light Infractions',
        'vehicle_blocked': 'Agent Blocked',
        'route_timeout': 'Route Timeout',
    })
)
results_df

In [None]:
# Render without the index and without escaping, so the LaTeX markup stored in the cells survives.
latex_table = results_df.to_latex(index=False, escape=False)
print(latex_table)

# Property Loss

In [None]:
# Per-route rows for the "pl" experiment with the 10e suffix.
exp_path = "results_summary/final_pl_5e5_10e"
df_pl = get_route_df(exp_path)

In [None]:
# Mean of the three score columns for every (model_name, model_number, run) group.
df_pl_scores = (
    df_pl
    .groupby(['model_name', 'model_number', 'run'])
    .agg(['mean'])[['score_composed', 'score_route', 'score_penalty']]
    # Drop the outermost index level (model_name); model_number and run remain.
    .reset_index(level=0, drop=True)
    # Flatten the (column, 'mean') header into plain metric names.
    .set_axis(['driving_score', 'route_completion', 'infraction_penalty'], axis=1)
    # Turn the remaining index levels back into ordinary columns.
    .reset_index()
)

# No Property Loss

In [None]:
# Per-route rows for the "nopl" experiment with the 10e suffix.
# Note: this rebinds exp_path, which was previously set to the "pl" experiment path.
exp_path = "results_summary/final_nopl_5e5_10e"
df_no_pl = get_route_df(exp_path)

In [None]:
# Mean of the three score columns for every (model_name, model_number, run) group.
df_no_pl_scores = (
    df_no_pl
    .groupby(['model_name', 'model_number', 'run'])
    .agg(['mean'])[['score_composed', 'score_route', 'score_penalty']]
    # Drop the outermost index level (model_name); model_number and run remain.
    .reset_index(level=0, drop=True)
    # Flatten the (column, 'mean') header into plain metric names.
    .set_axis(['driving_score', 'route_completion', 'infraction_penalty'], axis=1)
    # Turn the remaining index levels back into ordinary columns.
    .reset_index()
)

# Non-Zero Infractions

In [None]:
# get_route_df() lays out 4 identifier columns and 3 score columns before the
# infraction counters, so the counters start at position 7. The slice is taken
# on the full frame, before filtering: slicing after all-zero columns were
# dropped would shift the offset if any of the first seven columns were all zero.
infraction_columns = df_no_pl.columns[7:]
# Keep the infraction types that occur at least once in the baseline runs.
non_zero_columns = [col for col in infraction_columns if (df_no_pl[col] != 0).any()]
non_zero_columns

In [None]:
# For every infraction type listed in non_zero_columns, t-test the per-group
# totals of both experiments and store a one-line verdict.
results = {}
for infraction_type in non_zero_columns:
    # Totals per (model_name, model_number, run) group, without property loss ...
    inf_no_pl = (
        df_no_pl
        .groupby(['model_name', 'model_number','run'])
        .agg(['sum'])[[infraction_type]][infraction_type]['sum']
        .to_list()
    )
    # ... and with property loss.
    inf_pl = (
        df_pl
        .groupby(['model_name', 'model_number','run'])
        .agg(['sum'])[[infraction_type]][infraction_type]['sum']
        .to_list()
    )

    statistics, p_value, improvement = ttest(inf_no_pl, inf_pl)
    verdict = "Significant" if p_value < 0.05 else "Not Significant"
    trend = 'Improve' if improvement > 0 else 'Worsen'
    results[infraction_type] = f"{verdict} - {trend} by {improvement:.2f}%"

In [None]:
# One row per infraction type with its verdict string in a single 'Results' column.
# Note: this rebinds results_df, which previously held the LaTeX table.
results_df = pd.DataFrame.from_dict(results, orient='index', columns=['Results'])
results_df

In [None]:
# Detailed look at a single infraction type: printed t-test report plus distribution plots.
infraction_type = "stop_infraction"

# Totals per (model_name, model_number, run) group, without property loss ...
inf_no_pl = (
    df_no_pl
    .groupby(['model_name', 'model_number','run'])
    .agg(['sum'])[[infraction_type]][infraction_type]['sum']
    .to_list()
)
# ... and with property loss.
inf_pl = (
    df_pl
    .groupby(['model_name', 'model_number','run'])
    .agg(['sum'])[[infraction_type]][infraction_type]['sum']
    .to_list()
)

ttest(inf_no_pl, inf_pl, print_results=True)
plot_dist(inf_no_pl, inf_pl)

## CARLA Average Driving Score

In [None]:
# Driving score of the first row in each (model_number, run) group, without property loss ...
no_pl_score = (
    df_no_pl_scores
    .groupby(['model_number','run'])
    .agg(['first'])[['driving_score']]['driving_score']['first']
    .to_list()
)
# ... and with property loss.
pl_score = (
    df_pl_scores
    .groupby(['model_number','run'])
    .agg(['first'])[['driving_score']]['driving_score']['first']
    .to_list()
)

ttest(no_pl_score, pl_score, print_results=True)
plot_dist(no_pl_score, pl_score)