In [37]:
import numpy as np
import pandas as pd
import matlab.engine
import yaml
import time
import nolitsa
import importlib
import pathlib
import matplotlib.pyplot as plt
import csv
from tqdm import tqdm
from scipy.stats import pearsonr
from loguru import logger
from Entropy import ApproximateEntropy
from functools import partial, wraps
from itertools import product
import os
from typing import Union
from numbers import Number
from utilities import ZScore

In [66]:
import importlib
import Nonlinear
importlib.reload(Nonlinear)

<module 'Nonlinear' from '/Users/jmoo2880/Documents/py-hctsa-project/pyhctsa/Operations/Nonlinear.py'>

In [3]:
def compare_outputs(outputs, exclude_keys=None, verbose=True):
    """
    Compare MATLAB and Python feature outputs, computing relative errors
    and Pearson correlations.

    Parameters
    ----------
    outputs : dict
        Nested dictionary of feature values with structure:
        {
            feature_name: {
                ts_id: {
                    'matlab': scalar_or_dict,
                    'python': scalar_or_dict
                }
            }
        }
    exclude_keys : set or list, optional
        Keys to leave out of the comparison. A bare feature name drops the
        whole feature (scalar or dict-valued); a 'feature.subkey' entry drops
        only that sub-output of a dict-valued feature.
    verbose : bool, optional
        When True (default) print one relative-error line per
        (feature, time series) pair. Pass False to suppress the per-series
        output, which gets long for large datasets.

    Returns
    -------
    results : dict
        Dictionary mapping feature (or subfeature) names to correlation and stats:
        {
            'feature.subkey': {
                'r': float,            # Pearson r over pairs where both values are finite
                'pval': float,
                'res_py': ndarray,
                'res_matlab': ndarray,
                'max_rel_err': float   # largest relative error, in percent
            }
        }
        'r' and 'pval' are NaN when fewer than two finite pairs exist or when
        either side has zero variance. 'max_rel_err' is NaN when no relative
        error could be computed.

    Raises
    ------
    ValueError
        If, for a non-excluded feature, the MATLAB and Python values are
        neither both dicts nor both scalars.
    """
    exclude_keys = set() if exclude_keys is None else set(exclude_keys)

    # Flatten to {slot: {ts: (matlab_value, python_value)}} where slot is
    # either "feature" or "feature.subkey".
    flat = {}
    for feat, ts_dict in outputs.items():
        # A bare feature name excludes the feature outright. Checking it here
        # (rather than only in the scalar branch) makes it apply to
        # dict-valued features too, as the documented contract promises.
        if feat in exclude_keys:
            continue
        for ts, run in ts_dict.items():
            ml = run['matlab']
            py = run['python']
            if isinstance(ml, dict) and isinstance(py, dict):
                for k, mlv in ml.items():
                    slot = f"{feat}.{k}"
                    # Sub-outputs missing on the Python side are skipped silently.
                    if slot in exclude_keys or k not in py:
                        continue
                    flat.setdefault(slot, {})[ts] = (mlv, py[k])
            elif isinstance(ml, Number) and isinstance(py, Number):
                flat.setdefault(feat, {})[ts] = (ml, py)
            else:
                raise ValueError(f"Feature {feat}@{ts} is neither both scalars nor both dicts.")

    results = {}
    for slot, tsmap in flat.items():
        ml_vals, py_vals = [], []
        rel_errors = []

        for ts, (mlv, pyv) in tsmap.items():
            ml_vals.append(mlv)
            py_vals.append(pyv)

            both_finite = np.isfinite(mlv) and np.isfinite(pyv)
            both_nan = np.isnan(mlv) and np.isnan(pyv)
            both_posinf = (mlv == np.inf) and (pyv == np.inf)
            both_neginf = (mlv == -np.inf) and (pyv == -np.inf)

            if both_finite:
                if mlv != 0:
                    rel_err = abs(mlv - pyv) / abs(mlv) * 100
                elif pyv == 0:
                    # Exact agreement at zero: the ratio is 0/0, but the
                    # outputs match, so report 0% rather than NaN.
                    rel_err = 0.0
                else:
                    # Zero reference with a non-zero Python value: relative
                    # error is undefined.
                    rel_err = np.nan
                if not np.isnan(rel_err):
                    rel_errors.append(rel_err)
                if verbose:
                    print(f"[{slot} | ts={ts}]  RelErr% = {rel_err:.2f}")
            elif both_nan or both_posinf or both_neginf:
                if verbose:
                    print(f"[{slot} | ts={ts}]  RelErr% = MATCH (both non-finite)")
            else:
                if verbose:
                    print(f"[{slot} | ts={ts}]  RelErr% = NaN (mismatch in finiteness)")

        ml_arr = np.array(ml_vals, dtype=float)
        py_arr = np.array(py_vals, dtype=float)
        finite_mask = np.isfinite(ml_arr) & np.isfinite(py_arr)

        # pearsonr needs at least two points and non-constant inputs.
        if finite_mask.sum() > 1 and ml_arr[finite_mask].std() and py_arr[finite_mask].std():
            r, p = pearsonr(ml_arr[finite_mask], py_arr[finite_mask])
        else:
            r, p = np.nan, np.nan

        max_rel_err = np.nanmax(rel_errors) if rel_errors else np.nan

        results[slot] = {
            'r': r,
            'pval': p,
            'res_py': py_arr,
            'res_matlab': ml_arr,
            'max_rel_err': max_rel_err
        }

    return results

In [4]:
# Load the Empirical1000 dataset: each CSV row becomes one time series (a list of floats).
empirical1000 = []
# newline='' hands newline handling to the csv module, as its documentation requires.
with open("../../../empirical1000/hctsa_timeseries-data.csv", newline='') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        try:
            # Empty cells are dropped (presumably padding for shorter series — TODO confirm).
            time_series = [float(value) for value in row if value != '']
        except ValueError:
            # A non-numeric cell invalidates the whole row; report it and move on.
            print(f"Skipping row due to conversion error: {row}")
            continue
        empirical1000.append(time_series)

In [7]:
from Scaling import FastDFA

In [11]:
FastDFA(np.array(empirical1000[100]))

np.float64(1.2568977625781386)

In [4]:
def load_yaml(file):
    """
    Build the table of operations to compare from a YAML configuration file.

    The file is read as a mapping ``{module_name: {function_name: config}}``.
    Each ``config`` may contain:

    * ``configs``: list of parameter dicts. A value that is a list is expanded
      into one operation per element (Cartesian product across keys). The
      special key ``zscore`` is removed from the parameters and decides
      whether the input is z-scored first.
    * ``hctsa_name``: name of the corresponding MATLAB function, resolved on
      the module-level MATLAB engine ``eng``.
    * ``ordered_args``: parameter names in the positional order expected by
      the MATLAB function.

    Parameters
    ----------
    file : str or os.PathLike
        Path to the YAML configuration file.

    Returns
    -------
    funcs : dict
        ``{feature_name: {'callable', 'params', 'hctsa_name',
        'matlab_callable', 'isZscore', 'ordered_args'}}``.
        ``feature_name`` is ``<module>_<function>[_<values...>]`` with a
        ``_raw`` suffix when the input is not z-scored. Only parameter
        *values* go into the name, so two parameter sets with identical
        values under different keys would collide.

    Raises
    ------
    ValueError
        If a function entry has no ``hctsa_name``.

    Notes
    -----
    Relies on the globals ``eng`` (a started MATLAB engine) and
    ``zscore_decorator`` being defined before this function is called.
    """
    print(f"Loading configuration file: {os.path.basename(os.fspath(file))}")
    funcs = {}
    with open(file) as f:
        # FullLoader so custom tags registered via yaml.add_constructor are
        # honoured. NOTE(review): only load trusted configuration files.
        yf = yaml.load(f, Loader=yaml.FullLoader)

    for module_name in yf:
        print(f"\n*** Importing module {module_name} *** \n")
        module = importlib.import_module(module_name)
        for function_name in yf[module_name]:
            # Tolerate an entry with no body (``FunctionName:``) as an empty config.
            function_config = yf[module_name][function_name] or {}

            # These do not depend on the parameter set, so look them up once.
            python_callable = getattr(module, function_name)
            hctsa_name = function_config.get('hctsa_name')
            if not hctsa_name:
                raise ValueError(
                    f"No 'hctsa_name' given for {module_name}.{function_name}; "
                    "cannot resolve the MATLAB counterpart."
                )
            # Attribute lookup on the engine yields the MATLAB callable.
            # getattr does this directly, without eval-ing text from the config file.
            # z-scoring for the MATLAB side cannot be wrapped here; the caller
            # must apply it manually (see the 'isZscore' flag).
            hctsa_callable = getattr(eng, hctsa_name)
            ordered_args = function_config.get('ordered_args')

            # A missing, null, or empty 'configs' section means "one run with defaults".
            configs = function_config.get('configs')
            if configs is None or configs == []:
                configs = [{}]

            for params in configs:
                # Copy so that popping 'zscore' does not mutate the loaded config.
                params = dict(params) if params else {}
                zscore_first = params.pop("zscore", False)

                # Expand list-valued parameters into every combination.
                param_keys = list(params)
                value_lists = [v if isinstance(v, list) else [v] for v in params.values()]
                param_combinations = [dict(zip(param_keys, values))
                                      for values in product(*value_lists)]

                # An empty list value yields no combinations; fall back to defaults.
                if not param_combinations:
                    param_combinations = [{}]

                # Create one operation per parameter combination.
                for param_set in param_combinations:
                    feature_name = (f"{module_name}_{function_name}_" +
                                    "_".join(f"{v}" for k, v in param_set.items())
                                    if param_set else f"{module_name}_{function_name}")
                    if not zscore_first:
                        feature_name += "_raw"

                    print(f"Adding operation {feature_name} with params {param_set} "
                          f"(Z-score={zscore_first})")

                    base_func = partial(python_callable, **param_set)
                    if zscore_first:
                        base_func = zscore_decorator(base_func)

                    # ordered_args is kept only so the MATLAB call can be made positionally.
                    funcs[feature_name] = {'callable': base_func, 'params': param_set,
                                           'hctsa_name': hctsa_name,
                                           'matlab_callable': hctsa_callable,
                                           'isZscore': zscore_first,
                                           'ordered_args': ordered_args}

    return funcs

In [5]:
def eval_comparison(yaml, data):
    """
    Evaluate every configured operation in both Python and MATLAB on each series.

    Parameters
    ----------
    yaml : str
        Path to the YAML configuration file, passed straight to ``load_yaml``.
        (The parameter name shadows the ``yaml`` module inside this function only.)
    data : sequence
        Collection of time series; each element is converted to a column
        vector of shape (n, 1) before evaluation.

    Returns
    -------
    func_res : dict
        ``{feature_name: {series_index: {'matlab': value, 'python': value}}}``,
        which is the layout ``compare_outputs`` consumes.
    """
    func_dict = load_yaml(yaml)
    func_res = dict()
    for func, f in func_dict.items():
        print(f"Evalutating {func}")
        python_func = f['callable']
        matlab_func = f['matlab_callable']
        hctsa_name = f['hctsa_name']
        isZscore = f['isZscore']
        params = f['params']

        # MATLAB takes its parameters positionally, so order them as the config specifies.
        ordered_args = [params[k] for k in f['ordered_args']] if params else []

        print(f"Comparing to {hctsa_name}")
        res = dict()
        for i, series in enumerate(data):
            x = np.array(series).reshape(-1, 1)
            # The MATLAB callable cannot be wrapped by zscore_decorator, so
            # z-scoring is applied by hand for that side.
            matlab_input = ZScore(matlab.double(x)) if isZscore else matlab.double(x)
            # Unpack the parameters in both cases. Passing the list itself
            # would hand MATLAB one extra list argument (an empty one when
            # there are no parameters).
            matlab_eval = matlab_func(matlab_input, *ordered_args)
            python_eval = python_func(x)
            res[i] = {'matlab': matlab_eval, 'python': python_eval}
        func_res[func] = res
    return func_res

In [16]:
# Start a MATLAB engine session. `eng` is the global handle that load_yaml
# uses to look up the MATLAB counterpart of each operation.
eng = matlab.engine.start_matlab()

In [18]:
# Location of the local hctsa MATLAB checkout (machine-specific absolute path).
proj_root = pathlib.Path("/Users/jmoo2880/Documents/hctsa")
# Add the checkout, expanded by genpath, to the MATLAB search path.
# nargout=0 because no return value is requested from addpath.
eng.addpath(eng.genpath(str(proj_root)), nargout=0)
def zscore_decorator(func):
    """
    Wrap ``func`` so that its first argument is passed through ``ZScore``
    before the call; all other positional and keyword arguments are
    forwarded unchanged. Metadata of ``func`` is preserved via ``wraps``.
    """
    @wraps(func)
    def standardised_call(y, *args, **kwargs):
        return func(ZScore(y), *args, **kwargs)

    return standardised_call

def range_constructor(loader, node):
    """
    YAML constructor for a two-element sequence ``[start, end]``.

    Returns the list of integers from ``start`` to ``end`` with both
    endpoints included.
    """
    bounds = loader.construct_sequence(node)
    first, last = bounds
    return [*range(first, last + 1)]
# Register the `!range` tag so configuration files can write `!range [start, end]`
# and get the inclusive integer list built by range_constructor.
yaml.add_constructor("!range", range_constructor)

In [21]:
# Run every operation configured in scaling.yaml in both MATLAB and Python over
# all loaded series. Result layout: {feature_name: {series_index: {'matlab', 'python'}}}.
fdfa_res = eval_comparison("/Users/jmoo2880/Documents/py-hctsa-project/pyhctsa/Configurations/scaling.yaml", empirical1000)

Loading configuration file: scaling.yaml

*** Importing module Scaling *** 

Adding operation Scaling_FastDFA with params {} (Z-score=True)
Evalutating Scaling_FastDFA
Comparing to SC_fastdfa


In [24]:
fdfa_res['Scaling_FastDFA'][190]

{'matlab': 0.8588145412460522, 'python': np.float64(0.8588145412460509)}

In [26]:
# Summarise MATLAB/Python agreement per feature. This prints one relative-error
# line per (feature, series) pair, so the output is long.
fdfa_res_compare = compare_outputs(fdfa_res)

[Scaling_FastDFA | ts=0]  RelErr% = 0.00
[Scaling_FastDFA | ts=1]  RelErr% = 0.00
[Scaling_FastDFA | ts=2]  RelErr% = 0.00
[Scaling_FastDFA | ts=3]  RelErr% = 0.00
[Scaling_FastDFA | ts=4]  RelErr% = 0.00
[Scaling_FastDFA | ts=5]  RelErr% = 0.00
[Scaling_FastDFA | ts=6]  RelErr% = 0.00
[Scaling_FastDFA | ts=7]  RelErr% = 0.00
[Scaling_FastDFA | ts=8]  RelErr% = 0.00
[Scaling_FastDFA | ts=9]  RelErr% = 0.00
[Scaling_FastDFA | ts=10]  RelErr% = 0.00
[Scaling_FastDFA | ts=11]  RelErr% = 0.00
[Scaling_FastDFA | ts=12]  RelErr% = 0.00
[Scaling_FastDFA | ts=13]  RelErr% = 0.00
[Scaling_FastDFA | ts=14]  RelErr% = 0.00
[Scaling_FastDFA | ts=15]  RelErr% = 0.00
[Scaling_FastDFA | ts=16]  RelErr% = 0.00
[Scaling_FastDFA | ts=17]  RelErr% = 0.00
[Scaling_FastDFA | ts=18]  RelErr% = 0.00
[Scaling_FastDFA | ts=19]  RelErr% = 0.00
[Scaling_FastDFA | ts=20]  RelErr% = 0.00
[Scaling_FastDFA | ts=21]  RelErr% = 0.00
[Scaling_FastDFA | ts=22]  RelErr% = 0.00
[Scaling_FastDFA | ts=23]  RelErr% = 0.00
[S

In [None]:
from typing import Iterable, Tuple, Union, Optional

def ms_embed(
    z: Union[np.ndarray, Iterable[float]],
    v: Optional[Union[int, Iterable[int]]] = None,
    w: Optional[int] = None,
    *,
    split: bool = False,
    auto_neg: bool = True
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """
    Time-delay embedding of a scalar series.

    Call forms (following the MATLAB-style calls this function mirrors):

    * ``ms_embed(z)``: lags ``[0, 1, 2]``.
    * ``ms_embed(z, lags)``: explicit lags; a single integer is treated as a
      one-element lag list.
    * ``ms_embed(z, dim, lag)``: lags ``[0, lag, ..., (dim - 1) * lag]``.

    Parameters
    ----------
    z : array_like
        Input series; flattened to 1-D and converted to float.
    v : int or iterable of int, optional
        Embedding dimension when ``w`` is given, otherwise the lag(s).
    w : int, optional
        Lag step used together with an integer ``v``.
    split : bool, keyword-only
        If True, return the rows with non-negative lags and the rows with
        negative lags separately.
    auto_neg : bool, keyword-only
        With ``split=True``, prepend lag ``-1`` when no lag is negative.

    Returns
    -------
    X : ndarray of shape (dim, m)
        One row per lag in ascending lag order, with
        ``m = len(z) - (max_lag - min_lag)``. Returned when ``split`` is False.
    (X_pos, Y_neg) : tuple of ndarray
        Rows for lags >= 0 and rows for lags < 0. Returned when ``split`` is True.

    Raises
    ------
    ValueError
        If ``w`` is given and ``v`` is not a positive integer, if no lags are
        supplied, or if the series is not longer than the lag window.
    """
    # ---- 1. Parse & normalise lag specification ----------------------------
    if w is not None:                       # three-argument MATLAB call
        # Accept NumPy integer scalars as well as built-in ints.
        if not isinstance(v, (int, np.integer)) or v < 1:
            raise ValueError("When `w` is given, `v` must be a positive integer (embedding dimension).")
        lags = np.arange(0, v * w, w, dtype=int)
    elif v is None:                         # single-argument MATLAB call
        lags = np.array([0, 1, 2], dtype=int)
    else:                                   # explicit lag list
        # atleast_1d makes a bare integer lag a one-element array; a 0-d
        # array cannot be sorted or indexed below.
        lags = np.atleast_1d(np.array(v, dtype=int)).ravel()

    if lags.size == 0:
        raise ValueError("At least one lag is required.")

    if split and auto_neg and np.min(lags) >= 0:
        lags = np.insert(lags, 0, -1)

    lags.sort()
    min_lag, max_lag = lags[0], lags[-1]
    dim = len(lags)

    # ---- 2. Validate & reshape input series --------------------------------
    z = np.asarray(z, dtype=float).ravel()        # ensure 1-D vector
    n = z.size
    window = max_lag - min_lag
    if n <= window:
        raise ValueError(
            "Time series is too short for the requested lag window "
            f"(length={n}, window={window})."
        )

    # ---- 3. Build the embedding matrix -------------------------------------
    m = n - window
    # Equivalent to MATLAB: t = (1:m) + max_lag  (but zero-based)
    t_idx = np.arange(m) + max_lag

    X = np.empty((dim, m), dtype=z.dtype)
    for i, lag in enumerate(lags):
        # Row i holds the series shifted by `lag`. Indices stay within
        # [0, n - 1] because max_lag - lag is at most `window`.
        X[i] = z[t_idx - lag]

    # ---- 4. Split into positive/negative if requested ----------------------
    if split:
        neg_mask = lags < 0
        Y_neg = X[neg_mask]
        X_pos = X[~neg_mask]
        return X_pos, Y_neg
    return X

In [29]:
# Embed the first series with dimension 3 and lag step 1, i.e. lags 0, 1, 2.
X = ms_embed(empirical1000[0], 3, 1)

In [4]:
from pyhctsa.Toolboxes.Michael_Small.ms_nearest_wrapper import ms_nearest_py

ImportError: cannot import name 'ms_nearest_py' from 'pyhctsa.Toolboxes.Michael_Small.ms_nearest_wrapper' (/Users/jmoo2880/Documents/py-hctsa-project/pyhctsa/Toolboxes/Michael_Small/ms_nearest_wrapper.py)

In [241]:
importlib.reload(Nonlinear)
from Nonlinear import FNN

In [243]:
# Spot-check the reloaded Nonlinear.FNN on series index 513. With justBest=False
# the displayed result is a dict of pfnn_1..pfnn_10 plus summary statistics.
# NOTE(review): tau='ac' presumably selects an autocorrelation-based delay — confirm in Nonlinear.FNN.
FNN(empirical1000[513], justBest=False, tau='ac')

{'pfnn_1': np.float64(0.9798994974874372),
 'pfnn_2': np.float64(0.4301507537688442),
 'pfnn_3': np.float64(0.08274470232088799),
 'pfnn_4': np.float64(0.013171225937183385),
 'pfnn_5': np.float64(0.003051881993896236),
 'pfnn_6': np.float64(0.0),
 'pfnn_7': np.float64(0.0),
 'pfnn_8': np.float64(0.0),
 'pfnn_9': np.float64(0.0),
 'pfnn_10': np.float64(0.0),
 'meanpfnn': np.float64(0.15090180615082488),
 'stdpfnn': np.float64(0.3205115054420895),
 'firstunder02': np.int64(3),
 'firstunder01': np.int64(3),
 'firstunder005': np.int64(4),
 'firstunder002': np.int64(4),
 'firstunder001': np.int64(5),
 'max1stepchange': np.float64(0.5497487437185931)}