In [1]:
from pathlib import Path
import json
from math import sqrt
from typing import Any, Optional
from copy import deepcopy
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from rich import print as rprint

In [2]:
%matplotlib qt
rng = np.random.default_rng()

In [3]:
proc_data_path_fmt = "data/experiments/analyzed_{exp_t}.json"
data = {}
for exp_t in ("tsp", "vrp", "vrpp", "irp"):
    proc_data_path = Path(proc_data_path_fmt.format(exp_t=exp_t))
    if not proc_data_path.exists():
        continue
    with proc_data_path.open("r") as f:
        data[exp_t] = json.load(f)

In [4]:
criteria = tuple(data.values())[0].keys()
rprint("criteria:\n\t" + "\n\t".join(criteria))

In [5]:
criterium_translation = {
    "destination_n": "Rozmiar mapy (n x n)",
    "population_size": "Rozmiar populacji",
    "map": "Mapa",
}
crossover_translation = {
    "crossover_ndarray": "1-punktowy",
    "crossover_k_loci_ndarray": "k-punktowy",
    "crossover_k_loci_with_inversion_ndarray": "k-punktowy z inwersją",
    "crossover_k_loci_poisson_ndarray": "Poisson",
    "crossover_k_loci_poisson_with_inversion_ndarray": "Poisson z inwersją",
}
mutator_translation = {
    "mutate_swap": "zamienianie",
    "mutate_del": "usuwanie",
    "mutate_insert": "wstawianie",
    "mutate_change": "zmienianie",
}
mutator_translation.update((f"{k}_irp", v) for k, v in tuple(mutator_translation.items()))

In [6]:
# def plot_exp_stats(exp_t: str, criterium: "str", title: str):
#     criterium_data = pd.DataFrame(
#         {
#             int(sqrt(int(dest_n))): deepcopy(vals)
#             for dest_n, vals in data[exp_t][criterium].items()
#         }
#     ).T.sort_index()
#     # to %
#     criterium_data["mean_cost_improvement"] = [
#         100 * x for x in criterium_data["mean_cost_improvement"]
#     ]

#     fig, (ax_t, ax_impr, ax_i) = plt.subplots(nrows=3, sharex=True)
#     for ax, (k, k_tr) in zip([ax_t, ax_impr, ax_i], key_translation.items()):
#         sns.lineplot(data=criterium_data[k], ax=ax, marker="o")
#         ax.set_ylabel(k_tr)
#         ax.grid()
#     plt.suptitle(title)


# def plot_crossovers(exp_t: str) -> pd.DataFrame:
#     data_cross = deepcopy(data[exp_t]["by_crossover"])
#     # to %
#     for cr in data_cross:
#         data_cross[cr]["mean_cost_improvement"] = (
#             100 * data_cross[cr]["mean_cost_improvement"]
#         )
#     df = pd.DataFrame(data_cross)
#     fig, (ax_t, ax_impr, ax_i) = plt.subplots(nrows=3, sharex=True)
#     axs = (ax_t, ax_impr, ax_i)
#     xs = list(data_cross.keys())
#     for ax, k in zip(axs, ("mean_exec_t", "mean_cost_improvement", "mean_iters")):
#         ys = [data_cross[cr][k] for cr in xs]
#         sns.barplot(x=[crossover_translation[x] for x in xs], y=ys, ax=ax)
#         ax.set_ylabel(key_translation[k])
#     plt.suptitle("Operatory krzyżowania")
#     return df


# def latexify_crossover_results(df: pd.DataFrame) -> str:
#     ptn = re.compile(r" +")
#     return ptn.sub(
#         " ",
#         pd.DataFrame(
#             {
#                 crossover_translation[c]: {
#                     key_translation[k]: f"{v:.2f}" for k, v in cv.items()
#                 }
#                 for c, cv in df.to_dict().items()
#             }
#         ).T.to_latex(),
#     )

In [7]:
def plot_exp_stats(exp_t: str, criterium: "str", xlabel: str, title: Optional[str] = None):
    criterium_data = pd.DataFrame(
        {
            int(sqrt(int(dest_n))): deepcopy(vals)
            for dest_n, vals in data[exp_t][criterium].items()
        }
    ).T.sort_index()
    # to %
    criterium_data.cost_improvement = pd.Series(
        data=[{k: 100*v for k, v in d.items()} for d in criterium_data.cost_improvement],
        index=criterium_data.cost_improvement.index
    )
    
    fig, (ax_t, ax_impr, ax_i) = plt.subplots(nrows=3, sharex=True)
    # t - time
    # theta - cost improvement
    # i - no. of iterations
    symbol_map = {
        "exec_t": r"$\overline{t}$",
        "cost_improvement": r"$\overline{\theta}$",
        "iters": r"$\overline{i}$",
    }
    for ax, (k, symbol) in zip([ax_t, ax_impr, ax_i], symbol_map.items()):
        d_as_dict = criterium_data[k].to_dict()
        xs = list(d_as_dict.keys())
        # FIXME quickfix
        squares = xlabel == r"$n^2$"
        if squares:
            xs = [x**2 for x in xs]
        means = np.array([x["mean"] for x in d_as_dict.values()])
        stddevs = np.array([x["stddev"] for x in d_as_dict.values()])
        high = means + stddevs
        low = means - stddevs
        sns.lineplot(x=xs, y=means, ax=ax, marker="o", color="#0000ff", label=r"$\mu$", linestyle="dashed")
        sns.lineplot(x=xs, y=high, ax=ax, marker="o", color="#ff8800", label=r"$\mu \pm \sigma$", linestyle="dashed")
        sns.lineplot(x=xs, y=low, ax=ax, marker="o", color="#ff8800", linestyle="dashed")
        ax.fill_between(x=xs, y1=low, y2=high, color="#ff880088")
        h = ax.set_ylabel(symbol)
        h.set_rotation(0)
        ax.grid()
        ax.set_xlabel(xlabel)
        if squares:
            ax.set_xticks(xs)
    if title is not None:
        plt.suptitle(title)


def plot_crossovers(exp_t: str) -> pd.DataFrame:
    data_cross = deepcopy(data[exp_t]["crossover"])
    # to %
    for cr in data_cross:
        data_cross[cr]["cost_improvement"] = {
            k: 100 * v for k, v in data_cross[cr]["cost_improvement"].items()
        }
    df = pd.DataFrame(data_cross)
    fig, (ax_t, ax_impr, ax_i) = plt.subplots(nrows=3, sharex=True)
    axs = (ax_t, ax_impr, ax_i)
    xs = list(data_cross.keys())
    rprint(f"{xs = }")
    symbol_map = {
        "exec_t": r"$\overline{t}$",
        "cost_improvement": r"$\overline{\theta}$",
        "iters": r"$\overline{i}$",
    }
    for ax, k in zip(axs, ("exec_t", "cost_improvement", "iters")):
        means = [data_cross[cr][k]["mean"] for cr in xs]
        stddevs = [data_cross[cr][k]["stddev"] for cr in xs]
        # sns.barplot(x=[crossover_translation[x] for x in xs], y=ys, ax=ax, ci="sd")
        ax.bar(np.arange(len(xs)), means, yerr=stddevs ,capsize=10)
        ax.set_xticklabels(["", *(crossover_translation[c] for c in xs)])
        ax.grid()
        h = ax.set_ylabel(symbol_map[k])
        h.set_rotation(0)
    plt.suptitle("Operatory krzyżowania")
    return df


def latexify_crossover_results(df: pd.DataFrame) -> str:
    ptn = re.compile(r" +")
    return ptn.sub(
        " ",
        pd.DataFrame(
            {
                crossover_translation[c]: {
                    key_translation[k]: f"{v:.2f}" for k, v in cv.items()
                }
                for c, cv in df.to_dict().items()
            }
        ).T.to_latex(),
    )

## Ogólne statystyki

### TSP

In [8]:
plot_exp_stats("tsp", "destination_n", xlabel=r"$n^2$", title="Rozmiar mapy")

In [9]:
plot_exp_stats("tsp", "pop_size", xlabel="$p$", title="Rozmiar populacji")

In [10]:
df_tsp = plot_crossovers("tsp")
# print(latexify_crossover_results(df_tsp))

  ax.set_xticklabels(["", *(crossover_translation[c] for c in xs)])


### VRP

In [11]:
plot_exp_stats("vrp", "destination_n", xlabel=r"$n^2$", title="Rozmiar mapy")

In [12]:
plot_exp_stats("vrp", "pop_size", xlabel="$p$", title="Rozmiar populacji")

In [13]:
df_vrp = plot_crossovers("vrp")
# print(latexify_crossover_results(df_vrp))

  ax.set_xticklabels(["", *(crossover_translation[c] for c in xs)])


### VRPP

In [14]:
plot_exp_stats("vrpp", "destination_n", xlabel=r"$n^2$", title="Rozmiar mapy")

In [15]:
plot_exp_stats("vrpp", "pop_size", xlabel="$p$", title="Rozmiar populacji")

In [16]:
df_vrpp = plot_crossovers("vrpp")
# print(latexify_crossover_results(df_vrpp))

  ax.set_xticklabels(["", *(crossover_translation[c] for c in xs)])


### IRP

In [17]:
plot_exp_stats("irp", "destination_n", xlabel=r"$n^2$", title="Rozmiar mapy")

In [18]:
plot_exp_stats("irp", "pop_size", xlabel="$p$", title="Rozmiar populacji")

In [19]:
df_irp = plot_crossovers("irp")

  ax.set_xticklabels(["", *(crossover_translation[c] for c in xs)])


## Konkretne eksperymenty

In [28]:
rand_exp_run_fmt = "data/experiments/runs/{exp_t}/"


def get_rand_exp_data(exp_t: str) -> dict[str, Any]:
    file_paths = {
        p
        for p in Path(rand_exp_run_fmt.format(exp_t=exp_t)).iterdir()
        if p.is_file() and p.parts and p.parts[-1].endswith(".json")
    }
    while True:
        f_path = rng.choice(tuple(file_paths))
        with f_path.open("r") as f:
            run_data = json.load(f)
            if "exception" in run_data and run_data["exception"]:
                file_paths.remove(f_path)
                continue
            break
    with Path(run_data["map_path"]).open("r") as f:
        map_data = json.load(f)
    with Path(run_data["experiment_config_path"]).open("r") as f:
        conf_data = json.load(f)
    return {
        "run": run_data,
        "map": map_data,
        "conf": conf_data,
    }


def plot_exp_data(data: dict[str, Any]) -> pd.Series:
    costs = data["run"]["costs"]
    xs_best = np.array(costs["current_best"])
    std = np.array(costs["std_dev"])
    xs_mean = np.array(costs["mean"])
    xs_mean_low = xs_mean - std
    xs_mean_high = xs_mean + std
    fig, (ax_best, ax_mean) = plt.subplots(nrows=2)
    sns.lineplot(data=xs_best, ax=ax_best, color="#0000ff", marker="o")
    sns.lineplot(data=xs_mean, ax=ax_mean, color="#ffff00", marker="o", label=r"$\mu$")
    sns.lineplot(
        data=xs_mean_high,
        ax=ax_mean,
        color="#00ff00",
        marker="o",
        label=r"$\mu \pm \sigma$",
    )
    sns.lineplot(data=xs_mean_low, ax=ax_mean, color="#00ff00", marker="o")
    ax_mean.lines[0].set_linestyle("--")
    ax_mean.lines[1].set_linestyle("--")
    ax_mean.lines[2].set_linestyle("--")
    ax_mean.fill_between(
        x=np.arange(0, len(xs_mean)), y1=xs_mean_low, y2=xs_mean_high, color="#00ff0088"
    )
    # fi bar - mean cost in population
    # fi underbar - lowest cost in population at given iteration
    h1 = ax_mean.set_ylabel(r"$\overline{\phi}$")
    h2 = ax_best.set_ylabel(r"$\phi_{min}$")
    h1.set_rotation(0)
    h2.set_rotation(0)
    plt.suptitle("Zależność kosztów od iteracji")
    conf_data = data["conf"]
    dist_mx = conf_data["dist_mx"]
    fig, ax = plt.subplots()
    sns.heatmap(data=np.array(dist_mx), cmap="magma")
    plt.title("Macierz odległości")
    n = len(dist_mx)
    p_map = {
        mutator_translation[mutator_name]: p
        for mutator_name, p in conf_data["mut_ps"].items()
    }
    ss = pd.Series(
        {
            "rozmiar populacji": str(len(conf_data["population"])),
            "rozmiar mapy": f"{n}x{n}",
            **{f"prawdopodobieństwo mutacji - {m}": p for m, p in p_map.items()},
            **(
                {
                    "prawdopodobieństwo inwersji": conf_data["crossover_kwargs"][
                        "inversion_p"
                    ]
                }
                if "inversion_p" in conf_data["crossover_kwargs"]
                else {}
            ),
        }
    )
    return ss

### TSP

In [40]:
data_rnd_tsp = get_rand_exp_data("tsp")
ss = plot_exp_data(data_rnd_tsp)
rprint(ss)

### VRP

In [94]:
data_rnd_vrp = get_rand_exp_data("vrp")
plot_exp_data(data_rnd_vrp)

rozmiar populacji                          80
rozmiar mapy                            49x49
prawdopodobieństwo mutacji - zamiana      0.3
prawdopodobieństwo inwersji               0.1
dtype: object

### VRPP

In [93]:
data_rnd_vrpp = get_rand_exp_data("vrpp")
plot_exp_data(data_rnd_vrpp)

rozmiar populacji                          100
rozmiar mapy                             25x25
prawdopodobieństwo mutacji - zamiana       0.3
prawdopodobieństwo mutacji - insercja      0.2
prawdopodobieństwo mutacji - delecja       0.3
prawdopodobieństwo inwersji                0.2
dtype: object

In [45]:
data_rnd_irp = get_rand_exp_data("irp")
plot_exp_data(data_rnd_irp)

rozmiar populacji                            10
rozmiar mapy                                9x9
prawdopodobieństwo mutacji - zamienianie    0.2
prawdopodobieństwo mutacji - wstawianie     0.1
prawdopodobieństwo mutacji - usuwanie       0.2
dtype: object