## Variability metrics for VHF timeseries 

In [None]:
import numpy as np
import pandas as pd

### Time-integrated area under the curve (AUC)

Non-normalized

In [None]:
def auc_signed(series, max_gap):
    """
    calculates AUC, excluding intervals larger than max_gap.
    """
    s = series.dropna().sort_index()
    dt = s.index.to_series().diff()

    valid = dt <= max_gap
    auc = (s.shift(1)[valid] * dt[valid]).sum() # exclude intervals larger than max_gap

    return auc

Normalized

In [None]:
def auc_signed_normalized(series, max_gap):
    """
    AUC normalized by total valid duration.
    """
    s = series.dropna().sort_index()
    dt = s.index.to_series().diff()

    valid = dt <= max_gap
    weighted_sum = (s.shift(1)[valid] * dt[valid]).sum()
    total_time = dt[valid].sum()

    if total_time.total_seconds() == 0:
        return np.nan

    return weighted_sum / total_time


### Fractuib of time being positive vs negative

Only duration

In [None]:
def fraction_positive(series, max_gap):
    """
    fraction of valid time spent in positive conditions.
    """
    s = series.dropna().sort_index()
    dt = s.index.to_series().diff()

    valid = dt <= max_gap
    pos_time = dt[valid & (s.shift(1) > 0)].sum()
    total_time = dt[valid].sum()

    if total_time.total_seconds() == 0:
        return np.nan

    return pos_time / total_time


Weighted by magnitude

In [None]:
def fraction_positive_weighted(series, max_gap):
    """
    fraction of total absolute magnitude-time contributed by positive values.
    """
    s = series.dropna().sort_index()
    dt = s.index.to_series().diff()

    valid = dt <= max_gap

    pos = (s.shift(1)[valid & (s.shift(1) > 0)] * dt[valid & (s.shift(1) > 0)]).sum()
    neg = (np.abs(s.shift(1)[valid & (s.shift(1) < 0)]) * dt[valid & (s.shift(1) < 0)]).sum()

    denom = pos + neg
    if denom == pd.Timedelta(0):
        return np.nan

    return pos / denom

### Number of times it crosses zero

Excluding data gaps

In [None]:
def zero_crossing_count_exc(series, max_gap):
    """
    number of sign changes, excluding large gaps.
    """
    s = series.dropna().sort_index()
    dt = s.index.to_series().diff() # compute time differences

    valid = dt <= max_gap
    signs = np.sign(s)  # get signs of the series

    crossings = (
        valid   
        & (signs.shift(1) * signs < 0)  # count sign changes only where valid
    )

    return crossings.sum()

Including data gaps

In [None]:
def zero_crossing_count_inc(series, max_gap):
    """
    number of sign changes, including data gaps.
    """
    s = series.dropna().sort_index()
    s = s[s != 0] # remove exact zeros to avoid ambiguity in sign changes
    signs = np.sign(s.values)

    crossings = (signs[:-1] * signs[1:] < 0) # count all sign changes
    return crossings.sum()

## Calculations

In [None]:
spring = pd.read_csv("../spring_HF.csv", parse_dates=["Date"], index_col="Date")
summer = pd.read_csv("../summer_HF.csv", parse_dates=["Date"], index_col="Date")

Spring 

In [None]:
# AUC 
for probe in ["T1", "T2", "T3", "T5", "T6", "T7", "T8"]:
    s = spring[probe]  # pd.Series with DatetimeIndex
    auc = auc_signed(s, max_gap=pd.Timedelta("2H"))
