In [100]:
import os
from natsort import natsorted as nsort
import pandas as pd
from logger.test_log import load_test_log
from logger.evaluation_log import load_eval_log
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display
import itertools

In [101]:
def highlight_rows(s, column, color, min = None, max = None):
    is_fit = pd.Series(data=False, index=s.index)
    is_fit[column] = True

    if min is not None:
        is_fit[column] &= s.loc[column] >= min
    
    if max is not None:
        is_fit[column] &= s.loc[column] < max
        
    return [f'background-color: {color}' if is_fit.any() else '' for v in is_fit]

In [102]:
N_CS = '10'

# Show Test Log

In [103]:
Base_Algorithms = ["PPO", 
              "PPO_CAPG",
              "PPO_MAXA", 
              "PPO_TANH", 
              "SB3_TRPO",
            ]

In [104]:
Algorithms = []

for base in Base_Algorithms:
    Algorithms.append(base)
    Algorithms.append(base + "_USER")
    Algorithms.append(base + "_GRID")
    Algorithms.append(base + "_TOTAL_SUM")
    Algorithms.append(base + "_TOTAL_MIN")

In [105]:
columns = ["Algorithm",
           "Testing environment",
           "Reward(e3)",
           "Profits / Costs(€)", 
           "User satisfaction(%)",
           "Energy Charged(kWh)",
           "Energy Discharged(kWh)",
           "Transformer Overload(kWh)",
           "Total Battery Capacity Loss(e-3)",
           "Total Battery Degradation Calender(e-3)",
           "Total Battery Degradation Cycle(e-3)",
           "Execution Time of episodes(s)",
           ]

In [None]:
test_array = []
max_rewards = []

for algo in Algorithms:
    file = nsort(os.listdir(f"log/test/{N_CS}/{algo}"))[-1]
    if file.endswith(".pkl"):
        test_logs = load_test_log(f"log/test/{N_CS}/{algo}/{file}")
        for i in range(len(test_logs)):

            if len(max_rewards) <= i:
                max_rewards.append([])

            max_rewards[i].append(test_logs[i]["reward"])

            if len(test_array) <= i: 
                test_array.append([])

            if len(max_rewards[i]) > 1:
                max_rewards[i] = sorted(max_rewards[i], reverse=True)[:3]
                
            test_array[i].append({
                "Algorithm": algo,
                "Testing environment": i+1,
                "Profits / Costs(€)": test_logs[i]["profits"],
                "User satisfaction(%)": test_logs[i]["user_satisfaction"],
                "Energy Charged(kWh)": test_logs[i]["energy_charged"],
                "Energy Discharged(kWh)": test_logs[i]["energy_discharged"],
                "Transformer Overload(kWh)": test_logs[i]["transformer_overload"],
                "Total Battery Capacity Loss(e-3)": test_logs[i]["battery_degradation"],
                "Total Battery Degradation Calender(e-3)": test_logs[i]["battery_degradation_calendar"],
                "Total Battery Degradation Cycle(e-3)": test_logs[i]["battery_degradation_cycling"],
                "Execution Time of episodes(s)": test_logs[i]["execution_time"],
                "Reward(e3)": test_logs[i]["reward"],
            })

test_dfs = []

for i, test_logs in enumerate(test_array):
    test_dfs.append(
        pd.DataFrame(test_logs, 
                     columns=columns
                     ).style.apply(highlight_rows, 
                                   min=max_rewards[i][0],
                                   column='Reward(e3)', 
                                   color="#0000CC", 
                                   axis=1)\
                      .apply(highlight_rows,
                                   min=max_rewards[i][1],
                                   max=max_rewards[i][0],
                                   column='Reward(e3)', 
                                   color="#000099", 
                                   axis=1)\
                      .apply(highlight_rows,
                                   min=max_rewards[i][2],
                                   max=max_rewards[i][1],
                                   column='Reward(e3)', 
                                   color="#000066", 
                                   axis=1)
                    )

for i, test_df in enumerate(test_dfs):
    display(test_df)

# Evaluate Graph

In [107]:
algorithm_evals = []

for algo in Algorithms:
    file = nsort(os.listdir(f"log/eval/{N_CS}/{algo}"))[-1]
    if file.endswith(".pkl"):
        eval_logs = load_eval_log(f"log/eval/{N_CS}/{algo}/{file}")
        algorithm_evals.append(eval_logs)

In [108]:
def draw_rewards_graphs(rewards_list, title="Title", group_size=10, skip=0, ignore_list=None, interactive=True):
    if ignore_list is None:
        ignore_list = []

    # Generate a fixed color map for algorithms
    color_cycle = plt.cm.tab10.colors  # Use a colormap with 10 distinct colors
    algorithm_colors = {name: color for name, color in zip([r[1] for r in rewards_list], itertools.cycle(color_cycle))}

    def plot_graph(group_size, skip, ignore_list):
        plt.figure(figsize=(10, 6))
        plt.ylabel('reward')
        plt.xlabel(title)
        plt.title(title)

        for rewards in rewards_list:
            if rewards[1] in ignore_list:
                continue
            values = rewards[0][skip:]
            color = algorithm_colors[rewards[1]]  # Get the fixed color for the algorithm
            plt.plot([sum(values[i * group_size: (i + 1) * group_size]) / group_size for i in range(int(len(values) / group_size))], 
                     label=rewards[1], color=color)

        plt.legend()
        plt.show()

    if interactive:
        group_size_slider = widgets.IntSlider(value=group_size, min=1, max=100, step=1, description='Group Size')
        skip_slider = widgets.IntSlider(value=skip, min=0, max=100, step=1, description='Skip')

        # Checkbox creation
        checkboxes = []
        for _, name in rewards_list:
            checkboxes.append(widgets.Checkbox(value=True, description=name))

        # Arrange checkboxes in rows of 4
        checkbox_grid = []
        for i in range(0, len(checkboxes), 4):
            checkbox_grid.append(widgets.HBox(checkboxes[i:i+4]))

        def update_plot(group_size, skip, **kwargs):
            current_ignore_list = [name for name, value in kwargs.items() if not value]
            plot_graph(group_size, skip, current_ignore_list)

        checkbox_dict = {checkbox.description: checkbox for checkbox in checkboxes}
        ui = widgets.VBox([group_size_slider, skip_slider, widgets.VBox(checkbox_grid)])

        out = widgets.interactive_output(update_plot, {'group_size': group_size_slider, 'skip': skip_slider, **checkbox_dict})

        display(ui, out)
    else:
        plot_graph(group_size, skip, ignore_list)


In [None]:
draw_rewards_graphs(algorithm_evals, title="Evaluation")