# Train Summary Plots/Analysis (NEXT)

## Experiment selection

In [None]:
# Common imports.
from pathlib import Path
from collections import defaultdict

%matplotlib widget
import base

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import ipywidgets

import dfaas_env
import dfaas_utils
import dfaas_upperbound

Select one or more experiments to view.

**WARNING**: If multiple experiments are selected, they must share the same number of training iterations!

In [None]:
# NOTE(review): the results directory is a hard-coded absolute path; adjust it
# when running the notebook on a different machine.
experiments = base.get_experiments("/home/emanuele/marl-dfaas/results")

# Multi-selection widget over the discovered experiments, with the first entry
# pre-selected (index=[0]). Presumably each option is shown as the portion of
# its path after "results" while its value is a full Path object -- verify
# against base.get_experiments.
exp_select = ipywidgets.SelectMultiple(
    options=experiments,
    index=[0],
    description="Experiment(s):",
    style={"description_width": "initial"},
    layout=ipywidgets.Layout(width="70%"),
)

# Last expression of the cell, so the notebook renders the widget.
ipywidgets.AppLayout(center=exp_select)

## Experiment loading

This section must be run before any of the following sections to load the selected experiments.

In [None]:
# Directories of the experiments picked in the selection widget.
exps_dir = exp_select.value
assert len(exps_dir) > 0, "must select at least one experiment"

# Parse the result.json file of every selected experiment up front.
raw_exp_data = {
    selected_dir: dfaas_utils.parse_result_file(selected_dir / "result.json") for selected_dir in exps_dir
}

# Pick the reference environment: the first selected experiment whose
# environment class is DFaaS.
for env in map(base.get_env, exps_dir):
    if env.__class__ == dfaas_env.DFaaS:
        break

# At least one experiment must be of type DFaaS (SingleDFaaS is used only as
# reference); otherwise report the class of the last environment examined.
assert env.__class__ == dfaas_env.DFaaS, f"{env.__class__}"

print("Selected experiments:")
for exp_dir in exps_dir:
    print("  -", exp_dir.name)

## Reward

In [None]:
# Common functions for average reward data.


def average_reward_step(iter, agent):
    """Return the mean per-step reward of ``agent`` for one training iteration.

    The rewards of each episode played in the iteration are averaged first;
    the per-episode means are then stored as float32 and averaged again.
    """
    runners = iter["env_runners"]

    episode_means = [
        np.average(runners["hist_stats"]["reward"][episode][agent])
        for episode in range(runners["episodes_this_iter"])
    ]

    return np.average(np.array(episode_means, dtype=np.float32))


def get_reward_data(iter_data):
    """Extract the average reward series of every given experiment.

    ``iter_data`` maps an experiment directory to the list of its training
    iterations. Experiments whose environment class is SingleDFaaS (the
    upperbound) are skipped, since they are not used for the reward plots.

    For each remaining experiment the returned dict holds:

    - "agents": the agents of the environment,
    - "iterations": the number of iterations,
    - "episodes": the episodes played in the first iteration,
    - "reward_total_avg": per-iteration average total reward per episode,
      keyed by agent plus the "all" key,
    - "reward_step_avg": per-iteration average reward per step, keyed by agent.
    """
    result = {}

    for exp_dir, iters in iter_data.items():
        env = base.get_env(exp_dir)
        if env.__class__ == dfaas_upperbound.SingleDFaaS:
            continue

        agents = env.agents
        n_iters = len(iters)
        episodes = iters[0]["env_runners"]["episodes_this_iter"]

        total_avg = {"all": np.empty(n_iters, dtype=np.float32)}
        step_avg = {}
        for agent in agents:
            total_avg[agent] = np.empty(n_iters, dtype=np.float32)
            step_avg[agent] = np.empty(n_iters, dtype=np.float32)

        # Several episodes are played in each iteration, so every stored value
        # is an average over the episodes of that iteration.
        for entry in iters:
            # Log files count iterations from one, the arrays from zero.
            slot = entry["training_iteration"] - 1
            stats = entry["env_runners"]["hist_stats"]

            total_avg["all"][slot] = np.average(stats["episode_reward"])
            for agent in agents:
                total_avg[agent][slot] = np.average(stats[f"policy_policy_{agent}_reward"])
                step_avg[agent][slot] = average_reward_step(entry, agent)

        result[exp_dir] = {
            "agents": agents,
            "iterations": n_iters,
            "episodes": episodes,
            "reward_total_avg": total_avg,
            "reward_step_avg": step_avg,
        }

    return result


reward_data = get_reward_data(raw_exp_data)

### Average per episode

#### All agents

In [None]:
def make_reward_plot_all_agents():
    """Plot the average total reward per episode, one line per experiment."""
    figure_id = "reward_all"
    # Close any existing figure with the same name before recreating it.
    plt.close(fig=figure_id)
    figure = plt.figure(num=figure_id, layout="constrained")
    figure.canvas.header_visible = False
    axes = figure.subplots()

    # axes.axhline(
    #   y=env.max_steps * len(env.agents), color="red", linestyle="--", label="Limit"
    # )
    for exp_dir in exps_dir:
        # The upperbound (SingleDFaaS) is not used for the reward plots.
        is_upperbound = base.get_env(exp_dir).__class__ == dfaas_upperbound.SingleDFaaS
        if not is_upperbound:
            axes.plot(reward_data[exp_dir]["reward_total_avg"]["all"], label=exp_dir.name)

    axes.set_title("Average reward per episode (all agents)")

    axes.set_xlabel("Iteration")
    # One major tick on the x-axis every 50 iterations.
    axes.xaxis.set_major_locator(ticker.MultipleLocator(50))

    axes.set_ylabel("Reward per episode")
    # axes.set_ylim(bottom=0, top=env.max_steps * len(env.agents) * 1.05)

    axes.legend(loc="lower center")
    axes.grid(axis="both")
    # Draw the grid below the plotted lines (by default it is over them).
    axes.set_axisbelow(True)


make_reward_plot_all_agents()

#### Single agents

In [None]:
def make_reward_plot_single_agents():
    """Plot the average reward per episode, with one figure for each agent.

    The agents are taken from the global ``env``; every figure contains one
    line per selected experiment.
    """
    for agent in env.agents:
        figure_id = f"reward_{agent}"
        # Close any existing figure with the same name before recreating it.
        plt.close(fig=figure_id)
        figure = plt.figure(num=figure_id, layout="constrained")
        figure.canvas.header_visible = False
        axes = figure.subplots()

        # axes.axhline(y=env.max_steps, color="red", linestyle="--", label="Limit")
        for exp_dir in exps_dir:
            # The upperbound (SingleDFaaS) is not used for the reward plots.
            is_upperbound = base.get_env(exp_dir).__class__ == dfaas_upperbound.SingleDFaaS
            if not is_upperbound:
                axes.plot(reward_data[exp_dir]["reward_total_avg"][agent], label=exp_dir.name)

        axes.set_title(f"Average reward per episode ({agent = })")

        axes.set_xlabel("Iteration")
        # One major tick on the x-axis every 50 iterations.
        axes.xaxis.set_major_locator(ticker.MultipleLocator(50))

        axes.set_ylabel("Reward per episode")
        # axes.set_ylim(bottom=0, top=env.max_steps * 1.05)

        axes.legend(loc="lower center")
        axes.grid(axis="both")
        # Draw the grid below the plotted lines (by default it is over them).
        axes.set_axisbelow(True)


make_reward_plot_single_agents()

## Processed requests

In [None]:
# Common functions for processed requests.


def get_processed_requests_data_upperbound(iters):
    """Compute per-iteration request averages for an upperbound experiment.

    For every iteration, the input requests and the locally processed requests
    are summed within each episode, then averaged over the episodes of that
    iteration.

    Returns a dict with the "input_reqs" and "processed_reqs" arrays, each
    holding one value per iteration.
    """
    n_iters = len(iters)
    input_avg = np.empty(n_iters)
    processed_avg = np.empty(n_iters)

    for pos, entry in enumerate(iters):
        runners = entry["env_runners"]
        n_episodes = runners["episodes_this_iter"]

        # Per-episode totals for this iteration.
        input_totals = np.empty(n_episodes)
        processed_totals = np.empty(n_episodes)
        for episode in range(n_episodes):
            stats = runners["hist_stats"]
            input_totals[episode] = np.sum(stats["observation_input_requests"][episode])
            processed_totals[episode] = np.sum(stats["processed_local"][episode])

        # The iteration value is the average over its episodes.
        input_avg[pos] = np.average(input_totals)
        processed_avg[pos] = np.average(processed_totals)

    return {"input_reqs": input_avg, "processed_reqs": processed_avg}


def get_processed_requests_data_episode(iter_data, epi_idx, env):
    """Sum the request metrics of one episode, per agent and over all agents.

    Args:
        iter_data: Data of a single training iteration. Only
            ``iter_data["env_runners"]["hist_stats"]`` is read.
        epi_idx: Index of the episode inside the iteration.
        env: Environment object. Only its ``agents`` attribute is read.

    Returns:
        A nested mapping ``data[agent][metric]`` with the metrics
        "processed_reqs", "input_reqs" and "processed_forwarded_reqs" for
        every agent, plus the "all" entry with the sums over all agents.
    """
    hist_stats = iter_data["env_runners"]["hist_stats"]

    data = defaultdict(lambda: defaultdict())
    totals = data["all"]
    totals["processed_reqs"] = 0
    totals["input_reqs"] = 0
    totals["processed_forwarded_reqs"] = 0

    for agent in env.agents:
        processed_reqs = np.sum(hist_stats["processed_local"][epi_idx][agent])
        input_reqs = np.sum(hist_stats["observation_input_requests"][epi_idx][agent])

        try:
            processed_forward = np.sum(hist_stats["processed_local_forward"][epi_idx][agent])
        except (KeyError, IndexError):
            # May be missing if the agent did not receive any forwarded request.
            processed_forward = 0

        data[agent]["processed_reqs"] = processed_reqs
        data[agent]["input_reqs"] = input_reqs
        data[agent]["processed_forwarded_reqs"] = processed_forward

        totals["processed_reqs"] += processed_reqs
        totals["input_reqs"] += input_reqs
        totals["processed_forwarded_reqs"] += processed_forward

    return data


def get_processed_requests_data(iter_data):
    """Compute the per-iteration request averages of every given experiment.

    ``iter_data`` maps an experiment directory to the list of its training
    iterations. For upperbound experiments (SingleDFaaS) the entry is whatever
    get_processed_requests_data_upperbound() returns. For the others, the
    result is nested as experiment -> agent (or "all") -> metric, and each
    metric is an array with one averaged value per iteration.
    """
    metrics = ("input_reqs", "processed_reqs", "processed_forwarded_reqs")
    result = defaultdict(lambda: defaultdict(lambda: defaultdict()))

    for exp_dir, iters in iter_data.items():
        env = base.get_env(exp_dir)
        if env.__class__ == dfaas_upperbound.SingleDFaaS:
            # The upperbound data extraction is different from the normal flow.
            result[exp_dir] = get_processed_requests_data_upperbound(iters)
            continue

        n_iters = len(iters)
        subjects = ["all"] + env.agents

        # Allocate, for this experiment, one array per subject and metric that
        # will hold the average value of each iteration.
        for subject in subjects:
            for metric in metrics:
                result[exp_dir][subject][metric] = np.empty(n_iters)

        for pos, entry in enumerate(iters):
            n_episodes = entry["env_runners"]["episodes_this_iter"]

            # Per-episode values of each metric for this iteration.
            per_episode = defaultdict(lambda: defaultdict())
            for subject in subjects:
                for metric in metrics:
                    per_episode[subject][metric] = np.empty(n_episodes, dtype=np.int32)

            for episode in range(n_episodes):
                episode_data = get_processed_requests_data_episode(entry, episode, env)
                for subject in subjects:
                    for metric in metrics:
                        per_episode[subject][metric][episode] = episode_data[subject][metric]

            # The iteration value is the average over its episodes.
            for subject in subjects:
                for metric in metrics:
                    result[exp_dir][subject][metric][pos] = np.average(per_episode[subject][metric])

    return result


processed_reqs_data = get_processed_requests_data(raw_exp_data)

### Average processed requests per episode

#### All agents

In [None]:
def make_avg_processed_reqs_plot_all_agents():
    """Plot the processed/input requests ratio per iteration over all agents.

    One line is drawn per selected experiment; upperbound experiments
    (SingleDFaaS) are included and labelled with the "Upperbound" prefix.
    """
    figure_id = "avg_processed_reqs_all_agents"
    # Close any existing figure with the same name before recreating it.
    plt.close(fig=figure_id)
    figure = plt.figure(num=figure_id, layout="constrained")
    figure.canvas.header_visible = False
    axes = figure.subplots()

    for exp_dir in exps_dir:
        if base.get_env(exp_dir).__class__ == dfaas_upperbound.SingleDFaaS:
            # Upperbound data has no per-agent level.
            series = processed_reqs_data[exp_dir]
            label = f"Upperbound {exp_dir.name}"
        else:
            series = processed_reqs_data[exp_dir]["all"]
            label = exp_dir.name

        axes.plot(series["processed_reqs"] / series["input_reqs"], label=label)

    axes.set_title("Average processed requests per episode (all agents)")

    axes.set_xlabel("Iteration")

    # The y-axis shows the ratio as a percentage, with a tick every 10%.
    axes.set_ylabel("Requests")
    axes.set_ylim(0, 1)
    axes.yaxis.set_major_locator(ticker.MultipleLocator(0.1))
    axes.yaxis.set_major_formatter(ticker.PercentFormatter(1.0))

    axes.legend(loc="lower center")
    axes.grid(axis="both")
    # Draw the grid below the plotted lines (by default it is over them).
    axes.set_axisbelow(True)


make_avg_processed_reqs_plot_all_agents()

#### Single agents

In [None]:
def make_avg_processed_reqs_plot_single_agents():
    """Plot the processed/input requests ratio per iteration for each agent.

    One figure is created per agent of the global ``env``; upperbound
    experiments (SingleDFaaS) are skipped.
    """
    for agent in env.agents:
        figure_id = f"avg_processed_reqs_{agent}"
        # Close any existing figure with the same name before recreating it.
        plt.close(fig=figure_id)
        figure = plt.figure(num=figure_id, layout="constrained")
        figure.canvas.header_visible = False
        axes = figure.subplots()

        for exp_dir in exps_dir:
            # The upperbound is valid only for global view (all agents).
            is_upperbound = base.get_env(exp_dir).__class__ == dfaas_upperbound.SingleDFaaS
            if not is_upperbound:
                series = processed_reqs_data[exp_dir][agent]
                axes.plot(series["processed_reqs"] / series["input_reqs"], label=exp_dir.name)

        axes.set_title(f"Average processed requests per episode ({agent = })")

        axes.set_xlabel("Iteration")

        # The y-axis shows the ratio as a percentage, with a tick every 10%.
        axes.set_ylabel("Requests")
        axes.set_ylim(0, 1)
        axes.yaxis.set_major_locator(ticker.MultipleLocator(0.1))
        axes.yaxis.set_major_formatter(ticker.PercentFormatter(1.0))

        axes.legend()
        axes.grid(axis="both")
        # Draw the grid below the plotted lines (by default it is over them).
        axes.set_axisbelow(True)


make_avg_processed_reqs_plot_single_agents()

### Average processed forwarded requests per episode

#### All agents

In [None]:
def make_avg_processed_fw_reqs_plot_all_agents():
    """Plot the forwarded share of processed requests over all agents.

    For each non-upperbound experiment, the plotted series is the ratio of
    processed forwarded requests to processed requests, per iteration.
    """
    figure_id = "avg_processed_fw_reqs"
    # Close any existing figure with the same name before recreating it.
    plt.close(fig=figure_id)
    figure = plt.figure(num=figure_id, layout="constrained")
    figure.canvas.header_visible = False
    axes = figure.subplots()

    for exp_dir in exps_dir:
        # The upperbound is valid only for total processed requests.
        is_upperbound = base.get_env(exp_dir).__class__ == dfaas_upperbound.SingleDFaaS
        if not is_upperbound:
            series = processed_reqs_data[exp_dir]["all"]
            axes.plot(series["processed_forwarded_reqs"] / series["processed_reqs"], label=exp_dir.name)

    axes.set_title("Average processed forwarded requests per episode (all agents)")

    axes.set_xlabel("Iteration")

    # The y-axis shows the ratio as a percentage, with a tick every 10%.
    axes.set_ylabel("Requests")
    axes.set_ylim(0, 1)
    axes.yaxis.set_major_locator(ticker.MultipleLocator(0.1))
    axes.yaxis.set_major_formatter(ticker.PercentFormatter(1.0))

    axes.legend()
    axes.grid(axis="both")
    # Draw the grid below the plotted lines (by default it is over them).
    axes.set_axisbelow(True)


make_avg_processed_fw_reqs_plot_all_agents()

#### Single agents

In [None]:
def make_avg_processed_fw_reqs_plot_single_agents():
    """Plot the forwarded share of processed requests for each agent.

    One figure is created per agent of the global ``env``. For each
    non-upperbound experiment, the plotted series is the ratio of processed
    forwarded requests to processed requests, per iteration.
    """
    for agent in env.agents:
        figure_id = f"avg_processed_fw_reqs_{agent}"
        # Close any existing figure with the same name before recreating it.
        plt.close(fig=figure_id)
        figure = plt.figure(num=figure_id, layout="constrained")
        figure.canvas.header_visible = False
        axes = figure.subplots()

        for exp_dir in exps_dir:
            # The upperbound is valid only for total processed requests.
            is_upperbound = base.get_env(exp_dir).__class__ == dfaas_upperbound.SingleDFaaS
            if not is_upperbound:
                series = processed_reqs_data[exp_dir][agent]
                axes.plot(series["processed_forwarded_reqs"] / series["processed_reqs"], label=exp_dir.name)

        axes.set_title(f"Average processed forwarded requests per episode ({agent = })")

        axes.set_xlabel("Iteration")

        # The y-axis shows the ratio as a percentage, with a tick every 10%.
        axes.set_ylabel("Requests")
        axes.set_ylim(0, 1)
        axes.yaxis.set_major_locator(ticker.MultipleLocator(0.1))
        axes.yaxis.set_major_formatter(ticker.PercentFormatter(1.0))

        axes.legend()
        axes.grid(axis="both")
        # Draw the grid below the plotted lines (by default it is over them).
        axes.set_axisbelow(True)


make_avg_processed_fw_reqs_plot_single_agents()

## Replay Buffer (only for SAC)

This section shows some metrics related to the replay buffer. It is specific to SAC experiments only, so other experiments will be ignored.

In [None]:
from ray.rllib.algorithms.sac import SAC


def filter_sac(raw_exp_dir):
    """Return the subset of ``raw_exp_dir`` trained with the SAC algorithm.

    Args:
        raw_exp_dir: Mapping from experiment directory to its loaded data.

    Returns:
        A new dict with only the entries whose experiment configuration has
        ``algo_class`` set to SAC. It may be empty!
    """
    # Filter the mapping received as argument, not the global raw_exp_data.
    return {
        exp_dir: exp_data
        for exp_dir, exp_data in raw_exp_dir.items()
        if base.get_exp_config(exp_dir).algo_class is SAC
    }


sac_raw_exp_data = filter_sac(raw_exp_data)

In [None]:
# Data extraction for replay buffer.


def get_replay_buffer_data(raw_exp_data):
    """Extract the replay buffer metrics of every given experiment.

    The result is nested as experiment -> metric. The "policies" and
    "capacity_per_policy" entries are plain values (the capacity is read from
    the first iteration), while "num_entries" and "sampled" map each policy to
    an int64 array with one value per iteration.
    """
    result = defaultdict(lambda: defaultdict(lambda: defaultdict()))

    for exp_dir, iters in raw_exp_data.items():
        n_iters = len(iters)
        policies = list(base.get_exp_config(exp_dir).policies.keys())

        exp_out = result[exp_dir]
        exp_out["policies"] = policies
        exp_out["capacity_per_policy"] = iters[0]["info"]["replay_buffer"]["capacity_per_policy"]

        for policy in policies:
            exp_out["num_entries"][policy] = np.empty(n_iters, dtype=np.int64)
            exp_out["sampled"][policy] = np.empty(n_iters, dtype=np.int64)

        for pos, entry in enumerate(iters):
            for policy in policies:
                buffer_stats = entry["info"]["replay_buffer"][f"policy_{policy}"]

                exp_out["num_entries"][policy][pos] = buffer_stats["num_entries"]
                exp_out["sampled"][policy][pos] = buffer_stats["sampled_count"]

    return result


# replay_buffer_data is defined only when at least one SAC experiment exists.
if not sac_raw_exp_data:
    print("Skipping section since there are not experiments using SAC!")
else:
    replay_buffer_data = get_replay_buffer_data(sac_raw_exp_data)

### Plots

In [None]:
def make_replay_buffer_plot():
    """Plot stored and sampled replay buffer entries, one figure per SAC experiment."""
    for exp_dir in sac_raw_exp_data:
        figure_id = f"replay_buffer_{exp_dir.name}"
        # Close any existing figure with the same name before recreating it.
        plt.close(fig=figure_id)
        figure = plt.figure(num=figure_id, layout="constrained")
        figure.canvas.header_visible = False
        axes = figure.subplots()

        # Show the plot only for one policy, since all policies have the same
        # replay buffer capacity and behaviour.
        exp_buffer = replay_buffer_data[exp_dir]
        policy = exp_buffer["policies"][0]

        axes.plot(exp_buffer["num_entries"][policy], label="Stored entries")
        axes.plot(exp_buffer["sampled"][policy], label="Sampled entries")

        axes.set_title(f"Replay buffer status per iteration for one policy and\nexperiment {exp_dir.name!r}")

        axes.set_xlabel("Iteration")
        axes.set_ylabel("Entries")

        axes.legend()
        axes.grid(axis="both")
        # Draw the grid below the plotted lines (by default it is over them).
        axes.set_axisbelow(True)


if sac_raw_exp_data:
    make_replay_buffer_plot()

In [None]:
# Common functions for processed requests.


# NOTE(review): this cell repeats the definitions of the "Processed requests"
# section; consider keeping a single copy.
def get_processed_requests_data_upperbound(iters):
    """Compute per-iteration request averages for an upperbound experiment.

    For every iteration, the input requests and the locally processed requests
    are summed within each episode, then averaged over the episodes of that
    iteration.

    Returns a dict with the "input_reqs" and "processed_reqs" arrays, each
    holding one value per iteration.
    """
    n_iters = len(iters)
    input_avg = np.empty(n_iters)
    processed_avg = np.empty(n_iters)

    for pos, entry in enumerate(iters):
        runners = entry["env_runners"]
        n_episodes = runners["episodes_this_iter"]

        # Per-episode totals for this iteration.
        input_totals = np.empty(n_episodes)
        processed_totals = np.empty(n_episodes)
        for episode in range(n_episodes):
            stats = runners["hist_stats"]
            input_totals[episode] = np.sum(stats["observation_input_requests"][episode])
            processed_totals[episode] = np.sum(stats["processed_local"][episode])

        # The iteration value is the average over its episodes.
        input_avg[pos] = np.average(input_totals)
        processed_avg[pos] = np.average(processed_totals)

    return {"input_reqs": input_avg, "processed_reqs": processed_avg}


def get_processed_requests_data_episode(iter_data, epi_idx, env):
    """Sum the request metrics of one episode, per agent and over all agents.

    Args:
        iter_data: Data of a single training iteration. Only
            ``iter_data["env_runners"]["hist_stats"]`` is read.
        epi_idx: Index of the episode inside the iteration.
        env: Environment object. Only its ``agents`` attribute is read.

    Returns:
        A nested mapping ``data[agent][metric]`` with the metrics
        "processed_reqs", "input_reqs" and "processed_forwarded_reqs" for
        every agent, plus the "all" entry with the sums over all agents.
    """
    hist_stats = iter_data["env_runners"]["hist_stats"]

    data = defaultdict(lambda: defaultdict())
    totals = data["all"]
    totals["processed_reqs"] = 0
    totals["input_reqs"] = 0
    totals["processed_forwarded_reqs"] = 0

    for agent in env.agents:
        processed_reqs = np.sum(hist_stats["processed_local"][epi_idx][agent])
        input_reqs = np.sum(hist_stats["observation_input_requests"][epi_idx][agent])

        try:
            processed_forward = np.sum(hist_stats["processed_local_forward"][epi_idx][agent])
        except (KeyError, IndexError):
            # May be missing if the agent did not receive any forwarded request.
            processed_forward = 0

        data[agent]["processed_reqs"] = processed_reqs
        data[agent]["input_reqs"] = input_reqs
        data[agent]["processed_forwarded_reqs"] = processed_forward

        totals["processed_reqs"] += processed_reqs
        totals["input_reqs"] += input_reqs
        totals["processed_forwarded_reqs"] += processed_forward

    return data


def get_processed_requests_data(iter_data):
    """Compute the per-iteration request averages of every given experiment.

    ``iter_data`` maps an experiment directory to the list of its training
    iterations. For upperbound experiments (SingleDFaaS) the entry is whatever
    get_processed_requests_data_upperbound() returns. For the others, the
    result is nested as experiment -> agent (or "all") -> metric, and each
    metric is an array with one averaged value per iteration.
    """
    metrics = ("input_reqs", "processed_reqs", "processed_forwarded_reqs")
    result = defaultdict(lambda: defaultdict(lambda: defaultdict()))

    for exp_dir, iters in iter_data.items():
        env = base.get_env(exp_dir)
        if env.__class__ == dfaas_upperbound.SingleDFaaS:
            # The upperbound data extraction is different from the normal flow.
            result[exp_dir] = get_processed_requests_data_upperbound(iters)
            continue

        n_iters = len(iters)
        subjects = ["all"] + env.agents

        # Allocate, for this experiment, one array per subject and metric that
        # will hold the average value of each iteration.
        for subject in subjects:
            for metric in metrics:
                result[exp_dir][subject][metric] = np.empty(n_iters)

        for pos, entry in enumerate(iters):
            n_episodes = entry["env_runners"]["episodes_this_iter"]

            # Per-episode values of each metric for this iteration.
            per_episode = defaultdict(lambda: defaultdict())
            for subject in subjects:
                for metric in metrics:
                    per_episode[subject][metric] = np.empty(n_episodes, dtype=np.int32)

            for episode in range(n_episodes):
                episode_data = get_processed_requests_data_episode(entry, episode, env)
                for subject in subjects:
                    for metric in metrics:
                        per_episode[subject][metric][episode] = episode_data[subject][metric]

            # The iteration value is the average over its episodes.
            for subject in subjects:
                for metric in metrics:
                    result[exp_dir][subject][metric][pos] = np.average(per_episode[subject][metric])

    return result


# NOTE(review): this recomputes the same data as the "Processed requests"
# section, since this cell repeats that one.
processed_reqs_data = get_processed_requests_data(raw_exp_data)