<a href="https://colab.research.google.com/github/nsambel1980/causal_discovery/blob/main/KS_hurst.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Hurst exponent estimation via the Kolmogorov–Smirnov (KS) test

In [18]:
import warnings
from typing import List, Optional, Tuple

import numpy as np
from scipy import stats

class KSHurstEstimator:
    """
    Implements Kolmogorov-Smirnov based Hurst exponent estimation.

    For every candidate H the absolute increments at lag ``tau`` are divided
    by ``tau ** H`` and compared, with a two-sample KS test
    (``scipy.stats.ks_2samp``), against the absolute increments at lag 1.
    The estimate is the H with the smallest KS statistic summed over all
    lags; the confidence interval spans the H values for which no lag
    rejects at the requested level.
    """

    def __init__(self, min_tau: int = 2, max_tau: Optional[int] = None):
        """
        Initialize the estimator.

        Parameters:
        -----------
        min_tau : int
            Minimum scale to consider (must be >= 1)
        max_tau : int
            Maximum scale to consider (if None, will be set based on data length)

        Raises:
        -------
        ValueError
            If min_tau is smaller than 1.
        """
        self._check_min_tau(min_tau)
        self.min_tau = min_tau
        self.max_tau = max_tau

    @staticmethod
    def _check_min_tau(min_tau: int) -> None:
        """
        Reject a non-positive min_tau.

        The tau grid is built by repeated doubling, which never terminates
        when it starts from zero or a negative value.
        """
        if min_tau < 1:
            raise ValueError(f"min_tau must be a positive integer, got {min_tau!r}")

    def _get_increments_ghe(self, data: np.ndarray, tau: int) -> np.ndarray:
        """
        Calculate increments for GHE method.

        Returns the absolute differences ``|data[t + tau] - data[t]|``.
        """
        # Work in floating point: subtracting unsigned-integer arrays wraps
        # around instead of going negative, and plain lists do not support
        # element-wise subtraction at all.  No copy is made for float arrays.
        series = np.asarray(data, dtype=float)
        return np.abs(series[tau:] - series[:-tau])

    def _get_tau_range(self, data_length: int) -> List[int]:
        """
        Generate range of tau values to use in estimation.

        The values start at ``min_tau`` and double until they exceed the
        upper bound.  The upper bound is ``max_tau`` when set (a falsy value,
        i.e. None or 0, selects ``data_length // 8``) and is additionally
        capped at ``data_length - 1`` so that every lag leaves at least one
        increment.  The result may be empty for very short series.
        """
        # Attributes are public and mutable, so re-check here as well.
        self._check_min_tau(self.min_tau)

        upper = self.max_tau or data_length // 8
        # A lag >= data_length yields an empty increment array, which the
        # KS test cannot handle.
        upper = min(upper, data_length - 1)

        tau_range = []
        tau = self.min_tau
        while tau <= upper:
            tau_range.append(tau)
            tau *= 2
        return tau_range

    def _scale_samples(self, samples: np.ndarray, tau: float, h: float) -> np.ndarray:
        """
        Scale samples by tau^H.
        """
        return samples / (tau ** h)

    def _compute_ks_statistic(self, data: np.ndarray, h: float, tau_range: List[int]) -> Tuple[float, List[float]]:
        """
        Compute sum of KS statistics and p-values for given H value.

        Returns a tuple ``(sum of KS statistics over tau_range, list with one
        p-value per tau)``.
        """
        # Get reference distribution (tau=1)
        ref_samples = self._get_increments_ghe(data, 1)

        total_ks_stat = 0.0
        p_values = []

        for tau in tau_range:
            # Get and scale samples for current tau
            current_samples = self._get_increments_ghe(data, tau)
            scaled_samples = self._scale_samples(current_samples, tau, h)

            # Compute KS statistic and p-value
            ks_stat, p_value = stats.ks_2samp(ref_samples, scaled_samples)
            total_ks_stat += ks_stat
            p_values.append(p_value)

        return total_ks_stat, p_values

    def _select_valid_h(self, h_range: np.ndarray, p_values_per_h: List[List[float]],
                        confidence_level: float) -> Tuple[List[float], List[float]]:
        """
        Pick the H values whose KS tests all pass, from precomputed p-values.

        ``p_values_per_h[i]`` holds the per-tau p-values obtained for
        ``h_range[i]``.  Returns ``(valid H values, minimum p-value per H)``.
        When no H passes, a warning is issued and the single H with the
        largest minimum p-value is returned instead.
        """
        alpha = 1 - confidence_level
        min_p_values = [min(p_values) for p_values in p_values_per_h]
        valid_h_values = [
            h for h, p_values in zip(h_range, p_values_per_h)
            if all(p >= alpha for p in p_values)
        ]

        if not valid_h_values:
            warnings.warn("No H values satisfy self-similarity at specified confidence level")
            return [h_range[np.argmax(min_p_values)]], min_p_values

        return valid_h_values, min_p_values

    def _compute_confidence_interval(self, data: np.ndarray, h_range: np.ndarray,
                                tau_range: List[int], confidence_level: float = 0.99) -> Tuple[List[float], List[float]]:
        """
        Compute confidence interval for H based on KS test p-values.
        Returns all H values where self-similarity property holds at given confidence level.

        The second element of the returned tuple is the minimum p-value
        (over tau_range) for every H in h_range.
        """
        p_values_per_h = [
            self._compute_ks_statistic(data, h, tau_range)[1] for h in h_range
        ]
        return self._select_valid_h(h_range, p_values_per_h, confidence_level)

    def _optimize_h(self, data: np.ndarray, tau_range: List[int],
                   h_range: np.ndarray) -> Tuple[float, np.ndarray, List[List[float]]]:
        """
        Find H that minimizes sum of KS statistics.

        Returns ``(optimal H, array of summed KS statistics per H, list of
        per-tau p-value lists per H)``.
        """
        ks_stats = []
        all_p_values = []
        for h in h_range:
            ks_stat, p_values = self._compute_ks_statistic(data, h, tau_range)
            ks_stats.append(ks_stat)
            all_p_values.append(p_values)

        h_optimal = h_range[np.argmin(ks_stats)]
        return h_optimal, np.array(ks_stats), all_p_values

    def _prepare_series(self, data) -> Tuple[np.ndarray, List[int]]:
        """
        Validate the input and return it as a float array with its tau grid.

        Raises ValueError when the input is not one-dimensional or is too
        short to provide any scale between min_tau and the upper bound.
        """
        series = np.asarray(data, dtype=float)
        if series.ndim != 1:
            raise ValueError("data must be a one-dimensional sequence")

        tau_range = self._get_tau_range(len(series))
        if not tau_range:
            # Without at least one scale no KS test would be run at all.
            raise ValueError(
                f"Time series of length {len(series)} is too short: "
                f"no scale available with min_tau={self.min_tau}, max_tau={self.max_tau}"
            )
        return series, tau_range

    def estimate(self, data: np.ndarray, h_steps: int = 100, confidence_level: float = 0.99) -> Tuple[float, dict]:
        """
        Estimate Hurst exponent using KS method.

        Parameters:
        -----------
        data : np.ndarray
            One-dimensional time series (any sequence convertible to a float array)
        h_steps : int
            Number of candidate H values, evenly spaced in [0.01, 0.99]
        confidence_level : float
            Each KS test passes when its p-value is >= 1 - confidence_level

        Returns:
        --------
        (h_estimate, results) where results is a dict with the keys
        'h_range', 'ks_statistics', 'p_values' (per-tau p-values at the
        estimate), 'confidence_interval', 'tau_range', 'valid_h_values',
        'p_values_by_h' (minimum p-value per H) and 'confidence_level'.

        Raises:
        -------
        ValueError
            If h_steps < 1, data is not one-dimensional, or data is too
            short to provide any scale.
        """
        if h_steps < 1:
            raise ValueError(f"h_steps must be at least 1, got {h_steps!r}")

        series, tau_range = self._prepare_series(data)
        if len(series) < 100:
            warnings.warn("Time series may be too short for reliable estimation")

        h_range = np.linspace(0.01, 0.99, h_steps)

        # Find optimal H
        h_estimate, ks_stats, all_p_values = self._optimize_h(series, tau_range, h_range)

        # Derive the confidence interval from the p-values of the sweep above
        # rather than running every KS test a second time.
        valid_h_values, p_values_by_h = self._select_valid_h(
            h_range, all_p_values, confidence_level)

        # valid_h_values is never empty (a fallback H is used when none pass),
        # so min/max also cover the single-value case.
        ci = [min(valid_h_values), max(valid_h_values)]

        results = {
            'h_range': h_range,
            'ks_statistics': ks_stats,
            'p_values': all_p_values[int(np.argmin(ks_stats))],
            'confidence_interval': ci,
            'tau_range': tau_range,
            'valid_h_values': valid_h_values,
            'p_values_by_h': p_values_by_h,
            'confidence_level': confidence_level
        }

        return h_estimate, results

    def test_self_similarity(self, data: np.ndarray, h: float, confidence_level: float = 0.99) -> Tuple[bool, dict]:
        """
        Test if the time series satisfies the self-similarity property for a given H.

        Returns ``(is_self_similar, results)`` where results is a dict with
        the keys 'tau_range', 'p_values', 'individual_confidence_level',
        'overall_confidence_level', 'critical_value' and 'failing_scales'.

        Raises:
        -------
        ValueError
            If data is not one-dimensional or is too short to provide any
            scale (otherwise the test would pass vacuously).
        """
        series, tau_range = self._prepare_series(data)
        _, p_values = self._compute_ks_statistic(series, h, tau_range)

        # A series is self-similar if all KS tests accept the null hypothesis
        alpha = 1 - confidence_level
        failing_scales = [tau for tau, p in zip(tau_range, p_values) if p < alpha]
        is_self_similar = all(p >= alpha for p in p_values)

        # Calculate overall confidence level: the per-scale level multiplied
        # once per scale, i.e. as if the per-scale tests were independent.
        overall_confidence = confidence_level ** len(tau_range)

        results = {
            'tau_range': tau_range,
            'p_values': p_values,
            'individual_confidence_level': confidence_level,
            'overall_confidence_level': overall_confidence,
            'critical_value': alpha,
            'failing_scales': failing_scales
        }

        return is_self_similar, results

if __name__ == "__main__":
    # Demo input: a random walk built as the cumulative sum of seeded
    # standard-normal draws (H should be ~0.5).
    np.random.seed(40)
    n_points = 1000
    random_walk = np.random.randn(n_points).cumsum()

    # Estimator with its default scale settings
    estimator = KSHurstEstimator()

    # Point estimate of H plus the interval of H values that pass the KS tests
    h_estimate, estimation_results = estimator.estimate(random_walk)
    ci_low, ci_high = estimation_results['confidence_interval']
    print(f"\nEstimated Hurst exponent: {h_estimate:.3f}")
    print(f"Confidence interval: [{ci_low:.3f}, "
          f"{ci_high:.3f}]")

    # Re-run the self-similarity test at the estimate for several levels
    confidence_levels = [0.99, 0.95, 0.90]
    for cl in confidence_levels:
        is_self_similar, test_results = estimator.test_self_similarity(random_walk, h_estimate, cl)
        formatted_p_values = [format(p, '.3f') for p in test_results['p_values']]
        overall_level = test_results['overall_confidence_level']

        print(f"\nSelf-similarity test at {cl*100}% confidence level:")
        print(f"Is self-similar: {is_self_similar}")
        if not is_self_similar:
            # Only report the scales whose KS test rejected
            print(f"Failing scales (tau): {test_results['failing_scales']}")
        print(f"Overall confidence level: {overall_level:.3f}")
        print(f"P-values at each scale: {formatted_p_values}")


Estimated Hurst exponent: 0.515
Confidence interval: [0.495, 0.535]

Self-similarity test at 99.0% confidence level:
Is self-similar: True
Overall confidence level: 0.941
P-values at each scale: ['0.489', '0.320', '0.174', '0.381', '0.325', '0.690']

Self-similarity test at 95.0% confidence level:
Is self-similar: True
Overall confidence level: 0.735
P-values at each scale: ['0.489', '0.320', '0.174', '0.381', '0.325', '0.690']

Self-similarity test at 90.0% confidence level:
Is self-similar: True
Overall confidence level: 0.531
P-values at each scale: ['0.489', '0.320', '0.174', '0.381', '0.325', '0.690']
