In [1]:
from time_series.data_generators import LorenzGenerator
from time_series.time_series_models import KernelRidgeRegression, MovingAverageEstimator
from time_series.kernels import GaussianKernel
from time_series.evaluators.mse_one_step import MeanSquaredError

[32m2025-09-19 09:32:24.949[0m | [1mINFO    [0m | [36mtime_series.config[0m:[36m<module>[0m:[36m13[0m - [1mPROJ_ROOT path is: /home/james/Repo/PhD Repo/time_series_clustering[0m


In [2]:
import numpy as np
import itertools
import optuna

  from .autonotebook import tqdm as notebook_tqdm


# Config parsing

In [3]:
import yaml

with open("experiment.yaml", "r") as file:
    config = yaml.safe_load(file)

In [4]:
defintitions_conf = config["definitions"]
experiments_conf = config["experiments"]

In [5]:
definitions = dict()
for k, conf in defintitions_conf.items():
    if k == "datasets":
        datasets = dict()
        for d, v  in conf.items():
            datasets[d] = dict()
            if v["generator"] == "Lorenz":
                datasets[d]["generator"] = LorenzGenerator
            if "parameters" in v:
                datasets[d]["parameters"] = v["parameters"]    
            else:
                datasets[d]["parameters"] = {}
        definitions[k] = datasets

    elif k == "models":
        models = dict()
        for m, v  in conf.items():
            models[m] = dict()
            if v["model"] == "KernelRidgeRegression":
                models[m]["model"] = KernelRidgeRegression
                
            if "parameters" in v:
                models[m]["parameters"] = v["parameters"]
            else:
                models[m]["parameters"] = {}

            if "hyperparameters" in v:
                models[m]["hyperparameters"] = v["hyperparameters"]

        definitions[k] = models

    elif k == "kernels":
        kernels = dict()
        for i, v  in conf.items():
            kernels[i] = dict()
            if v["kernel"] == "GaussianKernel":
                kernels[i]["kernel"] = GaussianKernel
                
            if "parameters" in v:
                kernels[i]["parameters"] = v["parameters"]
            else:
                kernels[i]["parameters"] = {}

            if "hyperparameters" in v:
                kernels[i]["hyperparameters"] = v["hyperparameters"]

        definitions[k] = kernels
    
    elif k == "evaluators":
        evaluators = dict()
        for i, v  in conf.items():
            evaluators[i] = dict()
            if v["evaluator"] == "MeanSquaredError":
                evaluators[i]["evaluator"] = MeanSquaredError
                
            if "parameters" in v:
                evaluators[i]["parameters"] = v["parameters"]
            else:
                evaluators[i]["parameters"] = {}
        definitions[k] = evaluators

# Dataset generation

In [6]:
class TimeSeriesData:
    def __init__(self, X, y=None, train_val_test_split=None, **kwargs):
        self.__dict__.update(kwargs)
        self.X = X
        self.y = y

        self.N = len(X)
        self.indices = np.arange(self.N)
        self.tvt_split = train_val_test_split

    def train_data(self):
        train_idx = self.indices[:int(self.tvt_split[0]*self.N)]

        if type(self.y) == type(None):
            return self.X[train_idx], None

        return self.X[train_idx], self.y[train_idx]

    def val_data(self, lag=0):
        val_idx = self.indices[int(self.tvt_split[1]*self.N) - lag:int(self.tvt_split[2]*self.N)]

        if type(self.y) == type(None):
            return self.X[val_idx], None

        return self.X[val_idx], self.y[val_idx]

    def test_data(self, lag=0):
        test_idx = self.indices[int(self.tvt_split[2]*self.N)-lag:]

        if type(self.y) == type(None):
            return self.X[test_idx], None

        return self.X[test_idx], self.y[test_idx]



In [7]:
def iterate_datasets(datasets):
    """
    Defines an iterator over the datasets specified in the config. 
    """
    for dataset_name, dataset_confs in datasets.items():
        dataset_def = definitions["datasets"][dataset_name]

        if "train_val_test_split" in dataset_confs:
            tvt_split = dataset_confs["train_val_test_split"]
        else:
            tvt_split = [1]

        result = dict(
            dataset=dataset_name,
            train_test_val_split = tvt_split,
            sweep_vals=None,
            parameters=dict(dataset_def["parameters"])
        )

        if "parameters" in dataset_confs:
            for param, value in dataset_confs["parameters"].items():
                result["parameters"][param] = value

        # Process overides
        if "sweeps" in dataset_confs:
            for sweep in dataset_confs["sweeps"]:
                sweep_result = dict(result)

                sweep_val_names = []
                sweep_values = []
                for sweep_param, sweep_conf in dataset_confs["sweeps"][sweep].items():
                    sweep_vals = np.linspace(
                        float(sweep_conf["min"]), 
                        float(sweep_conf["max"]), 
                        int(sweep_conf["N_steps"])
                    )
                    sweep_val_names.append(sweep_param)
                    sweep_values.append(sweep_vals)

                # Combine the sweep values
                all_combinations = itertools.product(*sweep_values)                
                for combined_vals in all_combinations:
                    for i, param in enumerate(sweep_val_names):
                        sweep_result["parameters"][param] = combined_vals[i]

                    t, data = dataset_def["generator"](**sweep_result["parameters"])()
                    sweep_result["data"] = TimeSeriesData(
                        t = t,
                        X = data[:-1],
                        y = data[1:],
                        train_val_test_split=tvt_split
                    )
                    yield sweep_result
        
        else:
            t, data = dataset_def["generator"](**result["parameters"])()
            result["data"] = TimeSeriesData(
                t = t,
                X = data[:-1],
                y = data[1:],
                train_val_test_split=tvt_split
            )
            yield result

In [8]:
for experiment_name, experiment in experiments_conf.items():
    # Process datasets
    dataset_generator = iterate_datasets(experiment["datasets"])
    for dataset in dataset_generator:
        break


# Hyperparameter tuning

In [10]:
for model_name, model_confs in experiment["models"].items():
    model = definitions["models"][model_name]["model"]
    parameters = dict(definitions["models"][model_name]["parameters"])

    if "hyperparameters" in model_confs:
        X_train, y_train = dataset["data"].train_data()

        if "lag" not in model_confs["hyperparameters"]:
            X_val, y_val = dataset["data"].val_data()

        evaluator = experiment["evaluators"]["hyperparameter_tuning"]

        def objective(trial):
            params = dict(parameters)
            for hparam, hparam_conf in model_confs["hyperparameters"].items():
                if hparam_conf["type"] == float:
                    params[hparam] = trial.suggest_float(
                        hparam,
                        hparam_conf["min"],
                        hparam_conf["max"] 
                    )
                elif hparam_conf["type"] == int:
                    params[hparam] = trial.suggest_int(
                        hparam,
                        hparam_conf["min"],
                        hparam_conf["max"] 
                    )
                else:
                    raise ValueError("Expecting either int or float hyperparameter")

            test_model = model(**params)
            test_model.fit(X_train, y_train)

            if "lag" in params:
                X_val, y_val = dataset["data"].val_data(lag = params["lag"])
            
            y_pred = test_model.predict(X_val)






    # # Process kernels
    # if "kernels" in model_confs:
    #     kernels = []
    #     for kernel_name in model_confs["kernels"]:
    #         kernel = definitions["kernels"][kernel_name]["kernel"]
    #         if "parameters" in definitions["kernels"][kernel_name]:
    #             kernel_params = dict(definitions["kernels"][kernel_name]["parameters"])
    #         else:
    #             kernel_params = dict()
    #         kernels.append(
    #             definitions["kernels"][kernel_name]["kernel"](**kernel_params)
    #         )


    # if "parameters" in model_confs:
    #     parameters.update(model_confs["parameters"])


In [11]:
model_confs["hyperparameters"]

{'reg': {'type': 'float', 'min': 1e-07, 'max': 0.01}}

In [12]:
experiment["models"][model_name]

{'kernels': ['k1', 'k1', 'k1'],
 'hyperparameters': {'reg': {'type': 'float', 'min': 1e-07, 'max': 0.01}}}

In [13]:
definitions["models"]

{'model_krr': {'model': time_series.time_series_models.kernel_ridge_regression.KernelRidgeRegression,
  'parameters': {}}}

In [14]:
[definitions["kernels"][i] for i in model_confs["kernels"]]

[{'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': {'bandwidth': 100}},
 {'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': {'bandwidth': 100}},
 {'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': {'bandwidth': 100}}]

In [15]:
1/0

ZeroDivisionError: division by zero

In [None]:
for model_name, model_confs in experiment["models"].items():
    model = definitions["models"][model_name]["model"]
    parameters = dict(definitions["models"][model_name]["parameters"])

    # Process kernels
    if "kernels" in model_confs:
        kernels = []
        for kernel_name in model_confs["kernels"]:
            kernel = definitions["kernels"][kernel_name]["kernel"]
            if "parameters" in definitions["kernels"][kernel_name]:
                kernel_params = dict(definitions["kernels"][kernel_name]["parameters"])
            else:
                kernel_params = dict()
            kernels.append(
                definitions["kernels"][kernel_name]["kernel"](**kernel_params)
            )


    if "parameters" in model_confs:
        parameters.update(model_confs["parameters"])



TypeError: GaussianKernel.__init__() missing 1 required positional argument: 'bandwidth'

In [None]:
definitions["kernels"]

{'k1': {'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': None},
 'k2': {'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': None},
 'k3': {'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': None}}

In [None]:
kernels

[{'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': None},
 {'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': None},
 {'kernel': time_series.kernels.gaussian_kernel.GaussianKernel,
  'parameters': None}]

In [None]:
model, parameters

(time_series.time_series_models.kernel_ridge_regression.KernelRidgeRegression,
 {'reg': 0.01})

In [None]:
model_confs

{'kernels': ['k1', 'k2', 'k3'], 'parameters': {'reg': 0.01}}

In [None]:
1/0

ZeroDivisionError: division by zero

In [None]:
def parse_configs():
    pass

def load_data():
    pass

def load_model():
    pass

def tune_parameters():
    pass

def train_final_model():
    pass

def evaluate_model():
    pass

def generate_reports():
    pass

In [None]:
class Experiment:
    pass

    def run(self):
        # Parse configs

        # Load data

        # Load model

        # Tune parameters

        # Train final model

        # Evaluate model

        # Generate reports