In [None]:
# Standard library
import os
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any

import matplotlib.pyplot as plt

# Third-party libraries
import numpy as np
import pandas as pd
import seaborn as sns
from joblib import Parallel, delayed
from matplotlib.ticker import PercentFormatter
from networkx import DiGraph
from rich.console import Console
from rich.progress import track

# Local libraries
from ariel.body_phenotypes.robogen_lite.config import (
    NUM_OF_FACES,
    NUM_OF_ROTATIONS,
    NUM_OF_TYPES_OF_MODULES,
)
from ariel.body_phenotypes.robogen_lite.decoders.hi_prob_decoding import (
    HighProbabilityDecoder,
)
from ariel.body_phenotypes.robogen_lite.decoders.visualize_tree import (
    visualize_tree_from_graph,
)

console = Console()

In [None]:
# Global constants
SCRIPT_NAME = "Initial"
CWD = Path.cwd()
DATA = Path(CWD / "__data__" / SCRIPT_NAME)
DATA.mkdir(exist_ok=True)
SEED = 40

# Global functions
console = Console()
RNG = np.random.default_rng(SEED)


In [None]:
def generate_body(num_modules: int = 20) -> DiGraph:
    """Generate random robot with HighProbabilityDecoder."""
    # System parameters

    # "Type" probability space
    type_probability_space = RNG.random(
        size=(num_modules, NUM_OF_TYPES_OF_MODULES),
        dtype=np.float32,
    )

    # "Connection" probability space
    conn_probability_space = RNG.random(
        size=(num_modules, num_modules, NUM_OF_FACES),
        dtype=np.float32,
    )

    # "Rotation" probability space
    rotation_probability_space = RNG.random(
        size=(num_modules, NUM_OF_ROTATIONS),
        dtype=np.float32,
    )

    # Decode the high-probability graph
    hpd = HighProbabilityDecoder(num_modules)
    hpd.probability_matrices_to_graph(
        type_probability_space,
        conn_probability_space,
        rotation_probability_space,
    )

    # Decode the high-probability graph
    hpd = HighProbabilityDecoder(num_modules)
    return hpd.probability_matrices_to_graph(
        type_probability_space,
        conn_probability_space,
        rotation_probability_space,
    )

In [None]:
# def run(
#     robot: CoreModule,
#     *,
#     with_viewer: bool = False,
# ) -> None:
#     """Entry point."""
#     # MuJoCo configuration
#     viz_options = mujoco.MjvOption()  # visualization of various elements

#     # Visualization of the corresponding model or decoration element
#     viz_options.flags[mujoco.mjtVisFlag.mjVIS_TRANSPARENT] = True
#     viz_options.flags[mujoco.mjtVisFlag.mjVIS_ACTUATOR] = True
#     viz_options.flags[mujoco.mjtVisFlag.mjVIS_BODYBVH] = True

#     # MuJoCo basics
#     world = TiltedFlatWorld()

#     # Set random colors for geoms
#     for i in range(len(robot.spec.geoms)):
#         robot.spec.geoms[i].rgba[-1] = 0.5

#     # Spawn the robot at the world
#     world.spawn(robot.spec)

#     # Compile the model
#     model = world.spec.compile()
#     data = mujoco.MjData(model)

#     # Save the model to XML
#     xml = world.spec.to_xml()
#     with (DATA / f"{SCRIPT_NAME}.xml").open("w", encoding="utf-8") as f:
#         f.write(xml)

#     # Number of actuators and DoFs
#     console.log(f"DoF (model.nv): {model.nv}, Actuators (model.nu): {model.nu}")

#     # Reset state and time of simulation
#     mujoco.mj_resetData(model, data)

#     # Render
#     # single_frame_renderer(model, data, steps=10)

#     # View
#     if with_viewer:
#         viewer.launch(model=model, data=data)

In [None]:
# delta = 10000  # just a random nr that bigger than 0.1
# diversity = 0
# nr_of_blocks = []
# old_avg_b = 0
# old_avg_h = 0
# nr_of_hinges = []
# learnability = 0  # maybe later
# graph_list = []
# while delta > 0.00001:
#     graph = generate_body()
#     graph_list.append(graph)
#     graph = dict(graph.nodes)
#     hinges = sum(graph[node]["type"] == "HINGE" for node in graph)
#     blocks = sum(graph[node]["type"] == "BRICK" for node in graph)
#     nr_of_blocks.append(blocks)
#     nr_of_hinges.append(hinges)

#     # skip the first 100
#     if len(nr_of_blocks) < 100:
#         continue

#     # go until we broke
#     new_avg_b = np.average(nr_of_blocks)
#     new_avg_h = np.average(nr_of_hinges)
#     delta = max(abs(old_avg_b - new_avg_b), abs(old_avg_h - new_avg_h))

#     if len(graph_list) >= 100_000:
#         break

#     old_avg_b = new_avg_b
#     old_avg_h = new_avg_h

# print(
#     f"Done!\nEnded with a delta of {max(abs(old_avg_b - new_avg_b), abs(old_avg_h - new_avg_h))}\nThe avg nr of blocks are: {new_avg_b}\nThe avg nr of hinges is: {new_avg_h}\nConcluded after {len(graph_list)} generated robots!"
# )


---

### Functions for analysis on individual

In [None]:
def get_counts(individual: DiGraph) -> dict[str, int]:
    counts: dict[str, int] = {
        "core": sum(
            data["type"] == "CORE" for _, data in individual.nodes(data=True)
        ),
        "brick": sum(
            data["type"] == "BRICK" for _, data in individual.nodes(data=True)
        ),
        "hinge": sum(
            data["type"] == "HINGE" for _, data in individual.nodes(data=True)
        ),
        "none": sum(
            data["type"] == "NONE" for _, data in individual.nodes(data=True)
        ),
        "edges": len(individual.edges()),
    }
    counts["not-none"] = counts["core"] + counts["brick"] + counts["hinge"]
    assert counts["core"] == 1
    return counts

In [None]:
def analyze_mass(individual: DiGraph) -> dict[str, float]:
    from ariel.body_phenotypes.robogen_lite.modules.brick import (
        BRICK_MASS as bm,
    )
    from ariel.body_phenotypes.robogen_lite.modules.core import CORE_MASS as cm
    from ariel.body_phenotypes.robogen_lite.modules.hinge import (
        ROTOR_MASS as rm,
    )
    from ariel.body_phenotypes.robogen_lite.modules.hinge import (
        STATOR_MASS as sm,
    )

    counts = get_counts(individual)
    core_mass = counts["core"] * cm
    brick_tot_mass = counts["brick"] * bm
    hinge_tot_mass = counts["hinge"] * (rm + sm)
    total_mass = core_mass + brick_tot_mass + hinge_tot_mass
    return {"mass": total_mass}

### Parallel running method for individual analysis data

In [None]:
def analyze_individual(analyzers, num_modules=None):
    individual = generate_body(num_modules) if num_modules else generate_body()
    results = {}
    results["population"] = [individual]
    for analyzer in analyzers:
        results.update(analyzer(individual))
    return results


def analyze_population(
    population_size: int,
    *,
    num_modules: int | None = None,
    analyzers: Callable[[DiGraph], dict[str, Any]] | None = None,
    n_jobs: int = -1,
) -> tuple[dict[str, Any], list[DiGraph]]:
    if analyzers is None:
        analyzers = [get_counts, analyze_mass]

    if n_jobs == -1:
        available_cpus = os.cpu_count()
        if available_cpus is not None and available_cpus > 2:
            n_jobs = available_cpus - 2
        elif available_cpus is not None and available_cpus > 1:
            n_jobs = available_cpus - 1
        else:
            n_jobs = 1

    console.log(f"Using {n_jobs} cpus")
    console.log(f"Starting analysis of {population_size} individuals...")

    # Parallel execution with progress bar
    with Parallel(n_jobs=n_jobs) as parallel:
        results = parallel(
            delayed(analyze_individual)(analyzers)
            for _ in track(
                range(population_size), description="Analyzing population"
            )
        )

    properties_dict = defaultdict(list)
    for result in results:
        for key, value in result.items():
            properties_dict[key].append(value)

    console.log("Analysis complete.")
    final_dict = dict(properties_dict)
    population = final_dict.pop("population")
    return final_dict, population

### Visualization methods for the analysis on a population

In [None]:
def create_boxplot_from_dict(
    properties_dict: dict[str, Any], keys: list[str] | None = None
) -> None:
    # If no keys provided, plot all except 'population'
    if keys is None:
        keys = list(properties_dict.keys())
    # Prepare data for seaborn
    data = []
    labels = []
    for key in keys:
        if key in properties_dict:
            data.append(properties_dict[key])
            labels.append(key)
    # Create boxplot
    plt.figure(figsize=(10, 6))
    sns.boxplot(data=data)
    plt.xticks(ticks=range(len(labels)), labels=labels)
    plt.ylabel("Value")
    plt.title("Boxplot of Robot Properties")
    plt.show()

In [None]:
def create_histogram_from_dict(
    properties_dict: dict[str, Any],
    keys: list[str] | None = None,
    bins: int = 30,
) -> None:
    """
    Draw a grouped histogram of the selected keys.
    Title is the key names if there are fewer than 3,
    otherwise "Histogram of Robot Properties".
    """
    if keys is None:
        keys = list(properties_dict.keys())

    # Keep only keys that exist in the dictionary
    valid_keys = [k for k in keys if k in properties_dict]
    data = [properties_dict[k] for k in valid_keys]

    plt.figure(figsize=(10, 6))
    counts, bins, _patches = plt.hist(
        data,
        bins=bins,
        label=valid_keys,
        edgecolor="black",
        linewidth=1.2,
        alpha=0.8,
        histtype="bar",
        rwidth=0.9,
    )
    total = counts.sum()
    plt.gca().yaxis.set_major_formatter(PercentFormatter(xmax=total))
    plt.legend()
    plt.xlabel("Value")
    plt.ylabel("Frequency")

    # Dynamic title
    if len(valid_keys) <= 3:
        plt.title(", ".join(valid_keys))
    else:
        plt.title("Histogram of Robot Properties")

    plt.show()


In [None]:
def statistical_data_from_dict(
    properties_dict: dict[str, Any],
    keys: list[str] | None = None,
    save_file: Path | str | None = None,
) -> pd.DataFrame:
    """
    For each numeric key in properties_dict, return a DataFrame with:
      mean, std, median, count,
      Q1, Q3,
      list of outlier indexes (Tukey's method),
      number of outliers.
    """
    if keys is None:
        keys = [k for k in properties_dict if k != "population"]

    results = []

    for key in keys:
        if key not in properties_dict:
            continue

        arr = np.asarray(properties_dict[key], dtype=float)

        if arr.size == 0:
            continue  # skip empty arrays

        mean = arr.mean()
        std = arr.std(ddof=1)
        median = np.median(arr)
        q1 = np.percentile(arr, 25)
        q3 = np.percentile(arr, 75)
        iqr = q3 - q1

        # Tukey's fences for outliers (same as seaborn)
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        outlier_indexes = np.where((arr < lower) | (arr > upper))[0].tolist()

        results.append({
            "key": key,
            "count": len(arr),
            "mean": mean,
            "std": std,
            "median": median,
            "Q1": q1,
            "Q3": q3,
            "num_outliers": len(outlier_indexes),
            "outlier_indexes": outlier_indexes,
        })

    df = pd.DataFrame(results)

    if save_file:
        df.to_csv(save_file)

    return df

---

### Running the experiment

In [None]:
properties_dict, population = analyze_population(100_000)


In [None]:
create_boxplot_from_dict(properties_dict)

create_histogram_from_dict(properties_dict, keys=["brick", "hinge", "none"])
create_histogram_from_dict(properties_dict, keys=["not-none"])
create_histogram_from_dict(properties_dict, keys=["mass"])
create_histogram_from_dict(properties_dict, keys=["edges"])

stats_df = statistical_data_from_dict(properties_dict)

---

### Visualize Outliers

In [None]:
keys = list(properties_dict.keys())


# Compute keys_with_outliers once and use for key_index logic
def get_keys_with_outliers():
    outlier_keys = []
    for key in keys:
        outlier_row = stats_df[stats_df["key"] == key]
        total_outliers = (
            len(outlier_row.iloc[0]["outlier_indexes"])
            if not outlier_row.empty
            else 0
        )
        if total_outliers > 0:
            outlier_keys.append(key)
    return outlier_keys


# Use keys_with_outliers for key_index, but keep all keys for lookup
if "keys_with_outliers" not in globals():
    keys_with_outliers = get_keys_with_outliers()
if "key_index" not in globals():
    key_index = 0  # Tracks current key (in keys_with_outliers)
if "outlier_index" not in globals():
    outlier_index = 0  # Tracks index in outlier list
if "direction" not in globals():
    direction = 1  # 1 for forward, -1 for backward


def select_key(delta=1) -> None:
    """Cycle through keys with outliers only, highlight current, and show total outliers for each key."""
    global key_index, outlier_index, keys_with_outliers
    keys_with_outliers = get_keys_with_outliers()
    outlier_counts = []
    for key in keys_with_outliers:
        outlier_row = stats_df[stats_df["key"] == key]
        total_outliers = (
            len(outlier_row.iloc[0]["outlier_indexes"])
            if not outlier_row.empty
            else 0
        )
        outlier_counts.append(total_outliers)

    if not keys_with_outliers:
        console.print("[red]No keys with outliers found.[/red]")
        return

    key_index = (key_index + delta) % len(keys_with_outliers)
    outlier_index = 0  # Reset outlier index when changing key

    key_lines = []
    for i, (key, total_outliers) in enumerate(
        zip(keys_with_outliers, outlier_counts, strict=False)
    ):
        if i == key_index:
            key_lines.append(
                f"[bold green]> {key} <[/bold green] [yellow]({total_outliers} outliers)[/yellow]",
            )
        else:
            key_lines.append(
                f"{key} [dim]({total_outliers} outliers)[/dim]",
            )
    console.print("\n".join(key_lines))
    # show_current_selection()


def toggle_direction() -> None:
    """Toggle direction for cycling outlier indexes."""
    global direction
    direction = -direction
    console.print(
        f"[cyan]Direction toggled. Now: {'forward' if direction == 1 else 'backward'}[/cyan]"
    )


def get_outlier_list():
    # Use keys_with_outliers for key_index
    key = keys_with_outliers[key_index]
    outlier_row = stats_df[stats_df["key"] == key]
    if not outlier_row.empty:
        return outlier_row.iloc[0]["outlier_indexes"]
    return []


def cycle_through_outlier_index():
    """Cycle through outlier indexes for the selected key."""
    global outlier_index
    outliers = get_outlier_list()
    if outliers:
        outlier_index = (outlier_index + direction) % len(outliers)
    show_current_selection()
    return get_current_outlier_value()


def show_current_selection() -> None:
    key = keys_with_outliers[key_index]
    outliers = get_outlier_list()
    total = len(outliers)
    if outliers:
        idx = outlier_index % total
        progress_str = f"[{idx + 1}/{total}]"
        console.print(
            f"[bold green]Key:[/bold green] {key} | "
            f"[blue]Outlier index:[/blue] {idx} {progress_str} | "
            f"[magenta]Value:[/magenta] {outliers[idx]} | "
            f"[yellow]Total outliers:[/yellow] {total}",
        )
    else:
        console.print(
            f"[bold green]Key:[/bold green] {key} | [red]No outliers[/red]"
        )


def get_current_outlier_value():
    """Return the value of the current outlier for the selected key."""
    outliers = get_outlier_list()
    if outliers:
        return outliers[outlier_index % len(outliers)]
    return None


In [None]:
select_key()

In [None]:
toggle_direction()

In [None]:
visualize_tree_from_graph(population[cycle_through_outlier_index()][0])


The exeperiment shows that the outliers seem to have the same genotype. This leads to the question for how many unique genotypes are actually being created. To run this experiment, a new function should be created that easily parses the given phenotype into a deterministic string. this can than be used in a histogram to count the unique types?

___

### To be implemented

In [None]:
def analyze_branching(individual: DiGraph) -> dict[str, float]:
    """
    Measures the level of branching in the robot's morphology (M1).

    Calculation Method: The specific formula for 'Branching' (M1) is not detailed in this source material,
    but it is noted that this descriptor ranges in value from 0 to 1 [1].
    This descriptor is positively correlated with the 'Number of Limbs' [2].
    """
    return {"branching": 0.0}


def analyze_number_of_limbs(individual: DiGraph) -> dict[str, float]:
    """
    Measures the number of effective limbs attached to the robot (M2).

    Calculation Method: The specific formula for 'Number of Limbs' (M2) is not detailed in this source material,
    but it is noted that this descriptor ranges in value from 0 to 1 [1].
    It is used to assess the tendency for morphologies to have few limbs [2, 3].
    """
    return {"number_of_limbs": 0.0}


def analyze_length_of_limbs(individual: DiGraph) -> dict[str, float]:
    """
    Measures the relative length of the limbs (M3).

    Calculation Method: The measure 'Length of Limbs' (E or M3) is defined by the following equation
    (used, for example, in the S3 fitness function as a penalty) [4]:

    E = { e / e_max, if m >= 3
        { 0, otherwise

    Where 'm' is the total number of modules in the body, 'e' is the number of modules
    which have two of their faces attached to other modules (excluding the core-component),
    and 'e_max' is the maximum amount of modules that a body with 'm' modules could have
    with two attached faces, calculated as m - 2 [4].
    A higher value suggests fewer, longer limbs, as it rewards maximizing the length relative to body size [2, 3].
    This descriptor ranges in value from 0 to 1 [1].
    """
    return {"length_of_limbs": 0.0}


def analyze_coverage(individual: DiGraph) -> dict[str, float]:
    """
    Measures how much of the morphology space is covered (M4).

    Calculation Method: The specific formula for 'Coverage' (M4) is not detailed in this source material,
    but it is noted that this descriptor ranges in value from 0 to 1 [1].
    Morphologies similar to snakes (like those predominant under S1 fitness) tended to have high coverage,
    covering the whole body area [5].
    """
    return {"coverage": 0.0}


def analyze_joints(individual: DiGraph) -> dict[str, float]:
    """
    Measures the number of effective joints in the morphology (M5).

    Calculation Method: This descriptor was reformulated for the study.
    The concept of an effective joint is defined by a joint module that is attached to any other module type [1].
    (Previously, attachment was required to be specifically to the core-component or a structural brick [1, 6].)
    This reformulation was done because robots were often observed developing limbs purely formed by a sequence of joints [1].
    The descriptor ranges in value from 0 to 1 [1].
    """
    return {"joints": 0.0}


def analyze_proportion(individual: DiGraph) -> dict[str, float]:
    """
    Measures the proportionality or balance of the robot's shape (M6).

    Calculation Method: The specific formula for 'Proportion' (M6) is not detailed in this source material,
    but it is noted that this descriptor ranges in value from 0 to 1 [1].
    Proportion was observed to drop drastically for fitness S1, which was dominated by single-limb, disproportional robots [5].
    """
    return {"proportion": 0.0}


def analyze_symmetry(individual: DiGraph) -> dict[str, float]:
    """
    Measures the symmetry of the robot's structure (M7).

    Calculation Method: The specific formula for 'Symmetry' (M7) is not detailed in this source material,
    but it is noted that this descriptor ranges in value from 0 to 1 [1].
    Symmetry tended to be higher when a penalty for long limbs (S3 fitness) was applied [7].
    The results suggest that higher symmetry is correlated with lower average speed [8].
    """
    return {"symmetry": 0.0}


def analyze_size(individual: DiGraph) -> dict[str, float]:
    """
    Measures the overall size of the robot's morphology (M8).

    Calculation Method: The specific formula for 'Size' (M8) is not detailed in this source material,
    but it is noted that this descriptor ranges in value from 0 to 1 [1].
    All behavior-oriented searches tended to explore larger Size, as a large body can more easily produce a large displacement for high speed [5].
    """
    return {"size": 0.0}


def analyze_sensors(individual: DiGraph) -> dict[str, float]:
    """
    Measures the ratio of sensors to available slots in the morphology (M9).

    Calculation Method: This is a new descriptor introduced in the study [1]. It is defined by the equation [9]:

    C = { c / c_max, if c_max > 0
        { 0, otherwise

    Where 'c' is the number of sensors in the morphology, and 'c_max' is the number of free slots in the morphology [9].
    The descriptor ranges in value from 0 to 1 [1].
    """
    return {"sensors": 0.0}