In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
import json
import pickle

from dataclasses import asdict
from itertools import product
from typing import Literal

import pandas as pd

from cosmos import Simulator, ModelAnalyzer
from cosmos.simulator import Config

In [3]:
SAVE_ROOT = "../test/sim"

In [4]:
def test_config(config: Config, name: str) -> None:

    save_dir = os.path.join(SAVE_ROOT, name)
    os.makedirs(save_dir, exist_ok=True)

    res_path = os.path.join(save_dir, "compare.pkl")

    if os.path.exists(res_path):
        print(f"Results for {name} already exist. Skipping simulation.")
        return

    with open(f"{save_dir}/config.pkl", "wb") as f:
        pickle.dump(config, f)
    with open(f"{save_dir}/readable_config.json", "w") as f:
        json.dump(asdict(config), f)  # For human readability

    simulator = Simulator(config=config)
    simulator.simulate()
    simulator.build_cosmos(os.path.join(save_dir, "model"))

    for i in simulator.model.all_group_new_index:
        print(f"Running Cosmos on combined group {i}...", end="\r")
        simulator.model.run_cosmos(group_new_idx=i, no_s_hat=False)

    analyzer = ModelAnalyzer(
        model=simulator.model,
        data_path=os.path.join(save_dir, "data"),
    )

    with open(os.path.join(save_dir, "simulator.pkl"), "wb") as f:
        pickle.dump(simulator, f)

    with open(os.path.join(save_dir, "analyzer.pkl"), "wb") as f:
        pickle.dump(analyzer, f)

    df_best_models = analyzer.best_models
    df_position = simulator.df_position

    true_models = df_position.set_index("position")["model"]
    selected_models = df_best_models.set_index("position")["model_rank1"]

    compare_df = pd.DataFrame(
        {
            "true_model": true_models,
            "selected_model": selected_models,
        }
    )

    compare_df.to_pickle(res_path)

In [5]:
def get_noisy_config(
    n_pos: int,
    magnitude: float = 1.0,
    param: Literal["all", "x", "y", "theta"] = "all",
    seed: int = 1000,
) -> Config:

    config = Simulator.default_config()
    config.simulation.n_position = n_pos
    config.simulation.seed = seed % (2**32)

    match param:
        case "all":
            config.observation.sigma_x *= magnitude
            config.observation.sigma_y *= magnitude
            config.observation.sigma_theta *= magnitude
        case "x":
            config.observation.sigma_x *= magnitude
        case "y":
            config.observation.sigma_y *= magnitude
        case "theta":
            config.observation.sigma_theta *= magnitude

    return config

In [6]:
N_POS = 100

for idx, (i, param) in enumerate(product(range(5), ["all", "x", "y", "theta"])):
    # This could be very slow, since it uses one core only.
    # See `04_simulate_noise.py` for a parallel implementation.
    # Results will be the same.

    if i == 0 and param != "all":
        continue  # i == 0 is equivalent to default noise, only need to run once

    magnitude = 2**i
    name = f"noise_{magnitude:d}_{param}"

    if os.path.exists(os.path.join(SAVE_ROOT, name, "compare.pkl")):
        print(f"Skipping {name} as results already exist.")
        continue

    config = get_noisy_config(
        n_pos=N_POS, magnitude=magnitude, param=param, seed=(idx + 5956) * 5243
    )
    print(f"Testing config: {name} with magnitude {magnitude} for parameter {param}")
    test_config(config=config, name=name)

Skipping noise_1_all as results already exist.
Skipping noise_2_all as results already exist.
Skipping noise_2_x as results already exist.
Skipping noise_2_y as results already exist.
Skipping noise_2_theta as results already exist.
Skipping noise_4_all as results already exist.
Skipping noise_4_x as results already exist.
Skipping noise_4_y as results already exist.
Skipping noise_4_theta as results already exist.
Skipping noise_8_all as results already exist.
Skipping noise_8_x as results already exist.
Skipping noise_8_y as results already exist.
Skipping noise_8_theta as results already exist.
Skipping noise_16_all as results already exist.
Skipping noise_16_x as results already exist.
Skipping noise_16_y as results already exist.
Skipping noise_16_theta as results already exist.


In [7]:
for idx, (log_magnitude, n_var_per_pos) in enumerate(product(range(5), [10, 15])):

    magnitude = 2**log_magnitude

    # n_var_per_pos == 20 is the default, so we only run for 10 and 15
    name = f"n_var_per_pos_{n_var_per_pos:d}_noise_{magnitude:d}"

    if os.path.exists(os.path.join(SAVE_ROOT, name, "compare.pkl")):
        print(f"Skipping {name} as results already exist.")
        continue

    seed = (idx + 5109) * 5616

    config = get_noisy_config(n_pos=N_POS, magnitude=magnitude, param="all", seed=seed)

    config.simulation.n_variant_per_position = n_var_per_pos

    print(
        f"Testing config: {name} with n_var_per_pos {n_var_per_pos} and magnitude {magnitude:d}"
    )
    test_config(config=config, name=name)

Skipping n_var_per_pos_10_noise_1 as results already exist.
Skipping n_var_per_pos_15_noise_1 as results already exist.
Skipping n_var_per_pos_10_noise_2 as results already exist.
Skipping n_var_per_pos_15_noise_2 as results already exist.
Skipping n_var_per_pos_10_noise_4 as results already exist.
Skipping n_var_per_pos_15_noise_4 as results already exist.
Skipping n_var_per_pos_10_noise_8 as results already exist.
Skipping n_var_per_pos_15_noise_8 as results already exist.
Skipping n_var_per_pos_10_noise_16 as results already exist.
Skipping n_var_per_pos_15_noise_16 as results already exist.
