In [1]:
import os
import json
from typing import Dict, Tuple
import numpy as np


In [2]:
experiments_path = "experiments/"

# json_path : json_data
json_data : Dict[str, Dict] = {}

for root, _, files in os.walk(experiments_path):
    for file in files:
        if file.endswith(".json"):
            file_path = os.path.join(root, file)
            try:
                with open(file_path, "r") as f:
                    json_data[file_path] = json.load(f)
            except json.JSONDecodeError:
                print(f"Warning: Failed to parse {file_path}")


json_paths = json_data.keys()

# unique_exp_name : json_path
exp_index : Dict[str, str] = {}

for path in json_paths:
    basename = path.split("/")[-1]
    exp_name = basename.split("_")[0]
    if exp_name not in exp_index.keys():
        exp_index[exp_name] = path
    else:
        count = 1
        new_exp_name = f"{exp_name}_{count}"
        while new_exp_name in exp_index:
            count += 1
            new_exp_name = f"{exp_name}_{count}"
        exp_index[new_exp_name] = file_path

for exp_name, _ in exp_index.items():
    print(exp_name)

wm-1step
blind-ac
wm-5step
cnn
cnn_1
lower
low
small
high
large
high_1
vae
vae_1
td
td_1
triplet
large_1
return
agressive
high_2
td_2
td_3
reward
kl
uniform-policy
mvp
lower_1
low_1
small_1
high_3
large_2
high_4


In [3]:


timeseries_data : Dict[str, Tuple[np.ndarray]] = {}
for exp_name, path in exp_index.items():
    try:
        avg_reward = np.array(json_data[path]["performance"]["avg_rewards"], dtype=np.float32)
        std_reward = np.array(json_data[path]["performance"]["std_rewards"], dtype=np.float32)
        timeseries_data[path] = (avg_reward, std_reward)
    except KeyError:
        print(f"Warning: Missing keys in {path}")

print(f"Collected time series data for {len(timeseries_data)} experiments.")


Collected time series data for 19 experiments.


In [4]:
import numpy as np
from scipy.stats import norm
from statsmodels.tsa.stattools import adfuller

class StationarityAnalyzer:
    def __init__(
        self,
        avg_reward: np.ndarray,
        std_reward: np.ndarray,
        n: int = 3,
        window_size: int = 10,
        threshold: float = 0.05,
        alpha: float = 0.05,
        ):
        self.avg_reward = avg_reward
        self.std_reward = std_reward
        self.n = n
        self.window_size = window_size
        self.threshold = threshold
        self.alpha = alpha

        self._stationary_index = -1 # Default is not achieving stationarity


        self._perform_analysis()

    def _moving_average(self, data: np.ndarray) -> np.ndarray:
        """Compute moving average with given window size."""
        return np.convolve(data, np.ones(self.window_size) / self.window_size, mode='valid')
    
    def _moving_variance(self, data: np.ndarray) -> np.ndarray:
        """Compute moving variance with given window_size."""
        return self._moving_average(data**2) - self._moving_average(data)**2

    def _confidence_width(self) -> np.ndarray:
        """Compute confidence interval width for each time step.
        Assumes Gaussian distribution approximation."""
        return (1.96 * self.std_reward) / np.sqrt(self.n)
    
    def _geweke_test(self, first_frac: float = 0.1, last_frac: float = 0.5):
        """Perform geweke test for compare early and late parts of the series."""
        N = len(self.avg_reward)
        first_segment = self.avg_reward[:int(N * first_frac)]
        last_segment = self.avg_reward[int(N * (1 - last_frac)):]

        mean_first, mean_last = np.mean(first_segment), np.mean(last_segment)
        var_first = np.var(first_segment, ddof=1) / len(first_segment)
        var_last = np.var(last_segment, ddof=1) / len(last_segment)

        z_score = (mean_first - mean_last) / np.sqrt(var_first + var_last)
        p_value = 2 * (1 - norm.cdf(abs(z_score)))

        return p_value > self.alpha # Accept stationarity if p > alpha
    
    def _adf_test(self):
        """Perform Augmented Dickey-Fuller test to check for stationarity."""
        adf_result = adfuller(self.avg_reward, autolag='AIC')
        return adf_result[1] < self.alpha # Stationarity if p < alpha


    def _perform_analysis(self):
        """
        Perform stationarity analysis by checking stability of mean, variance, and confidence interval.
        """
        moving_avg = self._moving_average(self.avg_reward)
        moving_var = self._moving_variance(self.avg_reward)
        conf_interval = self._confidence_width()

        for t in range(len(moving_avg) - 1):
            mean_change = np.abs(moving_avg[t + 1] - moving_avg[t])
            var_change = np.abs(moving_var[t + 1] - moving_var[t])

            ci_change = np.abs(conf_interval[t + 1] - conf_interval[t])

            geweke_result = self._geweke_test()

            adf_result = self._adf_test()

            if (
                mean_change < self.threshold and
                var_change < self.threshold and 
                ci_change < self.threshold and
                geweke_result and
                adf_result
            ):
                self._stationary_index = t + self.window_size
                return
            
    @property
    def stationarity(self) -> int:
        """Return index where time series achieves stationrity.
        If stationarity is never reached, return -1"""
        return self._stationary_index


avg_r, std_r = timeseries_data[exp_index["vae"]]
n = 3
window_size = 10
threhsold = 0.05

analyzer = StationarityAnalyzer(avg_r, std_r, n=n, window_size=window_size, threshold=threhsold)

print("Stationarity Index:", analyzer.stationarity)

ModuleNotFoundError: No module named 'scipy'

SyntaxError: invalid syntax (1722139382.py, line 1)