In [1]:
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
from math import log, e
from sklearn.decomposition import PCA

In [10]:
def frobnorm(mat): 
    r"""
        Compute the frobenius norm 
    """
    n, m = len(mat), len(mat[0])
    matsum = 0
    for i in range(0,n): 
        for j in range(0,m): 
            matsum += mat[i][j]**2

    return matsum**0.5


def cossim(a,b): 
    r"""
        Compute the cossine similarity
    """
    return np.trace(a.T @ b) / (frobnorm(a)*frobnorm(b))

In [63]:
def ell_inf(A, B): # chebyshev norm
    return max(abs(a-b) for a,b in zip(A,B))

def cor_sum(vec, r): # correlation sum
    r"""
        Compute the correlation sum given some radius r
    """
    
    n = len(vec)
    neighbor_counts = [
        sum(ell_inf(x_i, x_j) <= r for x_j in vec) / n
        for x_i in vec
    ]

    return sum(neighbor_counts) / n

def approx_entropy(time_series, m, d, r): 
    """
        Estimate the approximate/discrete entropy of a time series (approximating the growth rate of r-balls). 

        Params: 

        time_series: 1d array_like 
        m, d: int
            delay embedding parameters
        r: float
            radius determining the size of an r-ball
    """
    
    def phi(delay_dim, delay): 
        """
            Compute the log-average of r-balls assuming the delay embedding parameters result in a diffeomorphism 
            of the 1d time series from R^1 to R^d. 
        """
        X = delay_embed(time_series, delay_dim, delay)
        n = len(X)
        neighbor_counts = [
            sum(ell_inf(x_i, x_j) <= r for x_j in X) / n
            for x_i in X
        ]
        count = [c for c in neighbor_counts if c > 0]
        return sum(math.log(c) for c in count) / n

    return abs(phi(m+1, d) - phi(m, d))

def cor_dim(time_series, m, d, r): # correlation dimension; easier to compute than minkowski dimension
    """
        Estimate the correlation dimension of a time series
    """
    X = delay_embed(time_series, m, d)
    CS = cor_sum(X, r)

    return math.log(CS) / math.log(r)

In [59]:
# def AutoCor_k(arr, k): 
#     n = len(arr)
#     M = np.mean(arr)
#     num, denom = 0, 0
#     for i in range(0, n):
#         denom+=(arr[i]-M)**2
#     for i in range(0, n-k): 
#         num+=(arr[i]-M)*(arr[i+k]-M)

#     return num/denom

def acf(x, lag, unbiased=True): # autocorrelation function; used to determine the optimal time delay
    r"""
        Compute the autocorrelation of a signal given delay/lag k. Can be used to determine the optimal time delay. 

        Params:

        x: 1d array_like input data
        lag: integer 
    """
    x = np.asarray(x, dtype=float).ravel()
    x = x-x.mean()
    n = x.size

    num = np.dot(x[:n-lag], x[lag:])
    denom = np.dot(x, x)

    if unbiased: 
        num /= (n-lag)
        denom /= n
    else: 
        num /= n
        denom /= n

    return num / denom

def plot_acf(x, max_lag, unbiased=True): 
    r"""
        Plot ACF from delay/lag = 1 to max_lag k. Returns the list [acf_i] where i is the delay parameter. 

        Params:

        max_lag: int
    """
    x = np.asarray(x, dtype=float).ravel()
    acf_list = [acf(x, i, unbiased=unbiased) for i in range(max_lag)]

    fig, ax = plt.subplots(figsize=(10,6))
    lags = range(max_lag)
    ax.plot(lags, acf_list, 'bo-', markersize=4, linewidth=1)
    ax.axhline(y=0, color='black', linestyle='-', alpha=0.3)

    ax.set_xlabel('Lag')
    ax.set_ylabel('Autocorrelation')
    ax.set_title('Autocorrelation Function over Lag')
    ax.grid(True, alpha=0.3)
    ax.legend()

    ax.set_ylim(-1.2, 1.2)
    plt.tight_layout()
    plt.show()

    return acf_list

In [54]:
def p_hist(x, bins='fd', base=e): 
    r"""
        Compute the probability distribution from histogram. Returns the probability and edges (from the hist). 

        Params: 
        
        x: array-like input data
        bins: see numpy documentation. fd: Freedman Diaconis estimator
    """
    
    x = np.asarray(x, dtype=float).ravel()
    edges = np.histogram_bin_edges(x, bins=bins)
    counts, _ = np.histogram(x, bins=edges)
    total = counts.sum()

    if total == 0: 
        return np.array([1.0]), edges

    p = counts.astype(float) / total

    return p, edges

def joint_p_hist(x, y, bins='fd', base=e, edge_x=None, edge_y=None):
    r"""
        Compute the joint probability distribution P_xy from histogram.

        Params: 

        x, y: array_like input data
        edge_x, edge_y: predefined edges of hist_x and hist_y. 
            Specify edges to ensure consistency. 
    """

    x = np.asarray(x, dtype=float).ravel()
    y = np.asarray(y, dtype=float).ravel()

    if edge_x is None: 
        edge_x = np.histogram_bin_edges(x, bins=bins)
    if edge_y is None: 
        edge_y = np.histogram_bin_edges(y, bins=bins)

    counts, _, _ = np.hisogram2d(x, y, bins=(edge_x, edge_y))
    total = counts.sum()

    if total == 0: 
        return np.array([[1.0]], edge_x, edge_y)

    p = counts.astype(float) / total

    return p, edge_x, edge_y

def H_from_p(p, base=e): 
    r"""
        Compute the shannon entropy given a probability distribution p. 
    """
    p = np.asarray(p, dtype=float)
    p = p[p > 0]

    if p.size == 0: 
        return 0.0
        
    return -np.sum(p * (np.log(p) / np.log(base)))

In [58]:
def delay_embed(time_series, embedding_dimension, time_delay):
    r"""
        Delay-embed a 1-dimensional seqeunce (time series). Returns R^{N-t(d-1) x d} delay embedding. 

        time_series: array-like input data
        embedding_dimension (d), time_delay (tau): delay-embedding parameters. 

        Original 1-d data transformed as [[x_0, x_t, ..., x_t(d-1)], ....]
    """
    N = len(time_series)
    num_vecs = N - (embedding_dimension - 1) * time_delay
    embedded = np.zeros((num_vecs, embedding_dimension))
    for i in range(num_vecs):
        for j in range(embedding_dimension):
            embedded[i, j] = time_series[i + j * time_delay]

    # delay_matrix = num_vecs**(-0.5)*embedded
    
    return embedded

def rolling_cor(x, y, dim, delay): 
    r"""
        Compute the rolling correlation between two delay-embedded time series. 
    """
    x_embed = delay_embed(x, dim, delay)
    y_embed = delay_embed(y, dim, delay)
    cors = []
    for i in range(len(x_embed)): 
        cor_mat = np.corrcoef(x_embed[i], y_embed[i])
        cors.append(cor_mat[0,1])

    return np.array(cors)

# Old --------
def shannon_entropy(x, bins='fd', base=e, from_probs=False): 
    x = np.asarray(x, dtype=float)

    if from_probs: 
        p = x/x.sum()
    else: 
        edges = np.histogram_bin_edges(x, bins=bins)
        counts, _ = np.histogram(x, bins=edges)
        p = counts / counts.sum()

    p = [p > 0]
    H = -np.sum(p * (np.log(p) / np.log(base)))

    return H

def joint_entropy(x, y, bins='fd', base=e, from_probs=False):
    x, y = np.asarray(x, dtype=float), np.asarray(y, dtype=float)

    if prob_probs: 
        p_xy = x / x.sum()
    else: 
        edges_x = np.histogram_bin_edges(x, bins=bins)
        edges_y = np.histogram_bin_edges(y, bins=bins)
        counts, _, _ = np.histogram2d(x, y, bins=[edges_x, edges_y]) 
        p_xy = counts / counts.sum()

    p_xy = p_xy.flatten()
    p_xy = p_xy[p_xy > 0]

    H_xy = -np.sum(p_xy * (np.log(p_xy) / np.log(base)))
    
    return Hxy

def conditional_entropy(x, y, bins='fd', base=e, from_probs=False): 
    H_xy = join_entropy(x, y, bins=bins, base=base, from_probs=from_probs)
    H_y = shannon_entropy(y, bins=bins, base=base, from_probs=from_probs)

    return H_xy - H_y

def mutual_info(x, y, bins='fd', base=e, from_probs=False): 
    H_x = shannon_entropy(x, bins=bins, base=base, from_probs=from_probs)
    H_y = shannon_entropy(y, bins=bins, base=base, from_probs=from_probs)
    H_xy = join_entropy(x, y, bins=bins, base=base, from_probs=from_probs)

    return H_x + H_y - H_xy

    
# mutual information estimate based on historgram
def ami(x, max_lag, bins='fd', base=e):
    x = np.asarray(x, dtype=float)
    N = x.size
    lags = np.arange(1, max_lag+1)
    ami = np.empty_like(lags, dtype=float)

    Hx = shannon_entropy(x, bins=bins, base=base)

    for idx, tau in enumerate(lags): 
        x1, x2 = x[:-tau], x[tau:]
        
    return lags, ami
# ------------

def delayPCA(time_series, delay_dim, delay, pca_dim): 
    ts_emb = delay_embed(time_series, delay_dim, delay)
    pca = PCA(pca_dim).fit(ts_emb)
    ts_delaypca = pca.transform(ts_emb) 

    return ts_delaypca, pca

def project_pca(data, pca, numcomps=2): 
    x_proj = (data-pca.mean_) @ pca.components_[0:numcomps].T
    interval = np.arange(len(x_proj))
    if numcomps == 2: 
        plt.scatter(x_proj[:,0], x_proj[:,1], c=interval, cmap='viridis')
        plt.xlabel("PC 1"); plt.ylabel("PC 2")
    elif numcomps == 3: 
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        ax.scatter(x_proj[:,0], x_proj[:,1], x_proj[:,2], c=interval, cmap='viridis', **scatter_kw)
        ax.set_xlabel("PC 1"); ax.set_ylabel("PC 2"); ax.set_zlabel("PC 3")
    plt.title(f"Projection onto first {numcomps} PCs")
    plt.tight_layout(); plt.show()
        
    return x_proj

def update_delayPCA(oldata, newobs, dim_delay, delay, dim_pca): 
    # make sure dim_pca is less than dim_delay
    # this is a horrible piece of code; remind myself to fix this
    assert dim_delay > dim_pca, "PCA dim must be less than delay dim"
    
    pca = PCA(dim_pca)
    oldata_emb = delay_embed(oldata, dim_delay, delay)
    old_pca = pca.fit(oldata_emb) 
    
    newdata = np.hstack((oldata, newobs))
    newdata_emb = delay_embed(newdata, dim_delay, delay)
    new_pca = pca.fit(newdata_emb)
    newdata_delaypca = new_pca.transform(newdata_emb)
    newdata_oldbasis = old_pca.transform(newdata_emb)
    
    return old_pca, new_pca, newdata_delaypca, newdata_oldbasis

def return_data_stats(data, dim, delay, r): 
    mean, std, ent, cordim = np.mean(data), np.std(data), approx_entropy(data, dim, delay, r), cor_dim(data, dim, delay, r)
    print(f"Data Mean: {mean} Data Stdev: {std} \nData Entropy: {ent} Data Correlation Dim: {cordim}")

    return mean, std, ent, cordim

def pca_diff_stats(pca1, pca2, ncomps): 
    angdiff = cossim(pca1.components_[:ncomps], pca2.components_[:ncomps])
    meandiff = np.abs(pca1.mean_ - pca2.mean_)
    print(f"Mean Difference: {meandiff} Cosine Similarity: {angdiff}")
    plt.figure(figsize=(8,8))
    plt.plot(pca1.explained_variance_, label=f'{pca1} Eigenvalues')
    plt.plot(pca2.explained_variance_, label=f'{pca2} Eigenvalues')
    plt.legend()
    plt.show()

    return angdiff, meandiff

def compare_pcas(pca1, pca2, k, X1=None, X2=None):
    from scipy.linalg import subspace_angles
    U1, U2 = pca1.components_[:k].T, pca2.components_[:k].T
    ang = subspace_angles(U1, U2)
    subspace_dist = np.linalg.norm(np.sin(ang))
    var_gap = (pca1.explained_variance_ratio_[:k].cumsum() -
               pca2.explained_variance_ratio_[:k].cumsum())[-1]
    overlap_sv = np.linalg.svd(U1.T @ U2, compute_uv=False)

    results = dict(
        principal_angles_deg=np.degrees(ang),
        grassmann_dist=subspace_dist,
        cum_variance_gap=var_gap,
        overlap_singular_vals=overlap_sv,
    )
    if X1 is not None and X2 is not None:
        results["recon_MSE_1to2"] = pca_cross_recon_loss(pca1, pca2, X2)
        results["recon_MSE_2to1"] = pca_cross_recon_loss(pca2, pca1, X1)
    return results

def pca_recon_loss(X, pca, k): 
    Z = pca.transform(X)
    Z[:, k:] = 0
    X_hat = pca.inverse_transform(Z)

    mse = np.mean((X-X_hat)**2)
    per_sample = ((X-X_hat)**2).mean(axis=1)
    per_feature = ((X-X_hat)**2).mean(axis=0)
    var_j = X.var(axis=0, ddof=0)
    r2 = 1.0 - per_feature/var_j

    return mse, r2, X_hat

# needs to be fixed
def pca_cross_recon_loss(X, pca_enc, pca_dec, k): #also use it two compare subspace alignment
    # k number of prin. comp.
    B_enc, mu_enc = pca_enc.components_[:k].T, pca_enc.mean_
    B_dec, mu_dec = pca_dec.components_[:k].T, pca_dec.mean_

    Z_enc = (X - mu_enc) @ B_enc

    U, _, Vt = np.linalg.svd(B_enc.T @ B_dec, full_matrices = False)
    R = U @ Vt

    Z_dec = Z_enc @ R

    X_hat = Z_dec @ B_dec.T + mu_dec

    mse = np.mean((X-X_hat)**2)
    per_sample = ((X-X_hat)**2).mean(axis=1)
    per_feature = ((X-X_hat)**2).mean(axis=0)
    var_j = X.var(axis=0, ddof=0)
    r2 = 1.0 - per_feature/var_j

    return mse, r2, X_hat

In [64]:
# def nearest_nbr(data): 
#     n, d = data.shape
#     M_idx = np.empty(N, dtype=int)
#     M_dist = np.empty(N, dtype=float)
#     for i in range(N):
#         best_dist = np.inf
#         best_idx = -1
#         x_i = data[i]
#         for j in range(N): 
#             if i == j: 
#                 continue
#             dist = np.dot(x_i - data[j], x_i - data[j])
#             if dist < best_dist: 
#                 best_dist = dist
#                 best_idx = j
#         M_idx[i] = best_idx
#         M_dist[i] = best_dist ** 0.5

#     return dist, M_idx 

# def nearest_nbr(X): 
#     N, d = X.shape
#     D = np.full((N,N), np.inf)
#     for i in range(N): 
#         for j in range(N): 
#             if i == j: 
#                 continue 
#             else: 
#                 D[i][j] = np.sqrt(np.dot(X[i]-X[j], X[i]-X[j])) 
                
#     M_idx = np.argmin(D, axis=1)
#     D_min = D[np.arange(N), M_idx]
#     return D_min, M_idx

def nearest_nbr(X): 
    diff = X[:, None, :] - X[None, :, :]
    D2 = np.sum(diff*diff, axis=-1)

    np.fill_diagonal(D2, np.inf)

    idx_nn = np.argmin(D2, axis=1)
    dists = np.sqrt(D2[np.arange(X.shape[0]), idx_nn])

    return dists, idx_nn
    
def FalseNN(data, max_dim, delay, Atol, Rtol): # false nearest neighbor used to determine optimal embedding dimension
    # assert len(data) > 
    data = np.asarray(data).ravel()
    stdev = np.std(data)
    if stdev == 0: 
        raise ValueError("Zero Standard Deviation")

    fnn_ptg = []
    
    for m in range(1, max_dim): 
        # emb_m = delay_embed(data, m, delay)
        emb_m1 = delay_embed(data, m+1, delay)
        emb_m = emb_m1[:, :-1]  

        d_m, idx_nn = nearest_nbr(emb_m)

        new_coord, new_coord_nn = emb_m1[:, -1], emb_m1[idx_nn, -1]
        rel_growth = np.abs(new_coord - new_coord_nn) / d_m
        cond_rel = rel_growth > Rtol

        d_m1 = np.linalg.norm(emb_m1 - emb_m1[idx_nn], axis=1)
        cond_abs = (d_m1/stdev) > Atol

        fnn_ptg.append(100.0 * np.mean(cond_rel | cond_abs))
        

    return np.array(fnn_ptg)

def lyap_exp(series, dim, delay, theiler_window=0, horizon=100, fs=1.0): 
    """
        Calculate the largest Laypunov exponent of a time series using Rosenstein. 

        Params-----

        series (1d array_like): 1d input time series data
        dim, delay (int): delay embedding params
        theiler_window (int): window to exclude temporally close neighbors. 
    """
    traj = delay_embed(series, dim, delay)
    

In [7]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim

class MLP(nn.Module): 
    def __init__(self, num_input, num_hidden, num_output): 
        super(MLP, self).__init__()
        layers = []
        for i in range(len(num_hidden)): 
            if i==0: 
                layers.append(nn.Linear(num_input, num_hidden[i]))
            else: 
                layers.append(nn.Linear(num_hidden[i-1], num_hidden[i]))
            layers.append(nn.ReLU())
        self.hidden = nn.Sequential(*layers)
        self.output = nn.Linear(num_hidden[-1], num_output)
        
    def forward(self,x): 
        x = self.hidden(x)
        x = self.output(x)
        return x